Backend

자바에서 페이징 처리 구현하기(feat. batch)

keepbang 2022. 12. 24. 11:49

개발을 하다보면 조회할 때 페이징 처리를 해야하는 경우가 생긴다.

이 글에서는 조회할 때가 아닌 데이터를 저장/수정하거나 삭제 할 때 사용할 수 있는 페이징 방법을 소개해 볼려고 한다.


페이징 처리

데이터를 처리 할 때 너무 많은양의 데이터를 처리할 경우 db성능이나 timeout, 속도등의 문제로 데이터를 일정한 개수만큼 나눠서 처리하는 것을 말한다.

 

주로 API에서 데이터를 조회할 때 많이 사용했었는데 이번에는 배치에서 데이터를 제어할 때 페이징 처리를 구현하게 되었다.


공통 코드

public class LimitRangeUtil<T> {

  // default size : 1000
  private int fetchSize = 1_000;

  public LimitRangeUtil(int fetchSize) {
    this.fetchSize = fetchSize;
  }

  // 반복 작업을 위한 IntStream을 구한다.
  public IntStream getRangeStream(int size) {
    int cycleCount = size / fetchSize;
    int modSize = size % fetchSize;
    if (modSize > 0) {
      cycleCount++;
    }
    return IntStream.range(0, cycleCount);
  }

  // fetch size만큼 list에서 데이터를 짤라서 return한다.
  public CopyOnWriteArrayList<T> copyByList(List<T> list, int i) {
    int fromIndex = i * fetchSize;
    int toIndex = (i + 1) * fetchSize;
    if (toIndex >= saveDtoList.size()) {
      toIndex = saveDtoList.size();
    }
    return new CopyOnWriteArrayList<>(saveDtoList.subList(fromIndex, toIndex));
  }

}

여러곳에서 공통으로 사용하기 위해 제네릭을 추가하였고 사용 할 때 fetch count를 동적으로 변경하기 위해서 입력 받을 수 있게 작성했다.

 

Method 설명

getRangeStream(int size)

  • 리스트의 사이즈를 받아서 반복문을 처리할 수 있는 intStream으로 반환한다.
  • 나머지 값이 있을 경우 마지막까지 반복 작업을 할 수 있도록 처리했다.

Test code

@ParameterizedTest
@CsvSource(value = {
    "1000,1", "999,1", "1001,2", "10000,10", "9999,10", "10001,11"
})
public void limitRangeTest(int dataSize, int maxIndex) {
  // given
  int fetchSize = 1_000;

  List<Integer> dataList = new ArrayList<>();

  for (int i = 0; i < dataSize; i++) {
    dataList.add(i);
  }

  LimitRangeUtil<Integer> util = new LimitRangeUtil<>(fetchSize);

  // when
  final IntStream rangeStream = util.getRangeStream(dataList.size());

  // then
  assertThat(rangeStream.count()).isEqualTo(maxIndex);
}

 

copyByList(List<T> list, int i)


아래처럼 생성해서 사용하면된다.

LimitRangeUtil<String> limitRangeUtil = new LimitRangeUtil<>(500);

 

먼저 반복을 위한 IntStream을 만들어준다.

final IntStream rangeStream = limitRangeUtil.getRangeStream(ids.size());

 

IntStream을 병렬로 돌려주면서 데이터를 조회한다.

...
    return rangeStream.parallel().mapToObj(i -> {
      final CopyOnWriteArrayList<String> copyIds = limitRangeUtil.copyByDtoList(
          ids, i);

      return repository.findAllByIdIn(ids);
    }).flatMap(Collection::parallelStream)
    .toList();
...

 

데이터를 저장할 때에도 비슷하게 하면 된다. 저장할 때에도 페이징을 했던 이유는 mysql에서 한번에 10만개 이상의 데이터를 제어할려고하면 아래와 같은 에러가 발생했기 때문이다...

audience.-.primary: vttablet: rpc error: code = Aborted desc = caller id: admin: row count exceeded 100000 (CallerID: admin)

사이즈를 적절하게 조정해서 사용해 주면 된다.

    LimitRangeUtil<Member> limitRangeUtil = new LimitRangeUtil<>(1_000);
    final IntStream rangeStream = limitRangeUtil.getRangeStream(Members.size());

    rangeStream.parallel().forEach(i -> {
      final CopyOnWriteArrayList<Member> copyMembers = limitRangeUtil.copyByDtoList(
          Members, i);
      repository.saveAll(copyMembers);
    });​