Как сделать что бы потоки дожидались выполнения

Рейтинг: 0Ответов: 1Опубликовано: 02.02.2023
@Configuration

@EnableAsync public class AppConfig {

@Bean(name = "threadPoolTaskExecutor")
public TaskExecutor threadPoolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setThreadNamePrefix("thread-");
    executor.initialize();
    executor.setAwaitTerminationSeconds(Integer.MAX_VALUE);
    executor.setWaitForTasksToCompleteOnShutdown(true);
    return executor;
}

Сам класс, в нем использую CountDownLatch latch , но получается что выполняется только один поток, хотел использовать Future но когда Подключаю свой бин с threadPoolTaskExecutor" то у TaskExecutor taskExecutor не доступен метод submit .

@Service

public class ThreadExecutorService {

private static final Logger log = LoggerFactory.getLogger(ThreadExecutorService.class);

@Autowired
@Qualifier("threadPoolTaskExecutor")
TaskExecutor taskExecutor;
@Autowired
CameraService cameraService;
CountDownLatch latch = new CountDownLatch(1);

public List<CameraInfoThread> generateCameraReport() throws JsonProcessingException, InterruptedException {

    List<Camera> list = cameraService.getListCameras();
    List<CameraInfoThread> cameraInfoList = new ArrayList<>();

    for (int i = 0; i < list.size(); i++) {
        CameraInfoThread cameraInfoThread = new CameraInfoThread(list.get(i));
        taskExecutor.execute(cameraInfoThread);
        latch.await();
        cameraInfoList.add(cameraInfoThread);
    }
    return cameraInfoList;
}

private class CameraInfoThread implements Runnable {
    Camera camera;
    CameraInfo cameraInfo = new CameraInfo();

    public CameraInfoThread(Camera camera) {
        this.camera = camera;
    }
    public void run() {
        try {
            log.info("START");
            cameraInfo.setId(camera.getId());
            SourceDataUrl sourceDataUrl = cameraService.getSourceDataUrl(camera.getSourceDataUrl());
            cameraInfo.setUrlType(sourceDataUrl.getUrlType());
            log.info("/////////////////////");
            cameraInfo.setVideoUrl(sourceDataUrl.getVideoUrl());
            TokenDataUrl tokenDataUrl = cameraService.getTokenDataUrl(camera.getTokenDataUrl());
            cameraInfo.setValue(tokenDataUrl.getValue());
            cameraInfo.setTtl(tokenDataUrl.getTtl());
            log.info("END");
            latch.countDown();
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "CameraInfoThread{" +
                "cameraInfo=" + cameraInfo +
                '}';
    }
}

}

Ответы

▲ 1

С использованием CountDownLatch это делается так.

public List<CameraInfoThread> generateCameraReport() throws JsonProcessingException, InterruptedException {

    List<Camera> list = cameraService.getListCameras();
    List<CameraInfoThread> cameraInfoList = new ArrayList<>();

    // важно, чтоб каждый поток мог уменьшить latch
    CountDownLatch latch = new CountDownLatch(list.size());
    for (int i = 0; i < list.size(); i++) {
        CameraInfoThread cameraInfoThread = new CameraInfoThread(list.get(i), latch);
        taskExecutor.execute(cameraInfoThread);
        cameraInfoList.add(cameraInfoThread);
    }
    latch.await();  // важно ждать после того, как все потоки запущены
    return cameraInfoList;
}

private class CameraInfoThread implements Runnable {
    Camera camera;
    CameraInfo cameraInfo = new CameraInfo();
    final CountDownLatch latch; 

    public CameraInfoThread(Camera camera, CountDownLatch latch) {
        this.camera = camera;
        this.latch = latch;
    }
    public void run() {
        try {
            log.info("START");
            cameraInfo.setId(camera.getId());
            // обработка c заполнением cameraInfo
            log.info("END");
            latch.countDown();
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

}

Альтернативный вариант это сделать бин threadPoolTaskExecutor не TaskExecutor, а AsyncTaskExecutor (просто поменять тип в конфигурации, ThreadPoolTaskExecutor реализует оба интерфейса). AsyncTaskExecutor позволяет делать submit, который возвращает Future.