67

Say I have a task like:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

What is the easiest way to parallelize each compute() (assuming they are already parallelizable)?

I do not need an answer that strictly matches the code above, just a general answer. But if you need more info: my tasks are IO bound, this is for a Spring Web application, and the tasks are going to be executed in an HTTP request.

  • Should the second line be Result result = compute(object);? Commented Oct 10, 2015 at 18:31

10 Answers

88

I would recommend taking a look at ExecutorService.

In particular, something like this:

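ExecutorService exec = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects) {
    // wrap each compute() call in a Callable so the executor can run it
    Callable<Result> c = new Callable<Result>() {
        @Override
        public Result call() throws Exception {
            return compute(object);
        }
    };
    tasks.add(c);
}
// invokeAll() blocks until every task is done; the futures are in the same order as tasks
List<Future<Result>> results = exec.invokeAll(tasks);
exec.shutdown(); // let the pool's threads exit once the work is done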

Note that newCachedThreadPool can be a problem if objects is a big list: a cached thread pool may create a thread per task! You may want to use newFixedThreadPool(n) instead, where n is something reasonable (such as the number of cores you have, assuming compute() is CPU bound).

Here's full code that actually runs:

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceExample {
    private static final Random PRNG = new Random();

    private static class Result {
        private final int wait;
        public Result(int code) {
            this.wait = code;
        }
    }

    public static Result compute(Object obj) throws InterruptedException {
        int wait = PRNG.nextInt(3000);
        Thread.sleep(wait);
        return new Result(wait);
    }

    public static void main(String[] args) throws InterruptedException,
        ExecutionException {
        List<Object> objects = new ArrayList<Object>();
        for (int i = 0; i < 100; i++) {
            objects.add(new Object());
        }

        List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
        for (final Object object : objects) {
            Callable<Result> c = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return compute(object);
                }
            };
            tasks.add(c);
        }

        ExecutorService exec = Executors.newCachedThreadPool();
        // some other executors you could try to see the different behaviours
        // ExecutorService exec = Executors.newFixedThreadPool(3);
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            long start = System.currentTimeMillis();
            List<Future<Result>> results = exec.invokeAll(tasks);
            int sum = 0;
            for (Future<Result> fr : results) {
                sum += fr.get().wait;
                System.out.println(String.format("Task waited %d ms",
                    fr.get().wait));
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(String.format("Elapsed time: %d ms", elapsed));
            System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
        } finally {
            exec.shutdown();
        }
    }
}
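Following the note above about bounding the pool: for IO-bound work inside an HTTP request, a fixed pool combined with the timed overload invokeAll(tasks, timeout, unit) caps both the number of threads and how long the request can wait. A minimal sketch, assuming a hypothetical compute(int) that sleeps to simulate IO; the class name BoundedInvokeAll, the pool size and the timeout are illustrative choices, not values from the answer:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BoundedInvokeAll {
    // Hypothetical stand-in for compute(): sleeps to simulate an IO call
    static String compute(int id) throws InterruptedException {
        Thread.sleep(50);
        return "result-" + id;
    }

    // Runs compute() for every id on a fixed pool and waits at most timeoutMs.
    // Tasks still running at the deadline are cancelled and reported as null.
    static List<String> computeAll(List<Integer> ids, int poolSize, long timeoutMs) {
        ExecutorService exec = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int id : ids) {
                tasks.add(() -> compute(id));
            }
            // The futures come back in the same order as the tasks
            List<Future<String>> futures = exec.invokeAll(tasks, timeoutMs, TimeUnit.MILLISECONDS);
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                try {
                    results.add(f.get()); // does not block: every future is done once invokeAll returns
                } catch (CancellationException | ExecutionException e) {
                    results.add(null);    // timed out, or compute() threw
                }
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for tasks", e);
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<Integer> ids = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(computeAll(ids, 4, 5_000));
    }
}
```

In a Spring application the pool would more likely be a shared, long-lived bean rather than created per call; it is created here only to keep the sketch self-contained.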
  • Is there a C# version of this?
    – Malfist
    Commented Jan 6, 2010 at 20:48
  • Also look at Executors, which functions as a factory for various flavors of executor services.
    – Rob H
    Commented Jan 6, 2010 at 20:49
  • @Malfist In C# there are Tasks (in the upcoming .NET 4) that make all of this a breeze :). In .NET 3.5 there are delegates/lambdas, threads, Funcs, ThreadStart, etc. to do it. Commented Jan 16, 2010 at 18:44
  • @Malfist, I know this is an old comment, but C# now has Parallel.ForEach and the Task Parallel Library (TPL). They're pretty complete.
    – Machado
    Commented Oct 25, 2016 at 19:42
  • I would say one way is to use java.lang.Thread directly, like here: create a thread for each task and run them. This worked for me in Spring. After reading this I would like to try virtual threads, but I haven't got them working in Spring yet. Also, Tasks, Parallel.ForEach, the TPL and async/await have their own issues; for one, many people say to await async tasks, which (if I am not wrong) effectively makes them synchronous. Commented Aug 2 at 0:35
9

With Java 8 and later you can use parallelStream() on the collection to achieve this:

List<T> objects = ...;

List<Result> result = objects.parallelStream()
        .map(object -> compute(object)) // compute() must not throw checked exceptions here
        .collect(Collectors.toList());

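Note: because a List is an ordered source, collect(Collectors.toList()) keeps the encounter order, so the result list is in the same order as the objects list; only the order (and the threads) in which compute() is called is unpredictable.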

Details on how to set up the right number of threads are available in this Stack Overflow question: how-many-threads-are-spawned-in-parallelstream-in-java-8
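A small runnable check of the ordering point, assuming a hypothetical compute() that squares an int after a short sleep: the calls run concurrently, yet the list returned by collect(Collectors.toList()) follows the source order. The class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelStreamOrder {
    // Hypothetical compute(): squares its input after a short sleep that simulates work
    static int compute(int x) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return x * x;
    }

    static List<Integer> computeAll(List<Integer> objects) {
        return objects.parallelStream()
                .map(ParallelStreamOrder::compute) // runs on the common ForkJoinPool (plus the calling thread)
                .collect(Collectors.toList());     // result keeps the encounter order of objects
    }

    public static void main(String[] args) {
        System.out.println(computeAll(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)));
        // prints [1, 4, 9, 16, 25, 36, 49, 64]
    }
}
```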

  • In my view this is a code smell. You are blocking all other code that uses parallelStream, because they all share the common ForkJoinPool. In a test or small app it might be OK, but on a big server this might be a recipe for disaster.
    – user482745
    Commented Oct 17, 2018 at 11:44
  • Streams are designed for data parallelism, not task parallelism. See stackoverflow.com/a/23370799/208288. Commented Nov 2, 2018 at 21:35
  • @LairdNelson: good link, but Brian seems to be talking about parallelism in general, not about using streams, right?
    – serv-inc
    Commented Feb 21, 2022 at 10:55
6

One can simply create a few threads and get the results.

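List<MyThread> threads = new ArrayList<>();
for (Object object : objects) {
    MyThread t = new MyThread(object); // hypothetical Thread subclass whose run() calls compute(object)
    t.start();
    threads.add(t);
}
for (MyThread t : threads) {
    t.join();                // wait until this thread has finished
    list.add(t.getResult()); // hypothetical accessor for the computed Result
}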

EDIT: I think the other solutions are cooler.
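A self-contained sketch of the thread-per-task idea, assuming a hypothetical MyThread that stores its result in a field and a hypothetical compute() that doubles an int. It starts every thread first and only then waits for each one with join(), so the tasks overlap; with many objects, a pool avoids creating one thread per task.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ThreadPerTask {
    // Hypothetical compute(): doubles its input
    static int compute(int x) {
        return 2 * x;
    }

    // Hypothetical MyThread: runs compute(input) and keeps the value in a field
    static class MyThread extends Thread {
        private final int input;
        private int result;

        MyThread(int input) {
            this.input = input;
        }

        @Override
        public void run() {
            result = compute(input);
        }

        // Only read this after join(): join() makes the thread's writes visible
        int getResult() {
            return result;
        }
    }

    static List<Integer> computeAll(List<Integer> objects) {
        List<MyThread> threads = new ArrayList<>();
        for (int object : objects) {
            MyThread t = new MyThread(object);
            t.start(); // one new thread per task
            threads.add(t);
        }
        List<Integer> list = new ArrayList<>();
        try {
            for (MyThread t : threads) {
                t.join(); // wait for this thread to finish
                list.add(t.getResult());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while joining", e);
        }
        return list;
    }

    public static void main(String[] args) {
        System.out.println(computeAll(Arrays.asList(1, 2, 3))); // prints [2, 4, 6]
    }
}
```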

1

For a more detailed answer, read Java Concurrency in Practice and use java.util.concurrent.

  • This should be a comment, mate.
    – Vino
    Commented Oct 15, 2018 at 22:42
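Taking the advice to use java.util.concurrent a step further: since Java 8 the package also has CompletableFuture, which suits IO-bound work like the question's. A minimal sketch, assuming a hypothetical compute() (here it just upper-cases a string) and a pool created per call only to keep the example self-contained; in a Spring application the executor would more likely be a shared bean.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class CompletableFutureExample {
    // Hypothetical compute(): stands in for an IO-bound call
    static String compute(String object) {
        return object.toUpperCase();
    }

    static List<String> computeAll(List<String> objects, int poolSize) {
        // A dedicated pool keeps IO-bound tasks off the common ForkJoinPool
        ExecutorService ioPool = Executors.newFixedThreadPool(poolSize);
        try {
            // Start every task first so that they all run concurrently...
            List<CompletableFuture<String>> futures = objects.stream()
                    .map(o -> CompletableFuture.supplyAsync(() -> compute(o), ioPool))
                    .collect(Collectors.toList());
            // ...then wait for each one; join() rethrows a failure as CompletionException
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeAll(Arrays.asList("a", "b", "c"), 4)); // prints [A, B, C]
    }
}
```

Collecting the futures into a list before joining matters: mapping straight to join() in one lazy sequential stream would wait for each task before starting the next.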
1

Here's something I use in my own projects:

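import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelTasks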
{
    private final Collection<Runnable> tasks = new ArrayList<Runnable>();

    public ParallelTasks()
    {
    }

    public void add(final Runnable task)
    {
        tasks.add(task);
    }

    public void go() throws InterruptedException
    {
        final ExecutorService threads = Executors.newFixedThreadPool(Runtime.getRuntime()
                .availableProcessors());
        try
        {
            final CountDownLatch latch = new CountDownLatch(tasks.size());
            for (final Runnable task : tasks)
                threads.execute(new Runnable() {
                    public void run()
                    {
                        try
                        {
                            task.run();
                        }
                        finally
                        {
                            latch.countDown();
                        }
                    }
                });
            latch.await();
        }
        finally
        {
            threads.shutdown();
        }
    }
}

// ...

public static void main(final String[] args) throws Exception
{
    ParallelTasks tasks = new ParallelTasks();
    final Runnable waitOneSecond = new Runnable() {
        public void run()
        {
            try
            {
                Thread.sleep(1000);
            }
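            catch (InterruptedException e)
            {
                // restore the interrupt flag rather than silently swallowing it
                Thread.currentThread().interrupt();
            }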
        }
    };
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    tasks.add(waitOneSecond);
    final long start = System.currentTimeMillis();
    tasks.go();
    System.err.println(System.currentTimeMillis() - start);
}

Which prints a bit over 2000 on my dual-core box: the pool has one thread per core, so the four 1-second tasks run two at a time.

1

A neat way is to utilize ExecutorCompletionService.

Say you have the following code (as in your example):

public static void main(String[] args) {
    List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
    List<List<Character>> list = new ArrayList<>();

    for (char letter : letters) {
        List<Character> result = computeLettersBefore(letter);
        list.add(result);
    }

    System.out.println(list);
}

private static List<Character> computeLettersBefore(char letter) {
    return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
}

To execute the tasks in parallel, create an ExecutorCompletionService backed by a thread pool, submit the tasks, and then read the results. By default ExecutorCompletionService puts each finished task on a LinkedBlockingQueue, so results can be picked up as soon as they are ready, in completion order rather than submission order (if you run the code you will notice that the order of the results changes from run to run):

public static void main(String[] args) throws InterruptedException, ExecutionException {
    final ExecutorService threadPool = Executors.newFixedThreadPool(3);
    final ExecutorCompletionService<List<Character>> completionService = new ExecutorCompletionService<>(threadPool);

    final List<Character> letters = IntStream.range(65, 91).mapToObj(i -> (char) i).collect(Collectors.toList());
    List<List<Character>> list = new ArrayList<>();

    try {
        for (char letter : letters) {
            completionService.submit(() -> computeLettersBefore(letter));
        }

        // one take() per submitted task; take() blocks until the next task completes
        for (int i = 0; i < letters.size(); i++) {
            final List<Character> result = completionService.take().get();
            list.add(result);
        }
    } finally {
        threadPool.shutdownNow();
    }

    System.out.println(list);
}

private static List<Character> computeLettersBefore(char letter) {
    return IntStream.range(65, 1 + letter).mapToObj(i -> (char) i).collect(Collectors.toList());
}
0

You can use ThreadPoolExecutor. Here is sample code: http://programmingexamples.wikidot.com/threadpoolexecutor (too long to include here)
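A short sketch of constructing a ThreadPoolExecutor directly, which lets you choose the queue bound and the rejection policy (newFixedThreadPool, for example, uses an unbounded queue). The compute() method, pool size, queue capacity and CallerRunsPolicy are illustrative assumptions, not taken from the linked sample.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorExample {
    // Hypothetical compute(): adds 100 to its input
    static int compute(int x) {
        return x + 100;
    }

    static List<Integer> computeAll(List<Integer> objects) {
        // 4 threads, at most 100 waiting tasks, and CallerRunsPolicy:
        // when the queue is full, the submitting thread runs the task itself
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                4, 4, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
        pool.allowCoreThreadTimeOut(true); // let idle threads exit after 30 s
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int object : objects) {
                futures.add(pool.submit(() -> compute(object)));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // results in submission order
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(computeAll(Arrays.asList(1, 2, 3))); // prints [101, 102, 103]
    }
}
```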

0

Fork/Join's ParallelArray (from the jsr166y/extra166y preview library) is one option.
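ParallelArray never shipped in the JDK, but the Fork/Join framework it was built on did (ForkJoinPool, Java 7+). As a swapped-in technique, here is a minimal sketch that splits the list with a RecursiveTask; computeOne() is a hypothetical stand-in for compute(), and the THRESHOLD of 2 exists only so that this tiny example actually splits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinExample {
    // Hypothetical stand-in for the question's compute()
    static String computeOne(int x) {
        return "r" + x;
    }

    // Splits the list in half until a chunk is small, computes small chunks directly,
    // then puts the left half's results first, so the output keeps the input order.
    static class ComputeTask extends RecursiveTask<List<String>> {
        private static final int THRESHOLD = 2;
        private final List<Integer> items;

        ComputeTask(List<Integer> items) {
            this.items = items;
        }

        @Override
        protected List<String> compute() {
            if (items.size() <= THRESHOLD) {
                List<String> out = new ArrayList<>();
                for (int item : items) {
                    out.add(computeOne(item));
                }
                return out;
            }
            int mid = items.size() / 2;
            ComputeTask left = new ComputeTask(items.subList(0, mid));
            ComputeTask right = new ComputeTask(items.subList(mid, items.size()));
            left.fork();                                             // run the left half asynchronously
            List<String> result = new ArrayList<>(right.compute()); // do the right half in this thread
            result.addAll(0, left.join());                           // wait for the left half, put it first
            return result;
        }
    }

    static List<String> computeAll(List<Integer> objects) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return pool.invoke(new ComputeTask(objects));
        } finally {
            pool.shutdown();
        }
    }
}
```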

0

I too was going to mention an executor class. Here is some example code that you would place in your executor class.

    private static ExecutorService threadLauncher = Executors.newFixedThreadPool(4);

    private List<Callable<Object>> callableList = new ArrayList<Callable<Object>>();

    public void addCallable(Callable<Object> callable) {
        this.callableList.add(callable);
    }

    public void clearCallables(){
        this.callableList.clear();
    }

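    public void executeThreads(){
        try {
            // NOTE: the returned futures are discarded here, and getResult() calls
            // invokeAll() again, so calling both methods runs every task twice
            threadLauncher.invokeAll(this.callableList);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }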

    public Object[] getResult() {

        List<Future<Object>> resultList = null;
        Object[] resultArray = null;
        try {

            resultList = threadLauncher.invokeAll(this.callableList);

            resultArray = new Object[resultList.size()];

            for (int i = 0; i < resultList.size(); i++) {
                resultArray[i] = resultList.get(i).get();
            }

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return resultArray;
    }

Then to use it you would make calls to the executor class to populate and execute it.

executor.addCallable(someCallable); // do this once for each task (someCallable is any Callable<Object>)
Object[] results = executor.getResult();
  • It always annoyed me that there is no wrapper class for a set of jobs Commented Jan 6, 2010 at 20:50
0

I know it's a very old thread, but since RxJava (now at v3) came out, my favorite way to do parallel programming is through its flatMap operator, in just the following few lines (though it is not very intuitive at first sight):

// Assume we're on the main thread at the moment
Flowable.create(...) // upstream data provider, main thread
  .map(...) // some transformers, main thread
  .filter(...) // some filter, main thread
  .flatMap(data -> Flowable.just(data)
               .subscribeOn(Schedulers.from(...your ExecutorService for the sub-workers...))
               .doOnNext(this::process) // runs on the ExecutorService's threads
           , true            // delayErrors: hold errors until all inner Flowables have terminated
           , MAX_CONCURRENT) // max number of concurrent workers
  .subscribe();

You can check its Javadoc (RxJava 3, Flowable) to understand the operators. A simple example:

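Flowable.range(1, 100)
                .map(Object::toString)
                .flatMap(i -> Flowable.just(i)
                        .doOnNext(j -> {
                            System.out.println("Current thread is " + Thread.currentThread().getName());
                            Thread.sleep(100);
                        }).subscribeOn(Schedulers.io()), true, 10) // delayErrors = true, at most 10 concurrent inner Flowables
                // blockingSubscribe (rather than subscribe) keeps a main() alive until the stream completes
                .blockingSubscribe(
                        s -> log.info("We consumed {}", s),
                        throwable -> log.error("We met errors", throwable),
                        () -> log.info("The stream completed!!!"));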

And for your case:

for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

We could try:

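List<Result> list = Flowable.fromIterable(objects)
        .flatMap(obj ->
                    // fromCallable defers compute(obj) so that it runs on an io() thread,
                    // not eagerly on the calling thread as Flowable.just(compute(obj)) would
                    Flowable.fromCallable(() -> compute(obj)).subscribeOn(Schedulers.io()),
                true, YOUR_CONCURRENCY_NUMBER)
        .toList()       // gather the results (in completion order)
        .blockingGet(); // wait for all of them, e.g. inside the HTTP request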

Bonus points: if you need to control which worker handles which item, for example all odd numbers go to worker1 and all even numbers to worker2, RxJava can achieve that easily with the groupBy and flatMap operators together. I won't go into detail about them here. Enjoy playing :)))
