
I want to do a parallel map over a big list. The code looks somewhat like this:

big_list
|> Stream.map(&Task.async(Module, :do_something, [&1]))
|> Stream.map(&Task.await(&1))
|> Enum.filter(filter_fun)

But looking at the Stream implementation, as far as I understand, Stream.map composes the functions and applies the composed function to each element of the stream in turn, which would mean the sequence is:

  1. Take the first element
  2. Create an async task
  3. Wait for it to finish
  4. Take the second element, and so on
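The element-by-element behavior described above can be checked directly. The sketch below (the StreamOrder module and its trace/1 function are made up for illustration) chains two Stream.map stages that log each element they see into an Agent. If Stream fuses the stages as suspected, the log alternates between the stages instead of running the whole first stage before the second:

```elixir
defmodule StreamOrder do
  # Runs two chained Stream.map stages over `list` and returns the order in which
  # the stages touched each element, as a list of {stage, element} tuples.
  def trace(list) do
    {:ok, log} = Agent.start_link(fn -> [] end)

    # Appends {stage, x} to the log and passes x through unchanged.
    record = fn stage, x ->
      Agent.update(log, fn events -> [{stage, x} | events] end)
      x
    end

    list
    |> Stream.map(fn x -> record.(:first, x) end)
    |> Stream.map(fn x -> record.(:second, x) end)
    # Nothing above has run yet; Enum.to_list/1 is what drives the stream.
    |> Enum.to_list()

    events = Agent.get(log, &Enum.reverse/1)
    Agent.stop(log)
    events
  end
end

StreamOrder.trace([1, 2, 3])
# returns [{:first, 1}, {:second, 1}, {:first, 2}, {:second, 2}, {:first, 3}, {:second, 3}]
```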

In that case, nothing runs in parallel. Am I right, or am I missing something?

If I am right, what about this code?

Stream.map Task.async ...
|> Enum.map Task.await ...

Is that going to run in parallel?

3 Answers


The second version doesn't run in parallel either. You can see it clearly with this code:

defmodule Test do
  def test do
    [1,2,3]
    |> Stream.map(&Task.async(Test, :job, [&1]))
    |> Enum.map(&Task.await(&1))
  end

  def job(number) do
    :timer.sleep(1000)
    IO.inspect(number)
  end
end

Test.test

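You'll see each number appear about one second after the previous one, so the whole run takes about three seconds instead of one. Each task is only created when Enum.map pulls the next element through the stream, and it is awaited before the next task is created. The key is to create all the tasks as early as possible, so don't use the lazy Stream.map for that step. Use the eager Enum.map instead: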

|> Enum.map(&Task.async(Test, :job, [&1]))
|> Enum.map(&Task.await(&1))
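To compare the two variants side by side, here is a minimal timing sketch, assuming a hypothetical job/1 that just sleeps 100 ms to simulate work. The ParallelDemo module name and the 100 ms figure are illustrative, not from the code above. With three elements, the lazy version needs at least 300 ms, because each task is awaited before the next one is started, while the eager version should finish in roughly 100 ms on an otherwise idle machine:

```elixir
defmodule ParallelDemo do
  # Hypothetical job: sleeps 100 ms to simulate work, then doubles its input.
  def job(n) do
    Process.sleep(100)
    n * 2
  end

  # Lazy task creation: Enum.map pulls one element through Stream.map,
  # starts its task, awaits it, and only then moves on to the next element.
  def lazy(list) do
    list
    |> Stream.map(&Task.async(__MODULE__, :job, [&1]))
    |> Enum.map(&Task.await/1)
  end

  # Eager task creation: every task is started before the first await.
  def eager(list) do
    list
    |> Enum.map(&Task.async(__MODULE__, :job, [&1]))
    |> Enum.map(&Task.await/1)
  end

  # Returns {elapsed_milliseconds, result} for a zero-arity function.
  def time(fun) do
    {microseconds, result} = :timer.tc(fun)
    {div(microseconds, 1000), result}
  end
end

{lazy_ms, _} = ParallelDemo.time(fn -> ParallelDemo.lazy([1, 2, 3]) end)
{eager_ms, _} = ParallelDemo.time(fn -> ParallelDemo.eager([1, 2, 3]) end)
IO.puts("lazy: #{lazy_ms} ms, eager: #{eager_ms} ms")
```

Both functions return the same results in the same order; only the moment each task is started differs.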

On the other hand, you can use Stream.map for the awaiting step, as long as some eager operation comes later, like your filter. That way the awaits are interleaved with whatever processing you do on the results.
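A sketch of that mixed pipeline, assuming a hypothetical job/1 that squares its input after a short sleep (the AwaitLazily module and its functions are placeholders). All tasks are started eagerly, the awaits are lazy, and Enum.filter/2 is what drives the stream:

```elixir
defmodule AwaitLazily do
  # Hypothetical job: simulates work, then squares its input.
  def job(n) do
    Process.sleep(100)
    n * n
  end

  def run(list, filter_fun) do
    list
    # Eager: every task is started before any result is awaited.
    |> Enum.map(&Task.async(__MODULE__, :job, [&1]))
    # Lazy: each await happens only when the filter asks for the next element.
    |> Stream.map(&Task.await/1)
    # Eager consumer: drives the stream, so awaiting and filtering interleave.
    |> Enum.filter(filter_fun)
  end
end

AwaitLazily.run(1..6, &(rem(&1, 2) == 0))
# returns [4, 16, 36]
```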


Elixir 1.4 introduced Task.async_stream/5, which returns a stream that runs a given function concurrently on each item in an enumerable.

You can limit the number of concurrent workers with the :max_concurrency option and set how long each task may run with the :timeout option.

Note that you don't await these tasks yourself: the function returns a lazy stream, and nothing runs until you consume it, for example with Enum.to_list/1 or, if you only need the side effects, Stream.run/1.
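A sketch showing both options and both ways of consuming the stream, assuming a hypothetical do_something/1 that sleeps briefly and triples its input. The AsyncStreamDemo module and the limits of 4 workers and 1000 ms are arbitrary choices for illustration. Successful results come out of the stream as {:ok, value} tuples, in input order by default, and with the default settings a task that exceeds :timeout makes the caller exit:

```elixir
defmodule AsyncStreamDemo do
  # Hypothetical stand-in for Module.do_something/1.
  def do_something(n) do
    Process.sleep(100)
    n * 3
  end

  # Collecting results: Enum.filter/2 consumes the stream.
  def run(list, filter_fun) do
    list
    # Each element is passed as the first argument; [] means no extra arguments.
    |> Task.async_stream(__MODULE__, :do_something, [], max_concurrency: 4, timeout: 1000)
    # Each successful result arrives wrapped as {:ok, value}.
    |> Stream.map(fn {:ok, value} -> value end)
    |> Enum.filter(filter_fun)
  end

  # Side effects only: Stream.run/1 consumes the stream and returns :ok.
  def print_all(list) do
    list
    |> Task.async_stream(__MODULE__, :do_something, [], max_concurrency: 4)
    |> Stream.each(fn {:ok, value} -> IO.inspect(value) end)
    |> Stream.run()
  end
end

AsyncStreamDemo.run(1..5, &(&1 > 6))
# returns [9, 12, 15]
```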


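This will make your example run concurrently. Each result arrives wrapped in an {:ok, result} tuple, so unwrap it before filtering:

big_list
|> Task.async_stream(Module, :do_something, [])
|> Stream.map(fn {:ok, result} -> result end)
|> Enum.filter(filter_fun)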

You can try the ParallelStream library:

iex> stream = 1..10 |> ParallelStream.map(fn i -> i * 2 end)
iex> stream |> Enum.into([])
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

Update: or, better, use Flow.
