package com.deloitte.system.job;

import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.common.core.constant.GlobalConst;
import com.common.core.enums.TableNameEnum;
import com.common.core.enums.YesNoEnum;
import com.common.core.exception.BizException;
import com.common.core.service.SFService;
import com.common.core.utils.DistributedLock;
import com.common.core.utils.RestUtil;
import com.common.redis.util.RedisUtil;
import com.deloitte.system.request.SFTokenDto;
import com.deloitte.system.request.TxConfirmDto;
import com.deloitte.system.service.ConfirmTxService;
import com.deloitte.system.service.FileService;
import com.jfinal.plugin.activerecord.Db;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.*;
import java.util.stream.Collectors;

/**
 * Fallback ("guaranteed delivery") scheduled tasks.
 *
 * <p>Reconciles transactions whose confirmation was left pending in Redis
 * (insert/update and file-upload flows) against the Salesforce
 * {@code Transaction_Log__c} object, and back-fills missing
 * {@code sf_record_id} values in the local AWS tables.
 *
 * <p>Each job takes a master/slave pair of distributed locks so that two
 * servers may run concurrently without processing the same work twice, plus
 * a per-item lock while an individual transaction is being confirmed.
 *
 * @author 廖振钦
 * @company deloitte
 * @date 2022-01-26
 */
@Slf4j
@Component
public class GuaranteedTask {

    @Autowired
    private FileService fileService;
    @Autowired
    private SFService sfService;
    @Autowired
    private ConfirmTxService confirmTxService;
    @Autowired
    private RedisUtil redisUtil;
    @Autowired
    private DistributedLock distributedLock;

    /** Salesforce instance base URL, e.g. used to build REST query endpoints. */
    @Value("${salesforce.baseUrl}")
    private String baseUrl;

    /**
     * Runs every minute: confirms pending insert/update transactions.
     *
     * <p>Collects unconfirmed txIds from Redis, asks Salesforce for their
     * processing status, then either confirms the transaction (on
     * {@code success}) or drops the Redis key (on failure).
     */
    @Scheduled(cron = "0 * * * * ?")
    public void execute() {
        boolean isLock = false;
        boolean masterisLock = distributedLock.getLock("guaranteedLock-master", 3600 * 1000);
        if (!masterisLock) {
            isLock = distributedLock.getLock("guaranteedLock-slave", 3600 * 1000);
        }
        if (!(masterisLock || isLock)) {
            // Another pair of servers already holds both job locks.
            return;
        }
        try {
            List<String> sets = new ArrayList<>();
            Set<String> insertsets = redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_INSERT + "*");
            Set<String> updatesets = redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_UPDATE + "*");
            sets.addAll(insertsets);
            sets.addAll(updatesets);
            // txId (last ':'-separated segment of the redis key) -> full redis key
            Map<String, String> hashmap = new HashMap<>();
            for (String key : sets) {
                String[] keys = key.split(":");
                hashmap.put(keys[keys.length - 1], key);
            }
            // Newest txIds first (they are numeric timestamps/ids), at most 50 per run.
            // Long.parseLong + comparingLong avoids the boxed-subtraction overflow trap.
            List<String> sortkeysets = hashmap.keySet().stream()
                    .sorted(Comparator.comparingLong(Long::parseLong).reversed())
                    .limit(50)
                    .collect(Collectors.toList());
            if (CollUtil.isEmpty(sortkeysets)) {
                return;
            }
            // txId -> {"status": ..., "sfRecordId": ...} as reported by Salesforce.
            Map<String, Map<String, String>> statusMap = queryTransactionStatus(sortkeysets);
            for (String key : sortkeysets) {
                // Per-transaction lock: prevents two servers confirming the same txId.
                boolean isExecute = distributedLock.getLock("guaranteed:" + key + ":lock", 3600 * 1000);
                // Re-check the key still exists — another worker may have confirmed it meanwhile.
                boolean stillPending =
                        redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_INSERT + "*")
                                .contains(GlobalConst.PIPL_UNCONFIRM_INSERT + key)
                        || redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_UPDATE + "*")
                                .contains(GlobalConst.PIPL_UNCONFIRM_UPDATE + key);
                if (stillPending && isExecute) {
                    String txid = hashmap.get(key);
                    try {
                        Map<String, String> status = statusMap.get(key);
                        if (status != null) {
                            if ("success".equals(status.get("status"))) {
                                TxConfirmDto dto = new TxConfirmDto(
                                        key, status.get("sfRecordId"), YesNoEnum.YES.getCode(), null);
                                confirmTxService.confirm(dto);
                            } else {
                                // SF reports failure: drop the pending marker.
                                redisUtil.deleteKey(txid);
                            }
                        }
                    } catch (BizException e) {
                        log.error(e.getMessage(), e);
                    } finally {
                        distributedLock.releaseLock("guaranteed:" + key + ":lock");
                    }
                } else if (isExecute) {
                    // BUGFIX: we acquired the per-key lock but the key is no longer
                    // pending — release it instead of leaking it for an hour.
                    distributedLock.releaseLock("guaranteed:" + key + ":lock");
                }
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            if (masterisLock) {
                distributedLock.releaseLock("guaranteedLock-master");
            }
            if (isLock) {
                distributedLock.releaseLock("guaranteedLock-slave");
            }
        }
    }

    /**
     * Runs every minute: reconciles unconfirmed file-upload transactions.
     *
     * <p>On {@code success} the pending Redis key is simply dropped; on failure
     * the S3 objects recorded under the key are cleaned up first, and the key
     * is kept for retry if the cleanup itself fails.
     */
    @Scheduled(cron = "0 * * * * ?")
    public void executeFile() {
        boolean isLock = false;
        boolean masterisLock = distributedLock.getLock("guaranteedLock-master-file", 3600 * 1000);
        if (!masterisLock) {
            isLock = distributedLock.getLock("guaranteedLock-slave-file", 3600 * 1000);
        }
        if (!(masterisLock || isLock)) {
            return;
        }
        try {
            Set<String> fileKeys = redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_FILE + "*");
            // txId -> full redis key
            Map<String, String> hashmap = new HashMap<>();
            for (String key : fileKeys) {
                String[] keys = key.split(":");
                hashmap.put(keys[keys.length - 1], key);
            }
            // Newest first, at most 50 per run (same ordering as execute()).
            List<String> sortkeysets = hashmap.keySet().stream()
                    .sorted(Comparator.comparingLong(Long::parseLong).reversed())
                    .limit(50)
                    .collect(Collectors.toList());
            if (CollUtil.isEmpty(sortkeysets)) {
                return;
            }
            Map<String, Map<String, String>> statusMap = queryTransactionStatus(sortkeysets);
            for (String key : sortkeysets) {
                boolean isExecute = distributedLock.getLock("guaranteed:" + key + ":lock", 3600 * 1000);
                boolean stillPending = redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_FILE + "*")
                        .contains(GlobalConst.PIPL_UNCONFIRM_FILE + key);
                if (stillPending && isExecute) {
                    String txid = hashmap.get(key);
                    try {
                        Map<String, String> status = statusMap.get(key);
                        if (status != null) {
                            if ("success".equals(status.get("status"))) {
                                // Success needs no SF-side confirmation for files;
                                // dropping the pending key completes the transaction.
                                redisUtil.deleteKey(txid);
                            } else {
                                // Failure: remove the uploaded S3 objects, then the key.
                                List<String> keyList = redisUtil.getObjects(txid, String.class);
                                try {
                                    fileService.cleanS3File(keyList);
                                } catch (Exception e) {
                                    log.error(e.getMessage(), e);
                                    // Rethrow so the key survives and is retried next run.
                                    throw new BizException("clean s3 file fail");
                                }
                                redisUtil.deleteKey(txid);
                            }
                        }
                    } catch (BizException e) {
                        log.error(e.getMessage(), e);
                    } finally {
                        distributedLock.releaseLock("guaranteed:" + key + ":lock");
                    }
                } else if (isExecute) {
                    // BUGFIX: release the per-key lock when there is nothing to do.
                    distributedLock.releaseLock("guaranteed:" + key + ":lock");
                }
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            if (masterisLock) {
                distributedLock.releaseLock("guaranteedLock-master-file");
            }
            if (isLock) {
                distributedLock.releaseLock("guaranteedLock-slave-file");
            }
        }
    }

    /**
     * Daily at 02:00: back-fills missing {@code sf_record_id} for rows created
     * in the last two months in every mapped AWS table, by querying Salesforce
     * for the {@code Id} matching each row's {@code AWS_Data_Id__c}.
     */
    @Scheduled(cron = "0 0 2 * * ?")
    public void executeSyncSfid() {
        boolean isLock = false;
        boolean masterisLock = distributedLock.getLock("guaranteedLock-syncSfid-master", 3600 * 1000);
        if (!masterisLock) {
            isLock = distributedLock.getLock("guaranteedLock-syncSfid-slave", 3600 * 1000);
        }
        if (!(masterisLock || isLock)) {
            return;
        }
        // Per-table locks acquired during this run; released in the finally block.
        List<String> lockKeys = new ArrayList<>();
        try {
            // AWS table name -> SF object name mapping comes from TableNameEnum,
            // so the concatenated table names below are trusted, not user input.
            List<String> allAwsTableName = TableNameEnum.getAllAwsTableName();
            for (String awsTableName : allAwsTableName) {
                List ids = Db.query("select id from `" + awsTableName
                        + "` where is_delete = '0' and create_time>date_add(now(),interval -2 MONTH) and sf_record_id is null");
                log.info("同步表{},查询到sf_id为空的数据共{}条", awsTableName, ids.size());
                if (CollUtil.isEmpty(ids)) {
                    continue;
                }
                // Table-level lock: stop several servers updating the same table at once.
                boolean isExecute = distributedLock.getLock(
                        "guaranteed-syncSfid:" + awsTableName + ":lock", 3600 * 1000);
                if (!isExecute) {
                    continue;
                }
                lockKeys.add(awsTableName);
                String sfTableName = TableNameEnum.getSfTableNameByAwsTableName(awsTableName);
                // Page through ids in batches of 50 per SOQL query.
                int skip = 0;
                int sub = 50;
                List<String> collect = null;
                String idColumn = "AWS_Data_Id__c";
                while (collect == null || collect.size() >= sub) {
                    collect = ids.stream().skip(skip).limit(sub)
                            .map(String::valueOf).collect(Collectors.toList());
                    skip += sub;
                    if (collect.isEmpty()) {
                        // BUGFIX: avoid issuing a query with an empty in ('') list
                        // when the id count is an exact multiple of the batch size.
                        break;
                    }
                    String sql = "select Id," + idColumn + " from " + sfTableName
                            + " where " + idColumn + " in "
                            + collect.stream().collect(Collectors.joining("','", "('", "')"));
                    JSONArray jsonArray = sfService.querySFData(sql);
                    if (jsonArray != null && jsonArray.size() != 0) {
                        for (int i = 0; i < jsonArray.size(); i++) {
                            // NOTE(review): the tail of this loop was unreadable in the
                            // reviewed copy; reconstructed as "write the SF Id back into
                            // sf_record_id for the matching row" — confirm against VCS.
                            JSONObject record = jsonArray.getJSONObject(i);
                            String sfId = record.getString("Id");
                            String awsDataId = record.getString(idColumn);
                            Db.update("update `" + awsTableName
                                    + "` set sf_record_id = ? where id = ?", sfId, awsDataId);
                        }
                    }
                }
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            for (String tableName : lockKeys) {
                distributedLock.releaseLock("guaranteed-syncSfid:" + tableName + ":lock");
            }
            if (masterisLock) {
                distributedLock.releaseLock("guaranteedLock-syncSfid-master");
            }
            if (isLock) {
                distributedLock.releaseLock("guaranteedLock-syncSfid-slave");
            }
        }
    }

    /**
     * Queries Salesforce {@code Transaction_Log__c} for the processing status
     * of each txId in {@code sortkeysets}.
     *
     * @param sortkeysets transaction ids to look up (at most 50 per caller)
     * @return txId -> {"status": Status__c, "sfRecordId": SFRecordId__c}
     * @throws BizException when the SF response carries no {@code totalSize}
     */
    public Map<String, Map<String, String>> queryTransactionStatus(List<String> sortkeysets) {
        SFTokenDto tokenDto = sfService.getToken();
        String accessToken = tokenDto.getAccessToken();
        HttpHeaders header = new HttpHeaders();
        header.add("Authorization", "Bearer " + accessToken);
        StringBuilder idString = new StringBuilder();
        // Build the SOQL in-list: ('id1','id2',...)
        String ids = sortkeysets.stream().collect(Collectors.joining("','", "('", "')"));
        idString.append(baseUrl)
                .append("services/data/v53.0/query/?q=Select Status__c,TransId__c,SFRecordId__c from Transaction_Log__c Where TransId__c in ")
                .append(ids);
        JSONObject jsonObject = RestUtil.get(idString.toString(), header);
        Map<String, Map<String, String>> map = new HashMap<>();
        Object totalSize = jsonObject.get("totalSize");
        Boolean done = (Boolean) jsonObject.get("done");
        String nextRecordsUrl = jsonObject.getString("nextRecordsUrl");
        JSONArray jsonArray = jsonObject.getJSONArray("records");
        if (totalSize == null) {
            log.info("请求事务确认数据失败,result:{}", jsonObject);
            throw new BizException("请求事务确认数据失败");
        }
        for (int i = 0; i < jsonArray.size(); i++) {
            JSONObject record = jsonArray.getJSONObject(i);
            String status = record.getString("Status__c");
            String transId = record.getString("TransId__c");
            String sfRecordId = record.getString("SFRecordId__c");
            Map<String, String> mapStatus = new HashMap<>();
            mapStatus.put("status", status);
            mapStatus.put("sfRecordId", sfRecordId);
            map.put(transId, mapStatus);
        }
        // BUGFIX: done/nextRecordsUrl were read but pagination was never followed,
        // silently dropping any records beyond the first SF result page.
        // NOTE(review): assumes nextRecordsUrl is relative to baseUrl — confirm.
        if (done != null && !done && nextRecordsUrl != null) {
            queryNextRecords(baseUrl + nextRecordsUrl, map);
        }
        return map;
    }

    /**
     * Fetches one pagination page of a Salesforce query and merges its records
     * into {@code map}, following further pages while {@code done} is false.
     *
     * @param nextRecordsUrl absolute URL of the next result page
     * @param map            accumulator, mutated in place and returned
     * @return the same {@code map}, possibly extended
     */
    public Map<String, Map<String, String>> queryNextRecords(String nextRecordsUrl,
                                                             Map<String, Map<String, String>> map) {
        SFTokenDto tokenDto = sfService.getToken();
        String accessToken = tokenDto.getAccessToken();
        HttpHeaders header = new HttpHeaders();
        header.add("Authorization", "Bearer " + accessToken);
        JSONObject jsonObject = RestUtil.get(nextRecordsUrl, header);
        Boolean done = (Boolean) jsonObject.get("done");
        JSONArray jsonArray = jsonObject.getJSONArray("records");
        Object totalSize = jsonObject.get("totalSize");
        if (totalSize == null) {
            // Best-effort: a bad page leaves the accumulator as-is.
            return map;
        }
        for (int i = 0; i < jsonArray.size(); i++) {
            // NOTE(review): loop body reconstructed to mirror queryTransactionStatus
            // (the reviewed copy was truncated here) — confirm against VCS.
            JSONObject record = jsonArray.getJSONObject(i);
            Map<String, String> mapStatus = new HashMap<>();
            mapStatus.put("status", record.getString("Status__c"));
            mapStatus.put("sfRecordId", record.getString("SFRecordId__c"));
            map.put(record.getString("TransId__c"), mapStatus);
        }
        if (done != null && !done) {
            String next = jsonObject.getString("nextRecordsUrl");
            if (next != null) {
                queryNextRecords(baseUrl + next, map);
            }
        }
        return map;
    }
}