4

This question is exactly the same as this one, which wasn't actually answered (that code only uses one thread). My code looks like this at the moment

CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
            foo.stream().parallel()
                    .forEach(bar -> {
                                //business logic
                            }
                    );
            return null;
        }, new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooBarStream"));

completableFuture.get();

yet only one thread is correctly traced. Using .parallelStream() or a LazyTraceExecutor directly instead of a TraceableExecutorService didn't help.

1 Answer 1

2

Seems to work thanks to this example. The snippet above becomes:

TraceableExecutorService executorService = new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooStream");
CompletableFuture.allOf(runnablesBusinessLogic(foo,executorService)).get();

where runnablesBusinessLogic is

private CompletableFuture<Void>[] runnablesBusinessLogic(List<FooBar> foo, ExecutorService executorService) {
    List<CompletableFuture<?>> futures = new ArrayList<>();
    for (FooBar f : foo) {
        futures.add(CompletableFuture.runAsync(() -> {
            businessLogic(f);
            return;
        }, executorService));
    }
    return futures.toArray(new CompletableFuture[futures.size()]);
}

If I understood the example (and the discussion behind the current status of the documentation) correctly, Sleuth cannot work with a ForkJoinPool (and so with parallel streams) automatically. The main idea to make it work is not to create a CompletableFuture and split it, but to create several futures (and join them).

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Not the answer you're looking for? Browse other questions tagged or ask your own question.