Parallel Collection Processing: Leveraging Batching (2/3)


In this article, we’ll see how to increase the performance and cap maximum parallelism by introducing batching to our home-made parallel streams.

This article is a part of the series about parallel collection processing in Java without parallel streams:

Parallel Collection Processing: Without Parallel Streams (1/3)

The Problem

At the end of the previous article, we ended up with fully functional home-made async parallel streams:

CompletableFuture<List<Integer>> results = integers.stream()
  .map(i -> CompletableFuture.supplyAsync(() -> process(i), executor))
  .collect(collectingAndThen(toList(), l -> allOfOrException(l)));
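For reference, allOfOrException comes from the first part of the series. It converts a collection of futures into a future of a list that completes with all results, or fails fast as soon as any of them fails. A rough sketch of what such a method might look like (not necessarily the exact part 1 implementation):

static <T> CompletableFuture<List<T>> allOfOrException(
  Collection<CompletableFuture<T>> futures) {

    // complete with all results once every future completes...
    CompletableFuture<List<T>> result = CompletableFuture
      .allOf(futures.toArray(new CompletableFuture[0]))
      .thenApply(__ -> futures.stream()
        .map(CompletableFuture::join)
        .collect(toList()));

    // ...but short-circuit and propagate the first exception without waiting for the rest
    for (CompletableFuture<?> future : futures) {
        future.whenComplete((value, exception) -> {
            if (exception != null) {
                result.completeExceptionally(exception);
            }
        });
    }

    return result;
}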

This implementation features some interesting characteristics.

Every element from the source stream is represented as a separate entry in your thread pool’s work queue.

The good thing about it is that threads compete for each item separately, which results in a primitive form of work-stealing. If some threads finish early, they continue picking up items from the work queue until it's empty.

At the same time, the bad thing is that it brings a performance penalty (due to thread contention) in situations where work-stealing doesn't add significant value.

Is this something to be concerned about? Let’s see the actual numbers.

Benchmarks

To measure the performance overhead, a collection with 1000 elements was processed using an identity function with 1, 10, 100, and 1000 threads, respectively. Minimizing the processing time lets us focus on the infrastructure overhead itself:

private static final List<Integer> source = IntStream
  .range(0, 1000)
  .boxed()
  .collect(toList());

@Benchmark
public List<Integer> no_batching(BenchmarkState state) {
    return ParallelStreams.inParallel(
      source, i -> i, state.executor).join();
}
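The benchmark relies on a BenchmarkState class providing the executor. Its exact shape isn't shown here, but a minimal JMH state class along these lines would do (the parallelism field and parameter values are assumptions, not taken from the original benchmarks):

@State(Scope.Benchmark)
public static class BenchmarkState {

    // number of threads in the pool, injected by JMH for each run
    @Param({"1", "10", "100", "1000"})
    public int parallelism;

    public ExecutorService executor;

    @Setup(Level.Trial)
    public void setup() {
        executor = Executors.newFixedThreadPool(parallelism);
    }

    @TearDown(Level.Trial)
    public void tearDown() {
        executor.shutdownNow();
    }
}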

Before we have a look at the results, try to run a thought experiment and then compare your intuition with actual numbers.

How much faster will it be to process the collection using 1000 threads instead of one?

Results


Threads    Mode  Cnt  Score      Error  Units
1          avgt  5    0.226 ±    0.002  ms/op
10         avgt  5    0.278 ±    0.005  ms/op
100        avgt  5    0.317 ±    0.005  ms/op
1000       avgt  5    0.905 ±    0.097  ms/op

Intel i7-4980HQ (8) @ 2.80GHz

Surprised? As the number of threads increases, synchronization costs become more and more significant, making the 1000-thread case roughly four times slower than single-threaded processing!

What if we could decrease the overhead (thread contention) by letting individual threads compete for batches instead of individual items?

Introducing Batching

Luckily, introducing batching is less complicated than it sounds.

It’s a good idea not to burden consumers of our API with implementation details, so the API remains essentially unchanged. However, we need one extra parameter to control the number of batches to create (which we would need to introduce later on anyway):

static <T, R> CompletableFuture<List<R>> inParallelBatching(
  List<T> source, Function<T, R> mapper, Executor executor, int batches) {
    return ...
}

In order to introduce batching, we need to solve three distinct problems:

  1. How to partition a source collection into a collection of batches
  2. How to write an adapter for mapping functions so that it processes a collection of elements
  3. How to combine partial results into a single collection

To partition a list of elements, we could reuse the splitting mechanism already provided by Spliterators. In our case, however, we know that the data structure we need to partition is an ArrayList, so we can write a dedicated partitioning iterator that splits a list into N as-equal-as-possible parts:

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

import static java.util.Spliterator.ORDERED;
import static java.util.Spliterators.spliterator;
import static java.util.stream.StreamSupport.stream;

// splits a list into N consecutive, as-equal-as-possible sublists
final class BatchingStream<T> implements Iterator<List<T>> {

    private final List<T> source;
    private final int size;

    private int chunks;
    private int chunkSize;
    private int leftElements;
    private int i;

    private BatchingStream(List<T> list, int numberOfParts) {
        source = list;
        size = list.size();
        chunks = numberOfParts;
        chunkSize = (int) Math.ceil(((double) size) / numberOfParts);
        leftElements = size;
    }

    private static <T> Iterator<List<T>> from(
      List<T> source, int chunks) {
        return new BatchingStream<>(source, chunks);
    }

    static <T> Stream<List<T>> partitioned(
          List<T> list, int numberOfParts) {

        int size = list.size();

        if (size == numberOfParts) {
            return list.stream().map(Collections::singletonList);
        } else if (numberOfParts == 1) {
            return Stream.of(list);
        } else if (size == 0) {
            return Stream.empty();
        } else {
            return stream(spliterator(from(
              list, numberOfParts), numberOfParts, ORDERED), false);
        }
    }

    @Override
    public boolean hasNext() {
        return i < size && chunks != 0;
    }

    @Override
    public List<T> next() {
        List<T> batch = source.subList(i, i + chunkSize);
        i = i + chunkSize;
        leftElements = leftElements - chunkSize;
        // spread the remaining elements evenly across the remaining chunks
        chunkSize = (int) Math.ceil(((double) leftElements) / --chunks);
        return batch;
    }
}

Implementation considerations of the above will be covered in a separate article.
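As a quick sanity check (a hypothetical snippet, not part of the original article), here is how a list of seven elements gets split into three as-equal-as-possible parts:

List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7);

BatchingStream.partitioned(numbers, 3)
  .forEach(System.out::println);

// [1, 2, 3]
// [4, 5]
// [6, 7]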

To adapt the mapping function, we need an adapter that converts a Function<T, R> into a Function<List<T>, List<R>>, which is fairly straightforward:

private static <T, R> Function<List<T>, List<R>> batching(
  Function<T, R> mapper) {
    return batch -> {
        List<R> list = new ArrayList<>(batch.size());
        for (T t : batch) {
            list.add(mapper.apply(t));
        }
        return list;
    };
}
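For example (again, a hypothetical snippet), applying a batched doubling function to a batch of three elements:

Function<Integer, Integer> doubler = i -> i * 2;

List<Integer> doubled = batching(doubler).apply(List.of(1, 2, 3));

System.out.println(doubled); // [2, 4, 6]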

If we combine the above, we get something like:

static <T, R> CompletableFuture<List<R>> inParallelBatching(
  List<T> source, Function<T, R> mapper, Executor executor, int batches) {

    return BatchingStream.partitioned(source, batches)
      .map(batch -> supplyAsync(
        () -> batching(mapper).apply(batch), executor))
      .collect(collectingAndThen(toList(), l -> allOfOrException(l)))
      ...
}

And the last thing left to do is to flatten the result (CompletableFuture<List<List<R>>>) into a single list, which can be achieved using Stream#flatMap or a simple loop. In this case, I chose a loop to avoid the extra overhead of the Stream API.

So the final result is:

static <T, R> CompletableFuture<List<R>> inParallelBatching(
  List<T> source, Function<T, R> mapper, Executor executor, int batches) {

    return BatchingStream.partitioned(source, batches)
      .map(batch -> supplyAsync(
        () -> batching(mapper).apply(batch), executor))
      .collect(collectingAndThen(toList(), l -> allOfOrException(l)))
      .thenApply(list -> {
          List<R> result = new ArrayList<>(source.size());
          for (List<R> rs : list) {
              result.addAll(rs);
          }
          return result;
      });
}
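A quick usage sketch (hypothetical, assuming the method lives next to inParallel in the ParallelStreams class from part 1, and that allOfOrException preserves encounter order):

ExecutorService executor = Executors.newFixedThreadPool(4);

CompletableFuture<List<Integer>> results = ParallelStreams.inParallelBatching(
  List.of(1, 2, 3, 4, 5, 6), i -> i * 2, executor, 2);

System.out.println(results.join()); // [2, 4, 6, 8, 10, 12]

executor.shutdown();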

Benchmarks

Finally, we can compare the results with our starting point! Keep in mind that in our benchmarks the number of batches is equal to the number of threads.
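The corresponding benchmark isn't shown in the article; a sketch of what it could look like (state.parallelism is the assumed thread/batch count from the state class sketched earlier):

@Benchmark
public List<Integer> with_batching(BenchmarkState state) {
    return ParallelStreams.inParallelBatching(
      source, i -> i, state.executor, state.parallelism).join();
}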

Again, before we have a look at the results, try to run a thought experiment and then compare your intuition with actual numbers.


Benchmark      Threads Mode Cnt Score   Error Units
----
no_batching    1       avgt 5   0.226 ± 0.002 ms/op
no_batching    10      avgt 5   0.278 ± 0.005 ms/op
no_batching    100     avgt 5   0.317 ± 0.005 ms/op
no_batching    1000    avgt 5   0.905 ± 0.097 ms/op
with_batching  1       avgt 5   0.015 ± 0.001 ms/op
with_batching  10      avgt 5   0.020 ± 0.001 ms/op
with_batching  100     avgt 5   0.071 ± 0.028 ms/op
with_batching  1000    avgt 5   0.985 ± 0.054 ms/op

Intel i7-4980HQ (8) @ 2.80GHz

The result is really impressive. As you can see, introducing batching made it possible to decrease the overhead in almost all cases.

In the best-case scenario (1 thread), the overhead was roughly 15 times lower (0.226 vs 0.015 ms/op), while in the worst-case scenario (1000 threads), batching actually performed slightly worse than no batching at all. That shouldn't be a surprise: 1000 threads processing 1000 elements results in 1000 single-element batches, so we pay for the partitioning without gaining anything.

A great success, isn't it?

Let me recall the warning you get each time you run JMH:

REMEMBER: The numbers below are just data.
To gain reusable insights, you need to follow up on why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial experiments, perform baseline and negative tests that provide experimental control, make sure the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

The benchmarks we created are telling the truth, but not necessarily the truth we care about (remember the Answer to the Ultimate Question of Life, the Universe, and Everything?).

The relative performance improvement is impressive, but what we really care about is whether this improvement is significant enough for our use cases. Our benchmark was designed to show the relative difference in overhead, which doesn't answer that question!

In our case, we care more about absolute values than relative ones, so let's have a look at the time we actually saved in each case:

Benchmark      Threads Diff    Units
----
with_batching  1       + ~211  µs/op
with_batching  10      + ~258  µs/op
with_batching  100     + ~246  µs/op
with_batching  1000    - ~80   µs/op

Intel i7-4980HQ (8) @ 2.80GHz

As you can see, at best we saved around 258 µs per operation.

Depending on your use case, this may or may not be a significant difference. But batching has other advantages besides higher throughput.

Other Considerations

Besides the throughput increase, batching allows you to control the maximum parallelism level.

If you decide to apply the processing in N batches, it also implies that not more than N items will be processed in parallel (assuming that the parallelized action doesn't orchestrate more parallel processing itself!).

Naturally, you can implement parallelism limits for the non-batching example, but it becomes non-trivial if you want to keep it asynchronous, and requires extra resources (most likely an extra dispatcher thread guarded by a semaphore).
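For illustration only, here is a rough sketch (not from the article) of the simplest non-batching approach, a Semaphore guarding submissions; note that acquire() blocks the submitting thread, which is exactly why keeping the whole thing asynchronous requires the extra machinery mentioned above:

// caps in-flight tasks at maxParallelism without batching;
// the trade-off: submission is no longer fully asynchronous
static <T, R> CompletableFuture<List<R>> inParallelLimited(
  List<T> source, Function<T, R> mapper, Executor executor, int maxParallelism) {

    Semaphore semaphore = new Semaphore(maxParallelism);

    return source.stream()
      .map(item -> {
          semaphore.acquireUninterruptibly(); // wait for a free slot
          return CompletableFuture
            .supplyAsync(() -> mapper.apply(item), executor)
            .whenComplete((result, exception) -> semaphore.release());
      })
      .collect(collectingAndThen(toList(), l -> allOfOrException(l)));
}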

Besides the parallelism level control, batching can give you more predictable latencies when reusing a heavily-shared thread pool.

If no batching is involved, each item becomes a separate entry in the thread pool's work queue, which might result in situations where 99% of our work is already done, but the remaining items still have to wait behind other tasks in the queue!

In contrast, once a batch gets picked up, all the items inside are processed in one go by a single thread.

Another thing to consider is that properly-tuned thread pools usually feature a bounded work queue, whose size (among other parameters) needs to be tuned in relation to the expected arrival rate of new tasks.

Batching always generates N tasks, where N corresponds to the number of batches and not the total number of items in the source.

Conclusion

In the next article, we'll see how to achieve the same using parallel-collectors.

Code samples can be found on GitHub.

