Асинхронная загрузка и обработка массива файлов из сети

Рейтинг: 1Ответов: 1Опубликовано: 24.03.2023

Я загружаю из интернета список бинарных файлов и обрабатываю их.


Task MakeAll(Uri[] uris)
{
    return Task.WhenAll(uris.Select(uri => Work(uri)).ToArray());
}

async Task Work(Uri uri)
{
    var result = await Download(uri);
    await HandleResult(result);
}

В методе "Download" я использую HttpClient c TokenBucketRateLimiter для ограничения скорости запросов.

var options = new TokenBucketRateLimiterOptions()
{
    TokenLimit = 1,
    QueueProcessingOrder = QueueProcessingOrder.OldestFirst,
    QueueLimit = 1000,
    ReplenishmentPeriod = TimeSpan.FromSeconds(4),
    TokensPerPeriod = 1,
    AutoReplenishment = true
};

Суть в том, что при таких настройках запросы на сервер происходят примерно раз в 10 сек. Но я не понимаю почему так медленно. Я не хорошо разбираюсь в асинхронном программировании.

Что мне нужно сделать, что бы файлы скачивались, как только появляется свободный токен, а обработка была второстепенной задачей?

Ответы

▲ 1Принят

Сделать развязку между производителем данных и его обработчиком можно с помощью шаблона параллельного программирования Producer/Consumer. Пример реализации этого шаблона в .NET это System.Threading.Channels.Channel<T>.

public class ProducerConsumer<T>
{
    Task MakeAll(Uri[] uris)
    {
        Channel<T> channel = Channel.CreateUnbounded<T>();
        return Task.WhenAll(Producer(uris, channel.Writer), Consumer(channel.Reader));
    }

    async Task Producer(Uri[] uris, ChannelWriter<T> writer)
    {
        await Task.WhenAll(uris.Select(uri => Work(uri, writer)));
        writer.Complete();
    }

    async Task Consumer(ChannelReader<T> reader)
    {
        await foreach(T data in reader.ReadAllAsync())
        {
            await HandleResult(data);
        }
    }

    async Task Work(Uri uri, ChannelWriter<T> writer)
    {
        T result = await Download(uri);
        await writer.WriteAsync(result);
    }

    async Task<T> Download(Uri uri)
    {
        return default;
    }

    async Task HandleResult(T item)
    {

    }
}