|
|
@@ -0,0 +1,204 @@
|
|
|
+package cn.gov.customs.wxjy.task;
|
|
|
+
|
|
|
+import cn.gov.customs.cacp.sdks.core.user.trans.UserContextHolder;
|
|
|
+import cn.gov.customs.cacp.sdks.hgid.HgidGenerator;
|
|
|
+import cn.gov.customs.wxjy.common.utils.DateUtils;
|
|
|
+import cn.gov.customs.wxjy.constants.TaskStatusEnum;
|
|
|
+import cn.gov.customs.wxjy.ibmmq.async.AsyncPushService;
|
|
|
+import cn.gov.customs.wxjy.ibmmq.config.PushContext;
|
|
|
+import cn.gov.customs.wxjy.ibmmq.enums.IbmmqEnum;
|
|
|
+import cn.gov.customs.wxjy.task.dao.TaskLogMapper;
|
|
|
+import cn.gov.customs.wxjy.task.pojo.TaskLog;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 基础业务任务抽象类
|
|
|
+ * 所有具体业务任务都继承此类
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+public abstract class BaseBusinessTask<T> {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ protected TaskLogMapper taskLogMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ protected AsyncPushService asyncPushService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ protected HgidGenerator idHelper;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取任务编码
|
|
|
+ */
|
|
|
+ public abstract String getTaskCode();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取任务名称
|
|
|
+ */
|
|
|
+ public abstract String getTaskName();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取业务类型编码
|
|
|
+ */
|
|
|
+ public abstract String getBusinessType();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取Mapper类名
|
|
|
+ */
|
|
|
+ public abstract String getMapperClassName();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取实体类
|
|
|
+ */
|
|
|
+ public abstract Class<T> getEntityClass();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 查询数据
|
|
|
+ * @param lastSyncTime 上次同步时间
|
|
|
+ * @return 需要处理的数据列表
|
|
|
+ */
|
|
|
+ public abstract List<T> queryData(Date lastSyncTime);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 判断是否为新增数据
|
|
|
+ * @param data 数据对象
|
|
|
+ * @return true-新增,false-修改
|
|
|
+ */
|
|
|
+ public abstract boolean isNewData(T data);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取数据ID
|
|
|
+ * @param data 数据对象
|
|
|
+ * @return 数据ID
|
|
|
+ */
|
|
|
+ public abstract String getDataId(T data);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行任务
|
|
|
+ */
|
|
|
+ public void execute() {
|
|
|
+ Date lastSyncTime = getLastSyncTime();
|
|
|
+ List<T> dataList = queryData(lastSyncTime);
|
|
|
+
|
|
|
+ if (CollectionUtils.isEmpty(dataList)) {
|
|
|
+ log.info("{}:没有需要同步的数据", getTaskName());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 区分新增和修改数据
|
|
|
+ List<T> addList = new ArrayList<>();
|
|
|
+ List<T> updateList = new ArrayList<>();
|
|
|
+
|
|
|
+ for (T data : dataList) {
|
|
|
+ if (isNewData(data)) {
|
|
|
+ addList.add(data);
|
|
|
+ } else {
|
|
|
+ updateList.add(data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 推送数据
|
|
|
+ if (CollectionUtils.isNotEmpty(addList)) {
|
|
|
+ pushData(addList, IbmmqEnum.SAVE.getCode());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (CollectionUtils.isNotEmpty(updateList)) {
|
|
|
+ pushData(updateList, IbmmqEnum.UPDATE.getCode());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取上次同步时间
|
|
|
+ */
|
|
|
+ protected Date getLastSyncTime() {
|
|
|
+ return taskLogMapper.getMaxBussinessDate(getBusinessType());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 推送数据
|
|
|
+ */
|
|
|
+ protected void pushData(List<T> dataList, String operType) {
|
|
|
+ String taskName = getTaskName();
|
|
|
+ log.info("{}:开始推送{}条数据,操作类型:{}",
|
|
|
+ taskName, dataList.size(), operType);
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 构建推送上下文
|
|
|
+ T firstData = dataList.get(0);
|
|
|
+ String pushDataId = getDataId(firstData);
|
|
|
+
|
|
|
+ PushContext context = PushContext.of(
|
|
|
+ getBusinessType(),
|
|
|
+ pushDataId,
|
|
|
+ UserContextHolder.currentUser()
|
|
|
+ );
|
|
|
+
|
|
|
+ // 发送异步消息
|
|
|
+ asyncPushService.sendAsync(
|
|
|
+ context,
|
|
|
+ dataList,
|
|
|
+ getMapperClassName(),
|
|
|
+ operType,
|
|
|
+ null,
|
|
|
+ new Class[]{getEntityClass()},
|
|
|
+ null
|
|
|
+ );
|
|
|
+
|
|
|
+ log.info("{}:数据推送成功,数量:{}", taskName, dataList.size());
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("{}:数据推送失败", taskName, e);
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 记录任务日志
|
|
|
+ */
|
|
|
+ public void recordTaskLog(TaskStatusEnum status, long startTime, String errorMsg) {
|
|
|
+ try {
|
|
|
+ TaskLog taskLog = new TaskLog();
|
|
|
+ taskLog.setPkTaskLog(idHelper.nextId());
|
|
|
+ taskLog.setBusinessType(getBusinessType());
|
|
|
+ taskLog.setTaskTime(DateUtils.getNowDate());
|
|
|
+ taskLog.setStatus(status.getCode());
|
|
|
+ taskLog.setCostTime(System.currentTimeMillis() - startTime);
|
|
|
+ taskLog.setErrorMessage(errorMsg);
|
|
|
+ taskLogMapper.insert(taskLog);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("保存任务日志失败,业务类型:{}", getBusinessType(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 安全类型转换
|
|
|
+ */
|
|
|
+ protected <R> List<R> safeCastList(List<?> source, Class<R> clazz) {
|
|
|
+ if (source == null) {
|
|
|
+ return Collections.emptyList();
|
|
|
+ }
|
|
|
+
|
|
|
+ List<R> result = new ArrayList<>(source.size());
|
|
|
+ for (Object item : source) {
|
|
|
+ if (item == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (!clazz.isInstance(item)) {
|
|
|
+ throw new ClassCastException(String.format(
|
|
|
+ "期望类型: %s, 实际类型: %s",
|
|
|
+ clazz.getName(),
|
|
|
+ item.getClass().getName()
|
|
|
+ ));
|
|
|
+ }
|
|
|
+ result.add(clazz.cast(item));
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+}
|