ScheduledTasks.java 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package cn.gov.customs.wxjy.task;
import cn.gov.customs.wxjy.constants.TaskStatusEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
  14. /**
  15. * 定时任务管理类
  16. * 负责调度和执行所有定时任务
  17. */
  18. @Component
  19. @Slf4j
  20. public class ScheduledTasks {
  21. // 任务执行线程池
  22. private ExecutorService taskExecutor;
  23. // 任务映射表
  24. private final Map<String, BaseBusinessTask<?>> taskMap = new ConcurrentHashMap<>();
  25. @Autowired(required = false)
  26. private PackCatalogTask packCatalogTask;
  27. @Autowired(required = false)
  28. private GoodsCatalogTask goodsCatalogTask;
  29. @Autowired(required = false)
  30. private ChemicalsCatalogTask chemicalsCatalogTask;
  31. /**
  32. * 初始化方法
  33. */
  34. @PostConstruct
  35. public void init() {
  36. // 创建任务执行线程池
  37. taskExecutor = Executors.newFixedThreadPool(5, new TaskThreadFactory());
  38. // 注册任务
  39. registerTasks();
  40. log.info("定时任务管理器初始化完成,已注册{}个任务", taskMap.size());
  41. }
  42. /**
  43. * 注册所有任务
  44. */
  45. private void registerTasks() {
  46. if (packCatalogTask != null) {
  47. taskMap.put(packCatalogTask.getTaskCode(), packCatalogTask);
  48. log.info("注册任务:{}", packCatalogTask.getTaskName());
  49. }
  50. if (goodsCatalogTask != null) {
  51. taskMap.put(goodsCatalogTask.getTaskCode(), goodsCatalogTask);
  52. log.info("注册任务:{}", goodsCatalogTask.getTaskName());
  53. }
  54. if (chemicalsCatalogTask != null) {
  55. taskMap.put(chemicalsCatalogTask.getTaskCode(), chemicalsCatalogTask);
  56. log.info("注册任务:{}", chemicalsCatalogTask.getTaskName());
  57. }
  58. }
  59. /**
  60. * 包装种类定时任务调度
  61. */
  62. @Scheduled(cron = "${task.cron.pack-catalog:30 * * * * ?}")
  63. public void schedulePackCatalogTask() {
  64. executeTask(packCatalogTask, "包装种类目录推送数据");
  65. }
  66. /**
  67. * 危险货物定时任务调度
  68. */
  69. // @Scheduled(cron = "${task.cron.goods-catalog:30 * * * * ?}")
  70. // public void scheduleGoodsCatalogTask() {
  71. // executeTask(goodsCatalogTask, "危险货物目录推送数据");
  72. // }
  73. /**
  74. * 危化品目录定时任务调度
  75. */
  76. // @Scheduled(cron = "${task.cron.chemicals-catalog:30 * * * * ?}")
  77. // public void scheduleChemicalsCatalogTask() {
  78. // executeTask(chemicalsCatalogTask, "危化品目录推送数据");
  79. // }
  80. /**
  81. * 执行任务
  82. */
  83. private void executeTask(BaseBusinessTask<?> task, String taskDescription) {
  84. if (task == null) {
  85. log.warn("任务未配置: {}", taskDescription);
  86. return;
  87. }
  88. String taskName = task.getTaskName();
  89. log.info("{}定时任务开始执行", taskName);
  90. // 提交任务到线程池执行
  91. taskExecutor.submit(() -> {
  92. long startTime = System.currentTimeMillis();
  93. TaskStatusEnum status = TaskStatusEnum.SUCCESS;
  94. String errorMsg = null;
  95. try {
  96. // 执行任务
  97. task.execute();
  98. log.info("{}定时任务执行完成,耗时:{}ms",
  99. taskName, System.currentTimeMillis() - startTime);
  100. } catch (Exception e) {
  101. status = TaskStatusEnum.FAILED;
  102. errorMsg = String.format("执行失败:%s", e.getMessage());
  103. log.error("{}定时任务执行异常,业务类型:{}",
  104. taskName, task.getTaskCode(), e);
  105. } finally {
  106. // 记录任务执行日志
  107. try {
  108. task.recordTaskLog(status, startTime, errorMsg);
  109. } catch (Exception logEx) {
  110. log.error("记录任务日志失败,任务:{}", taskName, logEx);
  111. }
  112. }
  113. });
  114. }
  115. /**
  116. * 任务线程工厂
  117. */
  118. private static class TaskThreadFactory implements ThreadFactory {
  119. private final AtomicInteger threadNumber = new AtomicInteger(1);
  120. private final String namePrefix = "task-thread-";
  121. @Override
  122. public Thread newThread(Runnable r) {
  123. Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
  124. t.setDaemon(true);
  125. t.setPriority(Thread.NORM_PRIORITY);
  126. return t;
  127. }
  128. }
  129. }