Java8 Stream의 parallel 처리

이번 글에서는 Java8 Stream의 parallel에 대해 살펴보도록 하겠습니다.

Stream API는 설계 시 실제 실행에 대한 부분을 추상화 시켜 각 Element에 대한 처리가 어떻게 실행되는지에 대해서는 프로그래머는 신경쓰지 않이도 된다. 즉, 병렬 처리 등도 쉽게 할 수 있게 설계되어 있다. 하지만 쉽다는 의미는 좋다, 효과적이다 라고 할 수 없기 때문에 Stream에서 병렬 처리의 원리를 알고 있어야 필요한 시점에 적절하게 "쉽다" 라는 장점을 활용할 수 있다.

fork_join_framework

(그림: https://javatechnocampus.wordpress.com/2015/10/03/544/)

일반적인 병렬 처리

일반적으로 Java에서 병렬 처리를 위해서는 ExecutorService를 이용한다. ExecutorService를 이용하지 않고 직접 개발하는 경우도 있지만 대부분은 ExecutorService를 이용하여 다음과 같이 병렬 처리 로직을 만들 수 있다. 필자의 경우 한번 손에 익은 것으로 많이 구현하는데 그냥 Thread와 Queue 등을 이용해서 만드는 방식을 선호하는 편이지만, 언어 차원에서 제공해주는 기능을 사용하는 것이 가장 안정적이고 코드의 가독성도 좋다고 할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ExecutorService executor = Executors.newFixedThreadPool(5);
List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < 10; i++) {
 final int index = i;
 futures.add(executor.submit(() -> {
   Thread.sleep(5000);
   return Thread.currentThread().getName() + ", index=" + index + ", ended at " + new Date();
 }));
}
for (Future<String> eachFuture: futures) {
 String result = eachFuture.get();
 System.out.println("Thread result: " + result);
}
executor.shutdown();

위 코드는 0 ~ 10까지의 item을 입력으로 받은 Thread가 어떤 업무를 수행하고(Thread.sleep) 결과를 출력하는 기능을 수행한다. 이 코드의 실행 결과는 다음과 같다.

1
2
3
4
5
6
7
8
9
10
Thread result: pool-1-thread-1, index=0, ended at Tue Jul 12 00:52:16 KST 2016
Thread result: pool-1-thread-2, index=1, ended at Tue Jul 12 00:52:16 KST 2016
Thread result: pool-1-thread-3, index=2, ended at Tue Jul 12 00:52:16 KST 2016
Thread result: pool-1-thread-4, index=3, ended at Tue Jul 12 00:52:16 KST 2016
Thread result: pool-1-thread-5, index=4, ended at Tue Jul 12 00:52:16 KST 2016
Thread result: pool-1-thread-4, index=5, ended at Tue Jul 12 00:52:21 KST 2016
Thread result: pool-1-thread-3, index=6, ended at Tue Jul 12 00:52:21 KST 2016
Thread result: pool-1-thread-1, index=7, ended at Tue Jul 12 00:52:21 KST 2016
Thread result: pool-1-thread-5, index=8, ended at Tue Jul 12 00:52:21 KST 2016
Thread result: pool-1-thread-2, index=9, ended at Tue Jul 12 00:52:21 KST 2016

Thread pool 갯수를 5개만 만들었기 때문에 5개는 동시에 수행되고 다시 5초 후(sleep을 5초 주었기 때문에)에 나머지 다섯개가 실행되는 것을 볼 수 있다.

Java8의 Stream을 이용한 병렬 연산

Stream을 이용하면 병렬 연산을 쉽고 간단하게 처리할 수 있다. 기존 Stream pipeline에 parallel() 이라는 signature method 만 추가해주면 된다.

1
2
3
4
5
6
IntStream.range(0, 10).parallel().forEach(index -> {
  System.out.println("Starting " + Thread.currentThread().getName() + ",    index=" + index + ", " + new Date());
  try {
    Thread.sleep(5000);
  } catch (InterruptedException e) { }
});

실제 환경에서 이런 코드는 거의 사용하지 않겠지만 Stream에서 Thread 가 어떻게 생성되는지 확인하기 위해 위와 같은 테스트 코드를 만들었다.  parallel() 메소드는 실제 이 메소드가 호출 되는 시점에 어떤 작업을 처리하는 것이 아니라 이 pipeline을 병렬로 처리하라고 하는 정보를 제공해주는 메소드이다.

코드의 실행 결과는 다음과 같다.

1
2
3
4
5
6
7
8
9
10
Starting ForkJoinPool.commonPool-worker-7, index=4, Tue Jul 12 11:02:52 KST 2016
Starting main, index=6, Tue Jul 12 11:02:52 KST 2016
Starting ForkJoinPool.commonPool-worker-6, index=5, Tue Jul 12 11:02:52 KST 2016
Starting ForkJoinPool.commonPool-worker-1, index=2, Tue Jul 12 11:02:52 KST 2016
Starting ForkJoinPool.commonPool-worker-3, index=1, Tue Jul 12 11:02:52 KST 2016
Starting ForkJoinPool.commonPool-worker-2, index=8, Tue Jul 12 11:02:52 KST 2016
Starting ForkJoinPool.commonPool-worker-5, index=7, Tue Jul 12 11:02:52 KST 2016
Starting ForkJoinPool.commonPool-worker-4, index=0, Tue Jul 12 11:02:52 KST 2016
Starting ForkJoinPool.commonPool-worker-2, index=9, Tue Jul 12 11:02:57 KST 2016
Starting ForkJoinPool.commonPool-worker-7, index=3, Tue Jul 12 11:02:57 KST 2016

결과 로그를 보면 8개의 Thread가 11:02:52초에 시작되었고, 5초후(프로그램 내에 sleep을 5초로 했음)에 8개 Thread가 종료되면서 나머지 2개가 실행되는 것을 볼 수 있다.  처음에 실행되었던 8개의 Thread ID를 보면 하나는 Main Thread가 실행하였고 나머지 7개는 ForkJoinPool에서 생성된 worker 7개 생성되었다. 즉 Stream의 parallel은 미리 만들어져 있는 ForkJoinPool을 이용해서 worker를 생성하는 것을 알 수 있다. ForkJoinPool은 ForkJoinTask를 실행하는 ExecutorService 라고 할 수 있다.

Pool의 크기 제어는?

그러면 앞의 ExecutorService를 사용하면서 설정한 동시에 실행 가능한 Thread의 갯수는(newFixedThreadPool(5)) Stream의 parallel에서는 어떻게 결정되는 것일까?  테스트 환경에서는 Pool의 갯수가 자동으로 8로 설정되었는데, ForkJoinPool의 commonPool은 실행되는 장비의 CPU 갯수 만큼 Pool의 사이즈를 지정한다. Java doc의 설명에는 Runtime.availableProcessors() 를 호출한 결과값을 사용하고 있다고 한다. 필자의 테스트 환경인 맥북 프로의 경우 물리적으로는 4 Core이지만 논리적으로는 8Core로 표시되기 때문에 Pool의 크기가 8로 설정되었다.

개발자가 임의의로 Pool의 크기를 조절하는 방법은 두가지가 있는데 첫 번째 방법은 다음과 같이 "java.util.concurrent.ForkJoinPool.common.parallelism" Property 값을 설정하는 방법이다.

1
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")

이 방법은 현재 실행되는 프로세스의 모든  ForkJoinPool의 commonPool에 영향을 미칠 수 있기 때문에 가급적 사용하지 않는 것을 권장한다.

두번째 방법은 기본 commonPool을 사용하지 않고 개발자가 정의한 ForkJoinPool을 사용하는 방법이다. 다음 예제 코드와 같이 사용할 수 있다.

1
2
3
4
5
6
7
8
9
10
ForkJoinPool myPool = new ForkJoinPool(5);
myPool.submit(() -> {
  IntStream.range(0, 10).parallel().forEach(index -> {
    System.out.println("Starting " + Thread.currentThread().getName() + ", index=" + index + ", " + new Date());
    try {
      Thread.sleep(5000);
    } catch (InterruptedException e) {
    }
  });
}).get();

ForkJoinPool의 생성자에 Pool의 갯수를 전달하면 된다. 실행 결과는 아래와 같다. ForkJoinPool 생성자에서 설정한 값 만큼 5개의 Thread(worker 1부터 5까지)만 생성되었고 5초 sleep 후에도 새로운 Thread가 만들어지지 않고 기존에 만들어진 Thread가 나머지 element를 처리하였다.

1
2
3
4
5
6
7
8
9
10
Starting ForkJoinPool-1-worker-2, index=2, Tue Jul 12 11:48:52 KST 2016
Starting ForkJoinPool-1-worker-4, index=1, Tue Jul 12 11:48:52 KST 2016
Starting ForkJoinPool-1-worker-5, index=5, Tue Jul 12 11:48:52 KST 2016
Starting ForkJoinPool-1-worker-1, index=6, Tue Jul 12 11:48:52 KST 2016
Starting ForkJoinPool-1-worker-3, index=8, Tue Jul 12 11:48:52 KST 2016
Starting ForkJoinPool-1-worker-2, index=4, Tue Jul 12 11:48:57 KST 2016
Starting ForkJoinPool-1-worker-4, index=0, Tue Jul 12 11:48:57 KST 2016
Starting ForkJoinPool-1-worker-5, index=7, Tue Jul 12 11:48:57 KST 2016
Starting ForkJoinPool-1-worker-1, index=3, Tue Jul 12 11:48:57 KST 2016
Starting ForkJoinPool-1-worker-3, index=9, Tue Jul 12 11:48:57 KST 2016

ForkJoin 프레임워크

ForkJoin 프레임워크은 자바에서 병렬 처리를 지원하는 프레임워크로 새롭게 소개된 개념이 아니라 Java 7 부터 포함된 개념이다. 하나의 작업을 여러개의 작은 Task로 split 한 다음 이 Task를 fork 하여(물리적인 프로세스로 fork 하는 것이 아니라 실제로는 Thread) 병렬로 처리하고 각 Task의 수행된 결과를 병합하기 위해 join 단계를 실행하는 프레임워크이다.

fork_join_framework2

(이미지: https://javatechnocampus.wordpress.com/2015/10/03/544/)

ForkJoin 프레임워크의 ForkJoinTask는  아래 유사 코드에서 설명하는 것과 같이 처리 대상을 두 개로 분할하여 첫번째 Task는 fork 시켜 async로 실행 시킨다. 두번째 Task는 compute() 메소드를 호출하여(개념적으로 보면 재귀 호출과 유사) Task를 계속 분할, 생성, 실행한다. 더 이상 분할이 되지 않는 최소 단위까지 이런 작업을 반복하며, 분할이 되지 않는 최소 단위가 되었을 때 처리 대상이 되는 데이터를 이용하여 순차적 처리를 수행한다.

다음 설명과 예제 코드는 "자바 8 인 액션, 한빛미디어" (http://www.hanbit.co.kr/store/books/look.php?p_code=B1999551123) 의 내용을 인용하였다(저작권의 문제가 있으면 알려주시면 바로 삭제 하겠습니다.)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public T compute() {
  if (더이상 분할 필요 없음) {
    //순차적으로 실제 계산 처리
  } else {
    // left 절반 분할
    ForkJoinTask leftTask = new ForkJoinTask(처리 대상 데이터의 절반);
    // Thread pool에 추가하여 실행 요청(실제 실행될 때 다시 compute 호출됨, 그때 필요하면 다시 분할이 됨)
    leftTask.fork();
    // 나머지 절반 분할
    ForkJoinTask rightTask = new ForkJoinTask(나머지 절반);     
    // computer 호출(computer 내부에서 필요하면 다시 분할, 즉 Recursive 호출 개념)  
    T rightResult = rightrightTask.compute();
    T leftResult = leftTask.join(); //leftTask는 비동기 실행되어 결과는 join을 통해서 받음
    return mergeResult(leftResult, rightResult);  //left, right 결과를 merge 하여 반환
  }
}

이 가상 코드의 개념으로 실제 실행되는 코드를 만들면 다음과 같다. 예제는 정해진 int[]의 모든 element의 sum을 계산하는 프로그램으로 int[]를 여러개의 작은 단위로 분할하여 병렬로 계산을 수행한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ForkJoinTest {
  static class ForkJoinSumTask extends RecursiveTask<Integer> {
    static final int SPLIT_THRESHOLD = 10;
    int[] values;
    int startPos;
    int endPos;
    public ForkJoinSumTask(int[] values) {
      this(values, 0, values.length);
    }
    public ForkJoinSumTask(int[] values, int startPos, int endPos) {
      this.values = values;
      this.startPos = startPos;
      this.endPos = endPos;
    }
    @Override
    protected Integer compute() {
      int computeTargetValueLength = endPos - startPos;
      if (computeTargetValueLength <= SPLIT_THRESHOLD) {
        return sumValuesSequentially();
      }
      int splitPosition = startPos + computeTargetValueLength/2;
      ForkJoinSumTask left = new ForkJoinSumTask(values, startPos, splitPosition);
      left.fork();
      ForkJoinSumTask right = new ForkJoinSumTask(values, splitPosition, endPos);
      int rightTaskResult = right.compute();
      int leftTaskResult = left.join();
      return leftTaskResult + rightTaskResult;
    }
    private int sumValuesSequentially() {
      int sum = Arrays.stream(values, startPos, endPos).reduce(0, (a, b) -> a + b);
      System.out.println(Thread.currentThread().getName() + ", startPos=" + startPos + ", endPos=" + endPos + ", sum=" + sum);
      return sum;
    }
  }
  public static void main (String[] args) throws Exception {
    int[] values = IntStream.rangeClosed(1, 80).toArray();
    ForkJoinTask<Integer> task = new ForkJoinSumTask(values);
    int totalSum = new ForkJoinPool().commonPool().invoke(task);
    System.out.println("Total sum: " + totalSum);
  }
}

위 코드에서 자세히 봐야 할 부분은 compute() 메소드이다. 현재의 task가 처리해야 하는 대상 데이터의 갯수가 threshold 값보다 작으면 순차적 계산을 실행하고 결과를 반환한다. 그렇지 않으면 values를 균등하게 두 부분으로 분리하여 다시 Task를 생성한다. 앞의 가상 코드에서 설명한 내용과 위의 코드 구현을 비교해 보고, 처리 결과를 확인하면 어떻게 Task가 분할되고 병렬 처리되는지 확인할 수 있다. 다음은 위 코드의 처리 결과이다. 8개의 Task가 생성되었고 각 Task는 10개의 element 씩 처리하였으며 8개의 ForkJoin Worker에 의해 병렬 처리 되었다.

1
2
3
4
5
6
7
8
9
ForkJoinPool.commonPool-worker-5, startPos=0, endPos=10, sum=55
main, startPos=70, endPos=80, sum=755
ForkJoinPool.commonPool-worker-3, startPos=20, endPos=30, sum=255
ForkJoinPool.commonPool-worker-4, startPos=50, endPos=60, sum=555
ForkJoinPool.commonPool-worker-2, startPos=10, endPos=20, sum=155
ForkJoinPool.commonPool-worker-7, startPos=40, endPos=50, sum=455
ForkJoinPool.commonPool-worker-6, startPos=60, endPos=70, sum=655
ForkJoinPool.commonPool-worker-1, startPos=30, endPos=40, sum=355
Total sum: 3240

Stream의 paralle과 ForkJoin 프레임워크의 관계

이제 Stram의 parallel() 호출에 의한 병렬 처리와 ForkJoin 프레임워크와의 관계를 살펴보자. 다음 코드는 IntStream이 parallel()로 처리될 때 어떤 call stack을 가지고 있는지 확인하는 코드와 실제 call stack 이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
IntStream.range(0, 1).parallel().forEach(i -> {
  Thread.dumpStack();
});
java.lang.Exception: Stack trace
	at java.lang.Thread.dumpStack(Thread.java:1329)
	at com.jobplanet.s2graph.ParallelTest.lambda$main$0(ParallelTest.java:58)
	at java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:205)
	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
	at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
	at java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:189)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.IntPipeline.forEach(IntPipeline.java:404)
	at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:560)
	at com.jobplanet.s2graph.ParallelTest.main(ParallelTest.java:57)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

Stack 중에 앞에서 설명한 ForkJoin 프레임워크와 유사한 내용이 보이는데 바로 compute() 메소드 호출 부분이다. compute() 메소드는 ForEachOp$ForEachTask 클래스의 멤버 변수인데 실제 JDK 소스 코드는 다음과 같이 구현되어 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void compute() {
  Spliterator<S> rightSplit = spliterator, leftSplit;
  long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
  if ((sizeThreshold = targetSize) == 0L)
      targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
  boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
  boolean forkRight = false;
  Sink<S> taskSink = sink;
  ForEachTask<S, T> task = this;
  while (!isShortCircuit || !taskSink.cancellationRequested()) {
      if (sizeEstimate <= sizeThreshold ||
          (leftSplit = rightSplit.trySplit()) == null) {
          task.helper.copyInto(taskSink, rightSplit);
          break;
      }
      ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
      task.addToPendingCount(1);
      ForEachTask<S, T> taskToFork;
      if (forkRight) {
          forkRight = false;
          rightSplit = leftSplit;
          taskToFork = task;
          task = leftTask;
      }
      else {
          forkRight = true;
          taskToFork = leftTask;
      }
      taskToFork.fork();
      sizeEstimate = rightSplit.estimateSize();
  }
  task.spliterator = null;
  task.propagateCompletion();
}

중간에 ShortCircuit 등은 내용은 findFirst() 등과 같이 Stream 처리 도중 특정 조건이 되면 Stream을 중단하기 위한 등의 처리이기 때문에 병렬 처리를 확인하기 위해서는 무시해도 된다. 먼저 threshHold 값을 구하고 Spliterator의 trySplit() 메소드를 이용하여 Stream를 분할한다. 앞의 int[] sum 예제에서는 int[] 값이라 크기를 알기 때문에 쉽게 분할이 가능하지만, Stream의 경우 element의 전체 size 를 알수 없는 경우도 많기 때문에(File 에서 데이터를 읽는 경우 등) 단순히 분할하는 것이 쉽지 않다. 이것을 위한 기능이 Spliterator의 trySplit()  메소드이다. Spliterator에 대해서는 뒤에서 설명한다. 일단 split 하여 Task를 만든 후 fork()를 호출하고 있다. 이후 내용에 대해서는 자세히 분석은 하지 않았지만 right task에 대해 compute() 를 다시 호출하는 부분은 fork() 내부에서 처리할 것으로 예상되고 join()은 task.propagationCompletation() 에서 처리할 것으로 예상된다.

 Spliterator

Spliterator는 클래스명으로부터 그 기능을 유추해 볼 수 있는데 Split 되는 Iterator 이다. 이 클래스 명을 보자마자 가장 먼저 떠오는 것은 바로 Hadoop MapReduce 프레임워크의 InputFormat이다. Hadoop의 InputFormat은 병렬 처리를 위해 두 가지 중요한 일을 수행한다. 첫 번째 역할은 분산/병렬의 처리의 단위가 되는 정보를 제공한다. 이 기능은 getSplit() 메소드를 통해 구현된다. 두 번째 역할은 입력 데이터를 읽어 하나의 레코드로 만들어 map() 메소드의 입력 값으로 전달된다. 이 기능은 InputFormat의 getRecordReader() 메소드에서 반환되는 Reader  객체에 의해 수행된다.

Stream parallel 의 Spliterator도 Hadoop의 InputFormat과 거의 유사한 기능을 수행한다고 할 수 있다. trySplit() 메소드에서는 Stream을 병렬 처리할 수 있는 단위로 분할한다. tryAdvance() 메소드에서는 각 element를 소비하는 기능을 수행한다. 소비한다는 의미는 실제 Stream으로 부터 데이터를 읽어서 Consumer를 호출한다는 의미이다. 다음 코드는 Java 8 에서 기본적으로 제공하는 ArraySpliterator의 구현 중 trySplit(), tryAdvance()의 구현 부분이다. 구현 내용을 보면 앞에서 설명한 ForkJoinSumTask에서 Task를 분할하는 코드와 비슷하다는 것을 알 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public Spliterator<T> trySplit() {
    int lo = index, mid = (lo + fence) >>> 1;
    return (lo >= mid)
           ? null
           : new ArraySpliterator<>(array, lo, index = mid, characteristics);
}
@Override
public boolean tryAdvance(Consumer<? super T> action) {
    if (action == null)
        throw new NullPointerException();
    if (index >= 0 && index < fence) {
        @SuppressWarnings("unchecked") T e = (T) array[index++];
        action.accept(e);
        return true;
    }
    return false;
}

tryAdvance() 에서는 array에 저장된 하나의 값을 가져와 action의 파라미터로 전달하여 action을 실행한다. action이 System.out::println 인 경우 실제로는 System.out.println(array[index++]); 이 실행되는 것이다.

좀 더 실질적인 예제를 위해 사용자 정의 Spliterator를 구현해 보자. 파일에서 라인을 읽어 데이터를 특정 저장소(필자의 경우 S2Graph)의 API를 호출하여 저장을 하는 프로그램이다. 한번 호출 시 한 라인씩 저장하면 성능이 나오지 않기 때문에 여러 라인의 데이터를 묶어 한번에 저장하는 기능이 필요하다. 이를 위해 Stream의 parallel 기능을 이용하는 경우를 고려해보자. 단순히 Stream<String> stream = Files.lines(file) 과 같이 한 라인씩 읽는 Stream을 생성하여 처리할 경우 묶음으로 처리할 수 없게 된다. 원하는 기능을 위해 다음과 같이 구현하였다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public static void main (String[] args) throws Exception {
  int batchSize = 1_000;
  int taskLineNum = 100_000;
  try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(dataFile)))) {
    MultiLineSpliterator spliterator = new MultiLineSpliterator(reader, batchSize, taskLineNum);
    Stream<List<String>> stream = StreamSupport.stream(spliterator, false);
    stream.parallel().forEach(S2GraphUtil::insertVertex);
  } catch (FileNotFoundException e) {
    e.printStackTrace();
  } catch (IOException e) {
    e.printStackTrace();
  }
}      
class MultiLineSpliterator implements Spliterator<List<String>> {
    List<String> lines;
    int lineBuffSize;
    int batchSize;
    BufferedReader reader;
    Iterator<String> it;
    public MultiLineSpliterator(BufferedReader reader, int lineBuffSize, int batchSize) {
      this.reader = reader;
      this.lineBuffSize = lineBuffSize;
      this.batchSize = batchSize;
      this.lines = new ArrayList<>(lineBuffSize);
    }
    @Override
    public boolean tryAdvance(Consumer<? super List<String>> action) {
      if (it == null) {
        it = lines.iterator();
      }
      List<String> batchList = new ArrayList<>(batchSize);
      int count = 0;
      while (it.hasNext()) {
        batchList.add(it.next());
        count++;
        if (count >= batchSize) break;
      }
      action.accept(batchList);
      return it.hasNext();
    }
    @Override
    public Spliterator<List<String>> trySplit() {
      try {
        String line;
        while ((line = reader.readLine()) != null) {
          lines.add(line);
          if (lines.size() >= lineBuffSize) {
            return new MultiLineSpliterator(reader, lineBuffSize, batchSize);
          }
        }
        if (lines.isEmpty()) {
          return null;
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      return null;
    }
    @Override
    public long estimateSize() {
      return lines.isEmpty() ? Long.MAX_VALUE : lines.size();
    }
    @Override
    public int characteristics() {
      return IMMUTABLE;
    }
}

위 코드에 대한 상세 설명은 생략한다. 핵심은  main()  메소드에서는 Spliterator를 이용하여 Stream을 생성하였고, MultiLineSpliterator의 trySplit() 에서 정해진 라인 수 만큼 element를 모아서 반환한다.

위 코드를 조금 더 개선하면 BufferedReader를 MultiLineSpliterator로 전달하는 것이 아니라 Files.lines()를 이용하여 생성한 Stream을 전달하는 것이 아무래도 조금 더 Stream 스러운 코드라고 할 수 있을 것 같은데 아직 완성하지는 못했다. 이 부분에 대해서는 "Java 8 인 액션" 이나 다음 StackOverFlow에 있는 내용을 참고해서 진행할 예정이다.

http://stackoverflow.com/questions/25408350/is-there-a-good-way-to-extract-chunks-of-data-from-a-java-8-stream

중간 연산에 대한 제어

일반적인 상황에서는 다음과 같이 Stream API를 사용하게 되는 경우가 많다. 아래 예제는 N개의 라인에서 word를 추출하고, 중복된 word는 제거하여 하나만 출력하면서 정렬 시키는 예제이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
List<String> phrases = Arrays.asList(
   "sporadic perjury",
   "confounded skimming",
   "incumbent jailer",
   "confounded jailer");
List<String> uniqueWords = phrases
   .stream()
   .parallel()
   .flatMap(phrase -> Stream.of(phrase.split(" ")))
   .distinct()
   .sorted()
   .collect(Collectors.toList());
System.out.println("Unique words: " + uniqueWords);

이와 같은 Stream 파이프라인에서 distinct, sorted 등은 어떻게 병렬로 처리되는 것일까? distinct(), sorted()와 같은 중간 단계 연산들(intermediate operation) 중 일부 연산자들은 연산자 내부에 상태(State) 정보를 가지고 있다. 모든 Worker 들은 독립적으로 다른 Thread에 의해 실행도지만 처리 결과를 이런 상태 정보에 저장하고, distinct() 연산자는 이 정보를 이용하여 정해진 기능을 수행한다. 즉, 내부적으로 어떤 공용 변수를 만들어 놓고 각 worker 들이 이 변수에 접근할 경우 동기화 작업(synchronized) 등을 통해 변수를 안전하게 유지하면서 처리하고 있다. 내부적으로는 기존 Thread 작업 시 개발자가 해줘야 했던 동기화 등의 작업을 모두 수행하고 있는 것이다. 이런 이유 때문에 parallel을 잘못 사용할 경우 순차적 실행보다 더 느릴 수도 있다.

Stream parallel을 실제 환경에서는?

Stream의 parallel에 대해 여러가지 논쟁과 토론이 많다. 주로 내용들은 Deadlock 상황이나 Thread가 의도하지 않게 많이 만들어 질 수 있다라는 내용이다. 주로 원인은 ForkJoinPool을 사용하면서 발생하는 문제이다. 여러 블로그나 문서를 참조하면 다음과 같은 parallel 사용 시 주의 사항이 많이 언급되고 있다.

  • parallel stream 내부에서 다시 parallel stream 사용할 경우 synchronized 키워드는 deadlock 을 발생시킬 수 있다.
  • 특정 Container 내부에서 사용하는 경우에는 parallel은 신중하게 사용해야 하며, Container가 default pool을 어떻게 처리하는지 정확하게 모르는 경우에는 Default pool은 절대 사용하지 마라.
  • Java EE Container에서는 Stream의 parallel을 사용하지 마라.

아직 많은 내용을 살펴 보지는 못했지만 현재까지 살펴본 내용으로 판단해보면 필자의 의견은 다음과 같다.

  • 간단하게 독립된 프로그램으로 아주 큰 파일 또는 데이터를 가공하는 작업을 할때는 parallel을 이용하는 것도 쉽게 개발할 수 있는 하나의 방법이라고 생각한다.
  • 하지만 데몬 프로그램이나 WAS에서 동작하는 기능 등에서는 권장하지 않는다.

글을 마치며

처음에는 파일에서 데이터를 읽어 S2Graph에 저장하는 간단한 기능을 구현하려고 하였는데 Java 8 스타일로 만들어 보자는 생각을 했다. 저장해야할 데이터가 많아 병렬 처리 하려고 parallel을 이용하려고 조금 생각해보니 multi line에 대한 처리가 필요했서 Stream의 parallel을 보기 시작한 것이 결국은 이 글을 작성하는데까지 왔다. 어쨌든 Java 8의 내부 동작 원리 중 일부를 조금 자세히 보았고 이전 Java 와는 완전히 다른 언어라는 생각을 하게 되었으며, 새로운 개념을 잘 사용만 하면 좋은 도구가 될 수 있다고 생각한다. 다른 언어를 얕게 아는 것보다 하나의 언어를 깊게 아는 것이 더 중요하다고 생각하기에 오늘도 삽질과 디버거의 늪에 빠져 있다.

Stream에도 익숙하지 않은 상태에서 parallel 까지 확인을 하려다 보니 글 중간 일부 오류가 있을 것으로 예상됩니다. 오류가 있거나 맞지 않는 내용이 있으면 바로 알려 주시면 반영하도록 하겠습니다.


Popit은 페이스북 댓글만 사용하고 있습니다. 페이스북 로그인 후 글을 보시면 댓글이 나타납니다.