Использование миллионов Task-ов приводит к ошибке System.OutOfMemoryException (C#)
У меня есть 6 миллионов небольших (средний размер около 15 байт) файлов, которые мне нужно прочитать и далее обработать с помощью процессора. Я ранее реализовал это с помощью Task.Factory
и всё работало без проблем на asp .net core 2.1. Занимало по времени около 20 часов.
Теперь я перенес приложение на asp.net 6 и на тестовом сервере мое веб-приложение перестает отвечать на любые запросы после запуска этих файловых операций и аварийно завершается. В логах я вижу ошибку System.OutOfMemoryException
.
Я полагаю, что мой способ реализации далек от идеала. Я хотел бы узнать какие-нибудь иные способы многопоточной реализации этой работы или ваши замечания по текущему коду.
Метод контроллера ImportSignatures
:
[HttpPost("ImportSignatures")]
public async Task<JsonResult> ImportSignatures()
{
try
{
ImportSigningCertsResult res = await SignatureImportService.ImportSigningCerts();
return Json(res);
}
catch (Exception e)
{
LogsHelper.WriteLog("api/Settings/ImportSignatures", e);
return Json(new ImportSigningCertsResult(e.Message, SignatureImportService.WasCancelled));
}
}
Метод ImportSigningCerts
:
public static async Task<ImportSigningCertsResult> ImportSigningCerts()
{
LogsHelper.WriteEventLog("Запуск SignatureImportService");
WasCancelled = false;
IsWorking = true;
ResultStr = "";
totalSignatures = 0;
processedSignatures = 0;
var cancelMsg = "Импорт сертификатов был прерван. \n";
var endMsg = "Импорт сертификатов успешно завершён. \n";
var toDelete = new List<string>();
try
{
var configuration = SignatureImportConfiguration.FromCfg();
using (s_tokenSource = new CancellationTokenSource())
{
IEnumerable<string> signatures = Directory.EnumerateFiles(configuration.Path, "*.sig");
totalSignatures = signatures.Count();
Store mainStore = StoreMan.GetStore("Main");
var importStats = new ImportStats();
List<Task> tasks = new();
int saveIndex = 1;
const int proccessedForSave = 100000; // Через какое кол-во обработанных подписей произвести промежуточное сохранение хранилища и удаление подписей
CancellationToken token = s_tokenSource.Token;
ThreadPool.GetMinThreads(out int minWorkerThreads, out _);
using SemaphoreSlim semaphore = new(minWorkerThreads);
foreach (string path in signatures)
{
semaphore.Wait();
if (WasCancelled)
break;
tasks.Add(Task.Factory.StartNew(() =>
{
try
{
token.ThrowIfCancellationRequested();
if (UploadSigningCerts(mainStore, path, importStats))
{
if (configuration.NeedCleaning)
{
lock (s_toDeleteListLockObj)
toDelete.Add(path);
}
}
Interlocked.Increment(ref processedSignatures);
lock (s_intermediateSaveLockObj)
{
if (processedSignatures > proccessedForSave * saveIndex)
{
LogsHelper.WriteEventLog("Промежуточное сохранение хранилища сертификатов...");
mainStore.WriteIfChanged();
StartRemovingSignatures(toDelete);
saveIndex++;
}
}
}
catch (Exception e)
{
if (e is not OperationCanceledException)
LogsHelper.WriteLog("SignatureImportService/ImportSigningCerts:Task.Factory.StartNew", e);
}
finally
{
semaphore.Release();
}
}, token));
}
try
{
await Task.WhenAll(tasks);
}
catch (OperationCanceledException) { }
mainStore.WriteIfChanged();
StartRemovingSignatures(toDelete);
ResultStr = (WasCancelled ? cancelMsg : endMsg) + $"Certificates found: {importStats.all}. Was imported: {importStats.imported}." + (importStats.parsingFailed > 0 ? $" Unrecognized files: {importStats.parsingFailed}" : "");
}
LogsHelper.WriteEventLog(ResultStr);
return s_tokenSource == null ? new ImportSigningCertsResult(ResultStr) : new ImportSigningCertsResult(ResultStr, WasCancelled);
}
finally
{
IsWorking = false;
}
}
Метод UploadSigningCerts
:
private static bool UploadSigningCerts(Store store, string path, ImportStats importStats)
{
bool toBeDeleted = true;
CryptoClient client = CryptoServiceContext.DefaultInstance.CryptoClient;
try
{
List<CertInfo> certs = client.GetSignCmsInfo(File.ReadAllBytes(path)).Certs.ToList();
Interlocked.Add(ref importStats.all, certs.Count);
for (int i = 0; i < certs.Count; i++)
{
lock (s_importLockObj)
{
// Код по валидации каждого сертификата из файла, принятие решения об импорте, импорт в хранилище...
}
}
return toBeDeleted;
}
catch (Exception e)
{
LogsHelper.WriteLog("SignatureImportService/UploadSigningCerts", e);
LogsHelper.WriteEventLog($"Ошибка импорта сертификата из подписи: {Path.GetFileName(path)};");
Interlocked.Increment(ref importStats.errors);
return false;
}
}
Метод StartRemovingSignatures
:
private static void StartRemovingSignatures(List<string> toDelete)
{
if (toDelete.Count > 0)
{
List<string> tempToDelete;
lock (s_toDeleteListLockObj)
{
tempToDelete = new List<string>(toDelete);
toDelete.Clear();
}
LogsHelper.WriteEventLog("Удаление успешно обработанных файлов подписей...");
Task.Factory.StartNew(() =>
{
tempToDelete.ForEach(path =>
{
try
{
File.Delete(path);
}
catch (Exception e)
{
LogsHelper.WriteLog("ImportResult/DeleteSignatures", e);
}
});
});
}
}
Текст ошибки:
20.08.2023 11:58:01 api/Settings/ImportSignatures
Exception of type 'System.OutOfMemoryException' was thrown.
at System.Threading.Tasks.Task.EnsureContingentPropertiesInitializedUnsafe()
at System.Threading.Tasks.Task.AssignCancellationToken(CancellationToken cancellationToken, Task antecedent, TaskContinuation continuation)
at System.Threading.Tasks.Task.TaskConstructorCore(Delegate action, Object state, CancellationToken cancellationToken, TaskCreationOptions creationOptions, InternalTaskOptions internalOptions, TaskScheduler scheduler)
at Store.Services.SignatureImportService.<>c__DisplayClass20_0.<ImportSigningCerts>b__0(String path)
at System.Collections.Generic.List`1.ForEach(Action`1 action)
at Store.Services.SignatureImportService.ImportSigningCerts()
at Store.Controllers.SettingsController.ImportSignatures()
Updated: код был отредактирован