|
|
@@ -8,19 +8,19 @@ import cn.gov.customs.wxjy.ibmmq.async.AsyncPushService;
|
|
|
import cn.gov.customs.wxjy.ibmmq.config.PushContext;
|
|
|
import cn.gov.customs.wxjy.ibmmq.enums.IbmmqEnum;
|
|
|
import cn.gov.customs.wxjy.task.dao.TaskLogMapper;
|
|
|
+import cn.gov.customs.wxjy.task.dao.SyncDataStatusMapper; // 新增
|
|
|
import cn.gov.customs.wxjy.task.pojo.TaskLog;
|
|
|
+import cn.gov.customs.wxjy.task.pojo.SyncDataStatus; // 新增
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.collections.CollectionUtils;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.Date;
|
|
|
-import java.util.List;
|
|
|
+import java.util.*;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* 基础业务任务抽象类
|
|
|
- * 所有具体业务任务都继承此类
|
|
|
+ * 使用 sync_data_status 表来区分新增和修改数据
|
|
|
*/
|
|
|
@Slf4j
|
|
|
public abstract class BaseBusinessTask<T> {
|
|
|
@@ -28,6 +28,9 @@ public abstract class BaseBusinessTask<T> {
|
|
|
@Autowired
|
|
|
protected TaskLogMapper taskLogMapper;
|
|
|
|
|
|
+ @Autowired
|
|
|
+ protected SyncDataStatusMapper syncDataStatusMapper; // 新增
|
|
|
+
|
|
|
@Autowired
|
|
|
protected AsyncPushService asyncPushService;
|
|
|
|
|
|
@@ -66,13 +69,6 @@ public abstract class BaseBusinessTask<T> {
|
|
|
*/
|
|
|
public abstract List<T> queryData(Date lastSyncTime);
|
|
|
|
|
|
- /**
|
|
|
- * 判断是否为新增数据
|
|
|
- * @param data 数据对象
|
|
|
- * @return true-新增,false-修改
|
|
|
- */
|
|
|
- public abstract boolean isNewData(T data);
|
|
|
-
|
|
|
/**
|
|
|
* 获取数据ID
|
|
|
* @param data 数据对象
|
|
|
@@ -84,33 +80,67 @@ public abstract class BaseBusinessTask<T> {
|
|
|
* 执行任务
|
|
|
*/
|
|
|
public void execute() {
|
|
|
- Date lastSyncTime = getLastSyncTime();
|
|
|
- List<T> dataList = queryData(lastSyncTime);
|
|
|
+ long startTime = System.currentTimeMillis();
|
|
|
+ String errorMsg = null;
|
|
|
+ List<String> businessIds = new ArrayList<>();
|
|
|
|
|
|
- if (CollectionUtils.isEmpty(dataList)) {
|
|
|
- log.info("{}:没有需要同步的数据", getTaskName());
|
|
|
- return;
|
|
|
- }
|
|
|
+ try {
|
|
|
+ // 1. 获取上次同步时间
|
|
|
+ Date lastSyncTime = getLastSyncTime();
|
|
|
|
|
|
- // 区分新增和修改数据
|
|
|
- List<T> addList = new ArrayList<>();
|
|
|
- List<T> updateList = new ArrayList<>();
|
|
|
+ // 2. 查询需要同步的数据
|
|
|
+ List<T> dataList = queryData(lastSyncTime);
|
|
|
|
|
|
- for (T data : dataList) {
|
|
|
- if (isNewData(data)) {
|
|
|
- addList.add(data);
|
|
|
- } else {
|
|
|
- updateList.add(data);
|
|
|
+ if (CollectionUtils.isEmpty(dataList)) {
|
|
|
+ log.info("{}:没有需要同步的数据", getTaskName());
|
|
|
+ recordTaskLog(TaskStatusEnum.SUCCESS, startTime, null);
|
|
|
+ return;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- // 推送数据
|
|
|
- if (CollectionUtils.isNotEmpty(addList)) {
|
|
|
- pushData(addList, IbmmqEnum.SAVE.getCode());
|
|
|
- }
|
|
|
+ // 3. 获取已同步的数据ID集合(从 sync_data_status 表)
|
|
|
+ Set<String> syncedDataIds = getSyncedDataIds();
|
|
|
+
|
|
|
+ // 4. 区分新增和修改
|
|
|
+ List<T> addList = new ArrayList<>();
|
|
|
+ List<T> updateList = new ArrayList<>();
|
|
|
+
|
|
|
+ for (T data : dataList) {
|
|
|
+ String dataId = getDataId(data);
|
|
|
+ businessIds.add(dataId);
|
|
|
+
|
|
|
+ // 判断逻辑:如果在 sync_data_status 表中存在,则是修改;否则是新增
|
|
|
+ if (syncedDataIds.contains(dataId)) {
|
|
|
+ updateList.add(data);
|
|
|
+ } else {
|
|
|
+ addList.add(data);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 5. 推送数据
|
|
|
+ if (CollectionUtils.isNotEmpty(addList)) {
|
|
|
+ pushData(addList, IbmmqEnum.SAVE.getCode());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (CollectionUtils.isNotEmpty(updateList)) {
|
|
|
+ pushData(updateList, IbmmqEnum.UPDATE.getCode());
|
|
|
+ }
|
|
|
+
|
|
|
+ // 6. 更新同步状态到 sync_data_status 表
|
|
|
+ updateSyncDataStatus(dataList);
|
|
|
+
|
|
|
+ log.info("{}:同步完成,新增{}条,修改{}条",
|
|
|
+ getTaskName(), addList.size(), updateList.size());
|
|
|
|
|
|
- if (CollectionUtils.isNotEmpty(updateList)) {
|
|
|
- pushData(updateList, IbmmqEnum.UPDATE.getCode());
|
|
|
+ } catch (Exception e) {
|
|
|
+ errorMsg = e.getMessage();
|
|
|
+ log.error("{}:任务执行失败", getTaskName(), e);
|
|
|
+ throw e;
|
|
|
+ } finally {
|
|
|
+ recordTaskLog(
|
|
|
+ errorMsg == null ? TaskStatusEnum.SUCCESS : TaskStatusEnum.FAILED,
|
|
|
+ startTime,
|
|
|
+ errorMsg
|
|
|
+ );
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -118,7 +148,57 @@ public abstract class BaseBusinessTask<T> {
|
|
|
* 获取上次同步时间
|
|
|
*/
|
|
|
protected Date getLastSyncTime() {
|
|
|
- return taskLogMapper.getMaxBussinessDate(getBusinessType());
|
|
|
+ Date lastDate = taskLogMapper.getMaxBussinessDate(getBusinessType());
|
|
|
+ if (lastDate == null) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return lastDate;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取已同步的数据ID集合(从 sync_data_status 表)
|
|
|
+ */
|
|
|
+ protected Set<String> getSyncedDataIds() {
|
|
|
+ List<String> dataIds = syncDataStatusMapper.selectDataIdsByBusinessType(
|
|
|
+ getBusinessType()
|
|
|
+ );
|
|
|
+ return new HashSet<>(dataIds);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 更新同步数据状态到 sync_data_status 表
|
|
|
+ */
|
|
|
+ protected void updateSyncDataStatus(List<T> dataList) {
|
|
|
+ if (CollectionUtils.isEmpty(dataList)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Date now = new Date();
|
|
|
+ String businessType = getBusinessType();
|
|
|
+
|
|
|
+ for (T data : dataList) {
|
|
|
+ String dataId = getDataId(data);
|
|
|
+
|
|
|
+ // 查询是否已存在
|
|
|
+ SyncDataStatus status = syncDataStatusMapper.selectByBusinessAndDataId(
|
|
|
+ businessType, dataId
|
|
|
+ );
|
|
|
+
|
|
|
+ if (status == null) {
|
|
|
+ // 新增状态记录
|
|
|
+ status = new SyncDataStatus();
|
|
|
+ status.setID(idHelper.nextId());
|
|
|
+ status.setBusinessType(businessType);
|
|
|
+ status.setBusinessId(dataId);
|
|
|
+ status.setSyncTime(DateUtils.getDate());
|
|
|
+ status.setSyncVersion("1");
|
|
|
+ status.setIsDeleted("0");
|
|
|
+ syncDataStatusMapper.insert(status);
|
|
|
+ } else {
|
|
|
+ // 更新状态记录(只更新时间戳,版本号+1)
|
|
|
+ syncDataStatusMapper.updateSyncTime(businessType, dataId, now);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|