ForEachAsync Idiom

Let's continue to review the methods that are useful when working with TPL.

Last time we looked at the idiom that lets you process results in the order of task completion rather than in the order of launch. But we skipped one interesting point there. Suppose we have the same weather service and we want to get the results for every city as quickly as possible. Does this mean that we can take all the cities in the world and send thousands of requests at once? The forecast service may decide that the client has gone mad and start throttling the requests that exceed a certain limit (by the way, this throttling is one big pain in the ass for all cloud services, both for the authors of the services and for their customers).

So, we need to somehow limit the number of such requests or other asynchronous operations.

Generally, there is one small problem with limiting the number of tasks. For CPU-intensive operations (number crunchers, operations that load the CPU/GPU) there is a simple heuristic: the number of running tasks should not exceed the number of computing devices. For IO-intensive operations there is no such natural limit. Moreover, there are no built-in tools for monitoring the number of such operations.
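To make the CPU-bound heuristic concrete, here is a minimal sketch that caps the number of workers at the number of logical cores. The class and the `SumOfSquares` function are hypothetical and exist only for illustration; `ParallelOptions.MaxDegreeOfParallelism` and `Environment.ProcessorCount` are the standard .NET members.

```csharp
using System;
using System.Threading.Tasks;

public static class CpuBoundExample
{
    // Hypothetical CPU-heavy function, used only for illustration.
    public static long SumOfSquares(int n)
    {
        long sum = 0;
        for (int i = 1; i <= n; i++) sum += (long)i * i;
        return sum;
    }

    // Runs SumOfSquares for every input, using at most one worker per logical core.
    public static long[] ComputeAll(int[] inputs)
    {
        var results = new long[inputs.Length];
        var options = new ParallelOptions
        {
            // CPU-bound work: more workers than cores only adds context switching.
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };
        Parallel.For(0, inputs.Length, options, i => results[i] = SumOfSquares(inputs[i]));
        return results;
    }
}
```

For IO-bound calls there is no equivalent of `Environment.ProcessorCount` to plug in, which is why the limit has to come from the caller.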

NOTE

The thread pool could, in theory, help when a mix of IO-bound and CPU-bound operations runs in the system. It has a pile of heuristics that try to find the optimal number of simultaneously running tasks to maximize throughput. However, these heuristics are not much help when we have a large number of long-running IO-bound operations and only we know when our backend will start to fail from overload. If you are interested, there is a wonderful article on how the thread pool works in .NET: Throttling Concurrency in the CLR 4.0 ThreadPool.

So, we need a method, say ForEachAsync, that takes a sequence of elements and a factory method that launches a task for each element, and that also limits the number of simultaneous operations.

ForEachAsync_Idiom_1.jpg


It looks a bit scary, but it’s not all that bad!

There are over 9000 ways to limit the number of simultaneous operations, and the first one on the list is the semaphore. There is nothing wrong with it, and it can be used for this task, but you can choose something more high-level, for example, the Partitioner class from the TPL.
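For comparison, the semaphore-based approach might look like the sketch below. The helper name `SelectThrottledAsync` is made up for this example; it uses `SemaphoreSlim.WaitAsync` as the gate and, unlike the method discussed in this article, returns the results in source order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreThrottle
{
    // Hypothetical helper: runs selector for every item, but never more than
    // maxConcurrency operations at once. Results come back in source order.
    public static async Task<TResult[]> SelectThrottledAsync<T, TResult>(
        IEnumerable<T> source, Func<T, Task<TResult>> selector, int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = source.Select(async item =>
            {
                // Wait for a free slot before starting the operation.
                await gate.WaitAsync();
                try { return await selector(item); }
                finally { gate.Release(); } // Free the slot even on failure.
            }).ToList();

            // WhenAll completes only after every task has finished,
            // so the semaphore is not disposed while it is still in use.
            return await Task.WhenAll(tasks);
        }
    }
}
```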

Partitioner.Create(source) returns an object that can divide the input sequence for parallel processing. The "division" algorithm can vary; it depends on the type of the sequence/collection and is not of interest in our case. The important thing is that the partitioner lets you obtain several "iterators" that can work in parallel, each with its own piece of the input sequence.
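A small sketch of how these "iterators" are obtained and consumed: `GetPartitions(n)` returns a list of n enumerators, and each one is drained by its own worker. The class and method names here are illustrative only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class PartitionerExample
{
    // Drains source through partitionCount partitions, one worker per partition,
    // and returns every item that any worker has seen (sorted for stable output).
    public static List<string> DrainInParallel(IEnumerable<string> source, int partitionCount)
    {
        var seen = new ConcurrentBag<string>();

        // Each element of this list is an independent IEnumerator<string>.
        IList<IEnumerator<string>> partitions =
            Partitioner.Create(source).GetPartitions(partitionCount);

        var workers = partitions.Select(partition => Task.Run(() =>
        {
            using (partition)
            {
                // Every item of the source is handed to exactly one partition.
                while (partition.MoveNext())
                    seen.Add(partition.Current);
            }
        })).ToArray();

        Task.WaitAll(workers);
        return seen.OrderBy(x => x, StringComparer.Ordinal).ToList();
    }
}
```

Because the number of partitions is fixed up front, it also caps how many items can be in flight at once, which is what makes the partitioner usable as a limiter.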

Parallel processing is good, but we need a place to store the results. For this, at the beginning of the method we materialize the sequence into a list and create an array of TaskCompletionSource objects, one for each future task. Then we start parallel processing of each partition and put the results into the array of TaskCompletionSources as they become available.

ForEachAsync_Idiom_2.jpg
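The steps just described can be sketched roughly as follows. This is a minimal reconstruction under stated assumptions, not necessarily identical to the listing above: the class name, parameter names, and the way failures are forwarded are choices made for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ForEachAsyncExtensions
{
    public static IEnumerable<Task<TResult>> ForEachAsync<TItem, TResult>(
        this IEnumerable<TItem> source,
        Func<TItem, Task<TResult>> selector,
        int degreeOfParallelism)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (selector == null) throw new ArgumentNullException(nameof(selector));

        // Materialize the sequence so that we know how many results to expect.
        var items = source.ToList();

        // One TaskCompletionSource per future result; slots are filled
        // in completion order, not in source order.
        var completions = new TaskCompletionSource<TResult>[items.Count];
        for (int i = 0; i < completions.Length; i++)
            completions[i] = new TaskCompletionSource<TResult>();

        int lastCompleted = -1;

        // The number of partitions is the concurrency limit.
        foreach (var partition in Partitioner.Create(items).GetPartitions(degreeOfParallelism))
        {
            var p = partition;
            Task.Run(async () =>
            {
                using (p)
                {
                    while (p.MoveNext())
                    {
                        TResult result = default(TResult);
                        Exception error = null;
                        try { result = await selector(p.Current); }
                        catch (Exception e) { error = e; }

                        // Claim the next free slot and forward the outcome.
                        // Simplification: cancellation is reported as a fault.
                        var tcs = completions[Interlocked.Increment(ref lastCompleted)];
                        if (error != null) tcs.SetException(error);
                        else tcs.SetResult(result);
                    }
                }
            });
        }

        return completions.Select(tcs => tcs.Task);
    }
}
```

A caller would typically write `foreach (var task in cities.ForEachAsync(GetWeatherAsync, 2))` and `await task` inside the loop, where `cities` and `GetWeatherAsync` stand for the hypothetical city list and weather call of this example. A failure in one operation faults only its own task, so the partition keeps processing the remaining items.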


And here are the results of the execution:

[1:22:09 PM]: Getting the weather for 'Moscow'
[1:22:09 PM]: Getting the weather for 'Seattle'
-- Task number limiter is ON! Waiting for the end of the first task!
[1:22:10 PM]: Processing weather for 'Moscow': 'Temp: 6C'
-- Launching the following task immediately after the completion of the previous one
[1:22:10 PM]: Getting the weather for 'New York'
-- The newest task reached completion first
[1:22:15 PM]: Processing weather for 'New York': 'Temp: 8C'
-- Launching the next one
[1:22:15 PM]: Getting the weather for 'Kiev'
-- The second task has finished just now
[1:22:16 PM]: Processing weather for 'Seattle': 'Temp: 7C'
-- And now the last one
[1:22:20 PM]: Processing weather for 'Kiev': 'Temp: 4C'

In graphical form:

ForEachAsync_Idiom_3.jpg


It turns out that this thing not only limits the number of concurrent operations, but also lets you process the results in the order of completion rather than in the order of launch! Wonderful!

What to return? Task or IEnumerable<Task>?

Stephen Toub describes an implementation of ForEachAsync in two parts (Implementing a simple ForEachAsync and Implementing a simple ForEachAsync, part 2), but his implementation is somewhat different. The main difference is that his ForEachAsync method returns Task instead of IEnumerable<Task>. This can be a very important difference:

ForEachAsync_Idiom_4.jpg
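A Task-returning variant along these lines can be sketched as follows. It follows the general shape of the partitioner-based approach (one worker per partition, combined with `Task.WhenAll`); the class name and parameter names are assumptions made for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncTaskVersion
{
    // Returns a single Task that completes when every item has been processed,
    // or faults as soon as all partitions have stopped and at least one failed.
    public static Task ForEachAsync<T>(
        this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body)
    {
        return Task.WhenAll(
            Partitioner.Create(source)
                .GetPartitions(degreeOfParallelism)
                .Select(partition => Task.Run(async () =>
                {
                    using (partition)
                    {
                        // An exception from body ends this loop and disposes
                        // the partition; its worker is gone for good.
                        while (partition.MoveNext())
                            await body(partition.Current);
                    }
                })));
    }
}
```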


The code is shorter (which is good), but it behaves differently (and we don't know whether that is good or not). First, this approach works only for commands and does not work for queries (a command is a mutator, a query is a getter; for details, read an appropriate book). Second, having just one resulting task greatly complicates error handling, even when we only need to throttle operations such as saving data.

For example, in this case processing stops completely once an exception has occurred in every one of the partitions! When the first error occurs and await body fails, the while loop terminates and Dispose is called on the partition object. Partitioning is implemented with a work-stealing idiom, which means that the remaining elements will be (logically) queued for processing by a different partition. If processing in another partition fails, the number of handlers decreases even further. And this goes on until all the partitions have failed. At best, you will get only a portion of the data processed, and at worst you will have efficiency problems, because far fewer task handlers are active than you thought.

The problem can be solved by accumulating the errors and throwing an AggregateException at the end:

ForEachAsync_Idiom_5.jpg
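One way to accumulate the errors might look like the sketch below. The method name `ForEachCollectingErrorsAsync` and the use of a `ConcurrentQueue<Exception>` are assumptions made for this example; the point is that a failing item no longer terminates its partition's loop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ForEachAsyncWithErrors
{
    public static async Task ForEachCollectingErrorsAsync<T>(
        this IEnumerable<T> source, int degreeOfParallelism, Func<T, Task> body)
    {
        // Thread-safe storage for the errors from all partitions.
        var errors = new ConcurrentQueue<Exception>();

        await Task.WhenAll(
            Partitioner.Create(source)
                .GetPartitions(degreeOfParallelism)
                .Select(partition => Task.Run(async () =>
                {
                    using (partition)
                    {
                        while (partition.MoveNext())
                        {
                            // Record the failure and keep going, so the
                            // number of active workers never shrinks.
                            try { await body(partition.Current); }
                            catch (Exception e) { errors.Enqueue(e); }
                        }
                    }
                })));

        // Report everything at once. Awaiting callers observe this exception;
        // callers that block with Wait() may want AggregateException.Flatten().
        if (!errors.IsEmpty)
            throw new AggregateException(errors);
    }
}
```

Note that the collected exceptions carry no reference to the item that caused them unless the body adds that information itself.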


This approach works, although one problem remains: it is still hard to tell which task a given error belongs to.
