Как правильно передать динамич. массив в поток и обновить его при изменении

Рейтинг: 0Ответов: 2Опубликовано: 17.08.2023

Собственно ситуация программа для статистики. Делается выборка из блока данных по определенным правилам.

есть тип запись обьедененная в массив обьявляю так:

type
 TDataSection = record
  Data: TBytes;
  MD5: RecMD5uint64;
  PRuleN: uint32;
  NumBlock: byte;
  BtLen: word; //byte;
  SectUse: byte;
 end;

type
  TForm1 = class(TForm)
....

  public
  FDDataSect: TArray<TArray<TDataSection>>;
  OutFDDataSect: TArray<TArray<TDataSection>>;

есть поток в нем обьявляю так

type
  TFindTr = class(TThread)
  private
 { Private declarations }
  public
  DataArr: TArray<TArray<TDataSection>>;
  RuleArr: TArray<TArray<TDataSection>>;

// исполняемая часть
procedure TFindTr.Execute;

 while CloseTR = 0 do begin // закрыть поток по внешнему требованию
  StatWork := 1;
  StopFind := 0;
  for NumRule := FStart to FEnd do begin
  Move((Pointer(DataArr[NumBlock,NumData].Data))^, n1arrB[0], SectLen);
  Move((Pointer(RuleArr[NumBlock,NumRule].Data))^, n2arrB[0], SectLen);
  NOutarr[0] := N1arr[0] xor N2arr[0];
  NOutarr[1] := N1arr[1] xor N2arr[1];
  NOutarr[2] := N1arr[2] xor N2arr[2];
  Move(nOutarrB, (Pointer(OutxorArr))^, SectLen);

  PacLenSect := 0;
  BsectSet := [];
  for nBt := 0 to SectLen - 1 do begin
 if OutxorArr[nBt] in BSectSet = false then begin
  include(BSectSet, OutxorArr[nBt]);
  inc(PacLenSect, 1);
  end;
  end;

//  првоверка на условия
  if (PacLenSect <= MinLenUse) then begin
  FindResult := 1;
  break;
  end;
  if StopFind = 1 then break;
  end;
  suspend; // ждем обновления данных из формы.
end;

обработка данных в форме

     // create potok
 x := 0;
 FindTR1 := TFindTr.Create(true);
 FindTR1.Priority := tpNormal;
 FindTR1.DataArr := FDDataSect;
 FindTR1.RuleArr := OutFDDataSect;
 FindTR1.FStart := 0;
 FindTR1.FEnd := Nrule - 1;

 for n := 0 to Ndata -1 do begin
 FindTR1.NumData := n; // задаем блок для обработки из иассива FDDataSect [x,n]
 FindTR1.Resume; // запускаем поток

  tmp1 := 0;
 while FindTR1.Suspended = false do begin // ждем окончания работы потока
  if (tmp1 mod 300) = 0 then begin
  tmp1 := 0;
  application.ProcessMessages;
  end;
  inc(tmp1, 1);
  end;

 if (FindTR1.FindResult = 1) then begin // если блок найдем в масиве OutFDDataSect 
  inc(NMinLen, 1);
  FindTR1.StatWork := 0;
  end;

// если блок ненайдем в масиве OutFDDataSect добовляем блок FDDataSect[x,n]

 if (FindTR1.FindResult = 0) and (n > 0) then begin 
  OutFDLen := length(OutFDDataSect[x]);
  Setlength(OutFDDataSect[x], OutFDlen + 1);
  OutFDDataSect[x, OutFDLen] := FDDataSect[x, n];
  inc(allFindNMinLen, 1);
  Nrule := length(OutFDDataSect[x]);
  FindTR1.FEnd := Nrule - 1;
  end;

Теперь собственно грабли. Все Вычесления проходят в форме - результат правильный. повторяемость 100% - но ОЧЕНЬ медленно.

вычисления проходят в потоке БЕЗ ИЗМЕНЕНИЯ массива OutFDDataSect - результат правильный. повторяемость 100%

вычисления проходят в потоке С ИЗМЕНЕНИЯ массива OutFDDataSect - и тут начинаются грабли. правильного результата нет. часто дожодит до зависания потока намертво. при дебаге выяснилось что изменения линны OutFDDataSect не доходят до потока.

Вобщем чтото сделал не правильно, при попытке выяснить с помощь гугля и глубокого курения RTFM где именно результата не дало. Причем такоеже обьявление прекрасно и стабильно работает в паралелльной сортировке массива OutFDDataSect методом инжекции.

Вобще буду рад любым подсказкам, желательно с кодом.

Ужк начал посматривать в сторону стандартной библиотеки System.Threading (Parallel Programming Library (PPL)) в принципе для моей задачи подходит...


но поле RuleArr осталось прежним.

Это я понимаю. Специально убрал все попытки переинициализации FindTR1.RuleArr, чтобы подсказали вот здесь надо сделать так...

попытка добавить переинициализацию FindTR1.RuleArr после изменения массива результата не дала.

 ...... 
 setlength(FindTR1.RuleArr, 0);
 FindTR1.RuleArr := OutFDDataSect;

пока блок OutFDDataSect[x] находится в одном сегменте памяти - работает. как только сегмент сменился - грабли....

Вот и спрашиваю - Как ПРАВИЛЬНО провести переинициализацию дин. массива.

Писал софт в основном для АСУтп - а в нем не точто за указатели. за динамические массивы с работы вылететь можно.

Ответы

▲ 2

У вас полностью отсутствует синхронизация.

Синхронизация - это обеспечение монопольного доступа потока к данным при их изменении. В данном случае, например, изменение длины массива приводит к тому, что он располагается на новом месте, а другие потоки об этом не знают и обращаются куда попало.

Тема синхронизации большая и сложная, в ответе её не объять. Для начала можете определить критические данные, к которым обращаются несколько потоков, например, OutFDDataSect, и защитить доступ с помощью критической секции TCriticalSection: перед любым обращением к массиву входим в критическую секцию, после работы выходим из неё.

Или несколько изменить структуру хранения - вместо массива взять TThreadList, в который встроены примитивные средства разграничения доступа.

Даже while CloseTR = 0 нехорошо проверять в одном потоке, а изменять в другом, хотя это не приведёт к критическим сбоям. Для цели управления потоком можно использовать, например, события TEvent.

Что почитать - главу из Тейксейра/Пачеко, главу из Елманова/Тенцер, Многопоточность - как это делается в Дельфи

Вот наколеночный пример, зачистку я уже не стал делать, и хэндл окна в принципе может смениться.

На форме две кнопки и листбокс. Создаётся массив на 800 мегабайт, заполняется случайными числами, запускается 8 потоков, каждый обрабатывает 100 мегабайт, ищет некую сумму четырех последовательных чисел.

Если поток нашел сумму, он кладёт данные в защищенный список, асинхронно сообщает об этом форме и приостанавливается, ожидая события.

Когда форме приходит сообщение, что в списке есть данные, она его блокирует, забирает данные, показывает их, очищает список. В списке из-за асинхронности могут быть данные из нескольких потоков - но только по одной записи из каждого. Форма сообщает каждому потоку, приславшему данные, что он может продолжать работу, устанавливая соответствующее событие.

Вторая кнопка во время работы потоков вызывает полное завершение их всех. Это у меня сделано криво - не отслеживается, что потоки вообще созданы, что они работают и так далее - просто как пример управления.

const
  THR_MSG = WM_USER + 666;
  ASize = 100 * 1024 * 1024;
  ThrCnt = 8;

type

  TForm2 = class(TForm)
    Button1: TButton;
    ListBox1: TListBox;
    Button2: TButton;
    CheckBox1: TCheckBox;
    procedure Button1Click(Sender: TObject);
    procedure Button2Click(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
    procedure THRMSG(var MSG: TMessage); message THR_MSG;
  end;

  TData = record
    Id: Integer;
    Position: Integer;
    Bts: TBytes;
  end;

  TTh = class(TThread)
  private
    Flist: TThreadList<TData>;
    FIdx, FDataStart, FDataLen, FCurrPos: Integer;
    // 0: immediate stop; 1: resume treatment
    FWinHandle: THandle;
    FEventHandles: array [0 .. 1] of THandle;
  protected
    procedure Execute; override;
  public
    constructor Create(AList: TThreadList<TData>;
      WinHandle, ResumeHandle, StopHandle: THandle;
      Idx, DataStart, DataLen: Integer);
  end;

Форма:

procedure TForm2.Button1Click(Sender: TObject);
var
  i: Integer;
begin
  SetLength(Src, ThrCnt * ASize);
  Flist := TThreadList<TData>.Create;
  Randomize;
  for i := 0 to High(Src) do
    Src[i] := Random(256);
  for i := 0 to ThrCnt-1 do
  begin
    EvHandles[i, 0] := CreateEvent(nil, True, False, nil);
    EvHandles[i, 1] := CreateEvent(nil, True, False, nil);
    TTh.Create(Flist, Handle, EvHandles[i, 1], EvHandles[i, 0], i,
      ASize * i, ASize);
  end;
end;

procedure TForm2.Button2Click(Sender: TObject);
var
  i: Integer;
begin
  for i := 0 to ThrCnt - 1 do
    SetEvent(EvHandles[i, 0]);
end;

procedure TForm2.THRMSG(var MSG: TMessage);
var
  i: Integer;
  List: TList<TData>;
  Data: TData;
begin
  List := Flist.LockList;
  try
    for Data in List do begin
      ListBox1.Items.Add(Format('th %d pos %d Data:[%d,%d,%d,%d]',
        [Data.Id, Data.Position, Data.Bts[0], Data.Bts[1], Data.Bts[2],
        Data.Bts[3]]));
      List.Delete(List.Count - 1);
      SetEvent(EvHandles[Data.Id, 1]);
    end;
    List.Clear;
  finally
    Flist.UnlockList;
  end;
end;

Поток:

constructor TTh.Create(AList: TThreadList<TData>;
  WinHandle, ResumeHandle, StopHandle: THandle;
  Idx, DataStart, DataLen: Integer);
begin
  inherited Create(False);
  FreeOnTerminate := True;
  Flist := AList;
  FIdx := Idx;
  FDataStart := DataStart;
  FCurrPos := FDataStart;
  FDataLen := DataLen;
  FWinHandle := WinHandle;
  FEventHandles[0] := StopHandle;
  FEventHandles[1] := ResumeHandle;
end;

procedure TTh.Execute;
var
  List: TList<TData>;
  WaitRes: Integer;
  Data: TData;
  i, sm: Integer;
begin
  repeat
    if FCurrPos < FDataStart + FDataLen - 3 then begin
      while FCurrPos < FDataStart + FDataLen - 3 do begin
        sm := Src[FCurrPos] + Src[FCurrPos + 1] + Src[FCurrPos + 2] +
          Src[FCurrPos + 3];
        if sm = 950 then begin
          Data.Id := FIdx;
          Data.Position := FCurrPos;
          Data.Bts := Copy(Src, FCurrPos, 4);
          List := Flist.LockList;
          try
            List.Add(Data);
          finally
            Flist.UnlockList;
          end;
          PostMessage(FWinHandle, THR_MSG, FIdx, 0);
        end;
        FCurrPos := FCurrPos + 4;
      end;
    end else
      SetEvent(FEventHandles[0]); // данные кончились

    WaitRes := WaitForMultipleObjects(2, @FEventHandles, False, INFINITE);
    if WaitRes = WAIT_OBJECT_0 + 1 then; // продолжить обработку

  until WaitRes = WAIT_OBJECT_0; // ImmediateStop
end;

введите сюда описание изображения

▲ -1

пошговый дамп массива из формы и потока показал что данные доходят правильно. результат в потоке тоже верный. Проблема оказалась в необходимой задержке для остаканивания системы и потока после Resume. после введения sleep(10) все заработало. результат повторяемый правда сильно не стабильный. любой чих приводит к зависанию потока.Так-что пердача массива только для чтения через глобальную переменную - вполне правильный подход.

Костыль:

 FindTR1.Resume;
 sleep(10); // собственно сам костыль. 10 минмал время принятия по стаканчику для выхода из suspend.
 while FindTR1.Suspended = false do begin

В прцессе поиска среди плагиата созданного из одной статьи обратил внимание на TTask & TParallel.For результат

TParallel.For - ну вот наконец-то перенесли аналог из FORTRAN-a - а нет. не туто было.... Нормально запустить не удалось. точнее заработало но так медленно что линейный алгоритм сильно обгонял.

TTask - заработало. код ниже. паралельность - только в таком виде работает. Сильно зависит от загрузки системы... ускорение от 2 до 2.8 раз. Вобщем если нет желания заморачиватся с диспечером - можно использовать.

для проверки основного алгоритма сойдет. для нормальной работы софта - в принципе тоже.

Резюме по PPL Lib. Ни о какой паралельности не и речи. Распределенные вычесления - да.(каждому потоку своя копия данных). Работа с общим блоком - только последовательно.... Вся прелесть паралельности теряется на копировании данных в поток... Вобщем что этой библиотекой хотели сказать индусы так и осталось тайной, покрытой матом.

Отдельное Спасибо MBo за пример с потоками. По факту - раскуриваю книжку и буду разбиратся с примером. хотелосьбы получить стабильное ускорение процесса от 2.9 и выше.

Собственно рабочий код с TTask

// WT, WTS и start/stoptime для подсчета времени - заменить как кому нравится......

uses System.Threading, System.SyncObjs, System.IOUtils;

type RecMD5uint64 = record
  MD5Hi: uint64;
  MD5Lo: uint64;
end;
type
 TDataSection = record
   Data: TBytes;
   MD5: RecMD5uint64;
   PRuleN: uint32;
   NumBlock: byte;
   BtLen: word; //byte;
   SectUse: byte;
 end;

 ..........

// TForm1 //
  public
   FDDataSect: TArray<TArray<TDataSection>>;
   OutFDDataSect: TArray<TArray<TDataSection>>;
   Ts2OutFDDataSect: TArray<TArray<TDataSection>>;
   FindResult: integer;
   TasksEnd: TBytes;

 ...........

procedure CreateTasksV2(DataArr, RuleArr:TArray<TArray<TDataSection>>; min, max: uint32; Pn: uint32; PNTask: byte; PtasksEnd: TBytes; var Wtasks: TArray<ITask>);
begin
 Wtasks[PNTask] := TTask.Create(procedure()
  var
    PNumRule: uint32;
    PnBt: uint32;
    N1arr: array[0..2] of uint64;   // calc xor sect
   N2arr: array[0..2] of uint64;
   NOutarr: array[0..2] of uint64;
   n1arrB: array[0..23] of byte absolute n1arr;
   n2arrB: array[0..23] of byte absolute n2arr;
    nOutarrB: array[0..23] of byte absolute nOutarr;
   BsectSet: set of byte;
   OutxorArr: TBytes;
   PacLenSect: uint32;
   Px: uint32;
   SectLen: uint32;
   MinLenUse: uint32;
  begin
   Px := 0;
   SectLen := 24;
   MinLenUse := 12;
   for PNumRule := min to max do begin
    setlength(OutxorArr, 24);
    Move((Pointer(DataArr[Px,Pn].Data))^, n1arrB[0], 24);
    Move((Pointer(RuleArr[Px,PNumRule].Data))^, n2arrB[0], 24);
    NOutarr[0] := N1arr[0] xor N2arr[0];
    NOutarr[1] := N1arr[1] xor N2arr[1];
    NOutarr[2] := N1arr[2] xor N2arr[2];
    Move(nOutarrB, (Pointer(OutxorArr))^, 24);
    //   tmp1 :=  CalcSectByteLen3T4(tmp);
   PacLenSect := 0;
   BsectSet := [];
    for PnBt := 0 to SectLen - 1 do begin
     if OutxorArr[PnBt] in BSectSet = false then begin
      include(BSectSet, OutxorArr[PnBt]);
      inc(PacLenSect, 1);
     end;
    end;

    if (PacLenSect <= MinLenUse) then begin
       PtasksEnd[PNTask] := 1;
       if PNTask = 1 then begin
        PacLenSect := 1;
       end;
    end;
   end;
  end);
end;

procedure TForm1.Button7Click(Sender: TObject);
var
 Tasks: TArray<ITask>;
 task: ITask;
 FStart, FEnd: uint32;
 FSectCount, FSectLen: uint32;
 FTrNum: uint32;
 FIn: TMemoryStream;
 NLoadRule, x, n: uint32;
 wt, wts, oldwts: RecTime;
 Ndata, Nrule: uint32;
 nresfind, nresnofind: uint32;
 s: string;
 NumRule: uint32;
 SectLen: uint32;
 OutFDLen: uint32;
 MinLenUse: uint32;
 NMinLen, allFindNMinLen: uint32;
 PacLen: uint32;
 FS: tstringlist;

 TaskRes: uint32;
 TasksecLen, TasksecLenEnd: uint32;
 BTasksEnd: uint32;
 Zt: uint32;
begin

 starttime(wt);
 x := 0;
 SectLen := 24;
 MinLenUse := 12;
 NMinLen := 0;
 allFindNMinLen := 0;
 setlength(FDDataSect, 0);     // clear
 setlength(OutFDDataSect, 0);      // clear
 FIn := TMemoryStream.Create;
 FIn.LoadFromFile(Memo1.Lines[0]);
 FIn.Seek(0, soBeginning);
 NLoadRule := FIn.Size div 24;
 setlength(FDDataSect, x + 1);
 setlength(FDDataSect[x], NLoadRule);
 for n := 0 to NLoadRule - 1 do begin
   setlength(FDDataSect[x,n].Data, 24);
   FIn.ReadData(FDDataSect[x,n].Data, 24);
 end;
 FIn.Free;

   setlength(OutFDDataSect, x + 1);
  OutFDLen := length(OutFDDataSect[x]);
   if OutFDLen = 0 then begin
     Setlength(OutFDDataSect[x], OutFDlen + 1);
     OutFDDataSect[x, OutFDLen] := FDDataSect[x, 0];
   end;
  Ndata := length(FDDataSect[x]);
  PBar1.Max := Ndata;
  TaskRes := 0;
  starttime(wts);
 for n := 0 to Ndata - 1 do begin // WORK

 NLoadRule := length(OutFDDataSect[x]);
 setlength(Tasks, 0);
 setlength(TasksEnd, 0);

// создаем потоки
 if NLoadRule <= 1000 then begin // 1 potok
  setlength(Tasks, 1);
  setlength(TasksEnd, 1);
 for BTasksEnd := 0 to Length(tasksEnd) - 1 do tasksEnd[BTasksEnd] := 0;
// procedure CreateTasksV2(DataArr, RuleArr:TArray<TArray<TDataSection>>; min, max: uint32; Pn: uint32; PNTask: byte;
//  var PtasksEnd: TBytes; var Wtasks: TArray<ITask>);
  CreateTasksV2(FDDataSect, OutFDDataSect, 0, NLoadRule -1, n, 0, TasksEnd, tasks);
 end;

 if NLoadRule > 1000 then begin
  setlength(Tasks, 4);
  setlength(TasksEnd, 4);
 for BTasksEnd := 0 to Length(tasksEnd) - 1 do tasksEnd[BTasksEnd] := 0;
  TasksecLen := NLoadRule div 4;
  TasksecLenEnd := NLoadRule - TasksecLen;
  CreateTasksV2(FDDataSect, OutFDDataSect, 0, TasksecLen - 1, n, 0, TasksEnd, Tasks);
  CreateTasksV2(FDDataSect, OutFDDataSect, TasksecLen, (TasksecLen * 2) - 1, n, 1, TasksEnd, Tasks);
  CreateTasksV2(FDDataSect, OutFDDataSect, (TasksecLen * 2), (TasksecLen * 3) - 1 , n, 2, TasksEnd, Tasks);
  CreateTasksV2(FDDataSect, OutFDDataSect, (TasksecLen * 3), NLoadRule - 1 , n, 3, TasksEnd, Tasks);
 end;

 FindResult := 0;
  for BTasksEnd := 0 to Length(tasksEnd) - 1 do tasksEnd[BTasksEnd] := 0;
  for task in tasks do
 task.Start;
 FindResult := 0;
 // for BTasksEnd := 0 to Length(tasksEnd) - 1 do tasksEnd[BTasksEnd] := 0;
   //Ждём выполнение всех задач.
 TTask.WaitForAll(tasks);
  FindResult := 0;
  for BTasksEnd := 0 to Length(tasksEnd) - 1 do begin
   if tasksEnd[BTasksEnd] = 1 then FindResult := 1;;
  end;
   if (FindResult > 0) then begin
   inc(NMinLen, 1);
  end;
  if (FindResult = 0) and (n > 0) then begin
   OutFDLen := length(OutFDDataSect[x]);
   Setlength(OutFDDataSect[x], OutFDlen + 1);
   OutFDDataSect[x, OutFDLen] := FDDataSect[x, n];
   inc(allFindNMinLen, 1);
  end;

    if (n mod 1000) = 0 then begin
   PBar1.Position := n;
   stoptime(wts);
   oldwts.start := wts.stop - wts.start;
   s := DecodeRecTime(oldwts);
   oldwts.stop := oldwts.start;

   memo1.Lines.Add(n.ToString +'   '+ s +'   '+ DecodeRecTime(wts));
   application.ProcessMessages;

   starttime(wts);
  end;
 end;
   stoptime(wt);
  memo1.Lines.Add(Zt.ToString + '  Dup  '+ NMinLen.ToString +'  Add  '+ allFindNMinLen.ToString +'  '+ DecodeRecTime(wt));
  Memo1.Lines.SaveToFile('LogTime');
end;