package com.deloitte.system.job;
|
|
import cn.hutool.core.collection.CollUtil;
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.common.core.constant.GlobalConst;
|
import com.common.core.enums.TableNameEnum;
|
import com.common.core.enums.YesNoEnum;
|
import com.common.core.exception.BizException;
|
import com.common.core.service.SFService;
|
import com.common.core.utils.DistributedLock;
|
import com.common.core.utils.RestUtil;
|
import com.common.redis.util.RedisUtil;
|
import com.deloitte.system.request.SFTokenDto;
|
import com.deloitte.system.request.TxConfirmDto;
|
import com.deloitte.system.service.ConfirmTxService;
|
import com.deloitte.system.service.FileService;
|
import com.jfinal.plugin.activerecord.Db;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.http.HttpHeaders;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.stereotype.Component;
|
|
import java.util.*;
|
import java.util.stream.Collectors;
|
|
/**
|
* @author 廖振钦
|
* @company deloitte
|
* @date 2022-01-26
|
* @descrition:
|
* 兜底机制定时任务TODO
|
* */
|
@Slf4j
|
@Component
|
public class GuaranteedTask {
|
@Autowired
|
private FileService fileService;
|
@Autowired
|
private SFService sfService;
|
|
@Autowired
|
private ConfirmTxService confirmTxService;
|
|
@Autowired
|
private RedisUtil redisUtil;
|
|
@Autowired
|
private DistributedLock distributedLock;
|
@Value("${salesforce.baseUrl}")
|
private String baseUrl;
|
/**
|
* 每分钟跑一次
|
*/
|
@Scheduled(cron = "0 * * * * ?")
|
public void execute() {
|
boolean isLock = false;
|
boolean masterisLock = distributedLock.getLock("guaranteedLock-master",3600 * 1000);
|
if(!masterisLock){
|
isLock = distributedLock.getLock("guaranteedLock-slave",3600 * 1000);
|
}
|
if(masterisLock || isLock) { //确保两台服务器都能同时跑又不会重复跑
|
try {
|
List<String> sets = new ArrayList<>();
|
Set<String> insertsets = redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_INSERT + "*");
|
Set<String> updatesets = redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_UPDATE + "*");
|
sets.addAll(insertsets);
|
sets.addAll(updatesets);
|
Map<String, String> hashmap = new HashMap<>();
|
for (String key : sets) {
|
String[] keys = key.split(":");
|
hashmap.put(keys[keys.length - 1], key);
|
}
|
List<String> sortkeysets = hashmap.keySet().stream().sorted(new Comparator<String>(){
|
@Override
|
public int compare(String o1, String o2) {
|
Long o1L = Long.parseLong(o1);
|
Long o2L = Long.parseLong(o2);
|
Long v = o2L-o1L; //倒序
|
// Long v = o1L-o2L; //顺序
|
if(v > 0){
|
return 1;
|
}else if(v < 0){
|
return -1;
|
}else {
|
return 0;
|
}
|
}
|
}).limit(50).collect(Collectors.toList());
|
if (CollUtil.isEmpty(sortkeysets)) {
|
return;
|
}
|
// 调用SF查询事务状态的接口,传入txId List,将查询的状态结果放在map,key为txId,value为处理状态
|
Map<String, Map<String,String>> statusMap = queryTransactionStatus(sortkeysets);
|
// Map<String, Map<String, String>> statusMap = query();
|
for (String key : sortkeysets) {
|
boolean isExecute = distributedLock.getLock("guaranteed:"+key+":lock",3600 * 1000);//判断是否有机器在执行事务
|
//如果redis里面不存在这个key与没有机器在执行该事务确认,就执行确认事务,防止多台服务器重复跑
|
if((redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_INSERT + "*").contains(GlobalConst.PIPL_UNCONFIRM_INSERT + key)
|
|| redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_UPDATE + "*").contains(GlobalConst.PIPL_UNCONFIRM_UPDATE + key))
|
&& isExecute){
|
String txid = hashmap.get(key);
|
try {
|
// 获取该TXID的处理状态,如果成功,则确认事务,如果失败则删除key
|
List<String> collect = statusMap.keySet().stream().filter(i -> i.equals(key)).collect(Collectors.toList());
|
if (CollUtil.isNotEmpty(collect)) {
|
String tId = collect.get(0);
|
Map<String, String> stringStringMap = statusMap.get(tId);
|
String status = stringStringMap.get("status");
|
String sfRecordId = stringStringMap.get("sfRecordId");
|
if ("success".equals(status)) {
|
TxConfirmDto dto=new TxConfirmDto(key,sfRecordId, YesNoEnum.YES.getCode(),null);
|
confirmTxService.confirm(dto);
|
}else {
|
redisUtil.deleteKey(txid);
|
}
|
}
|
} catch (BizException e) {
|
log.error(e.getMessage(), e);
|
}finally {
|
distributedLock.releaseLock("guaranteed:"+key+":lock");
|
}
|
}
|
}
|
} catch (Exception e) {
|
log.error(e.getMessage(), e);
|
}finally {
|
if(masterisLock){
|
distributedLock.releaseLock("guaranteedLock-master");
|
}
|
if (isLock){
|
distributedLock.releaseLock("guaranteedLock-slave");
|
}
|
}
|
}
|
}
|
/**
|
* 每分钟跑一次
|
*/
|
@Scheduled(cron = "0 * * * * ?")
|
public void executeFile() {
|
boolean isLock = false;
|
boolean masterisLock = distributedLock.getLock("guaranteedLock-master-file",3600 * 1000);
|
if(!masterisLock){
|
isLock = distributedLock.getLock("guaranteedLock-slave-file",3600 * 1000);
|
}
|
if(masterisLock || isLock) { //确保两台服务器都能同时跑又不会重复跑
|
try{
|
List<String> sets = new ArrayList<>();
|
Set<String> fileKeys = redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_FILE + "*");
|
sets.addAll(fileKeys);
|
Map<String, String> hashmap = new HashMap<>();
|
for (String key : sets) {
|
String[] keys = key.split(":");
|
hashmap.put(keys[keys.length - 1], key);
|
}
|
List<String> sortkeysets = hashmap.keySet().stream().sorted(new Comparator<String>(){
|
@Override
|
public int compare(String o1, String o2) {
|
Long o1L = Long.parseLong(o1);
|
Long o2L = Long.parseLong(o2);
|
Long v = o2L-o1L; //倒序
|
// Long v = o1L-o2L; //顺序
|
if(v > 0){
|
return 1;
|
}else if(v < 0){
|
return -1;
|
}else {
|
return 0;
|
}
|
}
|
}).limit(50).collect(Collectors.toList());
|
if (CollUtil.isEmpty(sortkeysets)) {
|
return;
|
}
|
Map<String, Map<String, String>> statusMap = queryTransactionStatus(sortkeysets);
|
// Map<String, String> statusMap = query();
|
for (String key : sortkeysets) {
|
boolean isExecute = distributedLock.getLock("guaranteed:"+key+":lock",3600 * 1000);//判断是否有机器在执行事务
|
//如果redis里面不存在这个key与没有机器在执行该事务确认,就执行确认事务,防止多台服务器重复跑
|
if((redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_FILE + "*").contains(GlobalConst.PIPL_UNCONFIRM_FILE + key))
|
&& isExecute) {
|
String txid = hashmap.get(key);
|
try {
|
//获取该TXID的处理状态,如果成功,则确认事务,如果失败则删除key
|
List<String> collect = statusMap.keySet().stream().filter(i -> i.equals(key)).collect(Collectors.toList());
|
if (CollUtil.isNotEmpty(collect)) {
|
String tId = collect.get(0);
|
Map<String, String> stringStringMap = statusMap.get(tId);
|
String status = stringStringMap.get("status");
|
String sfRecordId = stringStringMap.get("sfRecordId");
|
if ("success".equals(status)) {
|
// confirmTxService.confirmFile(key);
|
redisUtil.deleteKey(txid);
|
}else {
|
List<String> keyList = redisUtil.getObjects(txid, String.class);
|
try {
|
fileService.cleanS3File(keyList);
|
}catch (Exception e){
|
log.error(e.getMessage(), e);
|
throw new BizException("clean s3 file fail");
|
}
|
redisUtil.deleteKey(txid);
|
}
|
}
|
} catch (BizException e) {
|
log.error(e.getMessage(), e);
|
}finally {
|
distributedLock.releaseLock("guaranteed:"+key+":lock");
|
}
|
}
|
}
|
}catch (Exception e){
|
log.error(e.getMessage(), e);
|
}finally {
|
if(masterisLock){
|
distributedLock.releaseLock("guaranteedLock-master-file");
|
}
|
if (isLock){
|
distributedLock.releaseLock("guaranteedLock-slave-file");
|
}
|
}
|
}
|
}
|
|
|
/**
|
* 同步表中的sfid(每天两点执行)
|
*/
|
@Scheduled(cron = "0 0 2 * * ?")
|
public void executeSyncSfid() {
|
boolean isLock = false;
|
boolean masterisLock = distributedLock.getLock("guaranteedLock-syncSfid-master",3600 * 1000);
|
if(!masterisLock){
|
isLock = distributedLock.getLock("guaranteedLock-syncSfid-slave",3600 * 1000);
|
}
|
if(masterisLock || isLock) { //确保两台服务器都能同时跑又不会重复跑
|
ArrayList<String> lockKeys = new ArrayList<>();
|
try {
|
//aws tableName与sf tableName的映射
|
List<String> allAwsTableName = TableNameEnum.getAllAwsTableName();
|
for (String awsTableName:allAwsTableName) {
|
List<Long> ids = Db.query("select id from `"+awsTableName+"` where is_delete = '0' and create_time>date_add(now(),interval -2 MONTH) and sf_record_id is null");
|
log.info("同步表{},查询到sf_id为空的数据共{}条",awsTableName,ids.size());
|
if (CollUtil.isEmpty(ids)) {
|
continue;
|
}
|
//加表名锁,阻止多个服务同时更新一张表
|
//判断是否有机器在执行事务
|
boolean isExecute = distributedLock.getLock("guaranteed-syncSfid:"+awsTableName+":lock",3600 * 1000);
|
if(isExecute) {
|
lockKeys.add(awsTableName);
|
String sfTableName = TableNameEnum.getSfTableNameByAwsTableName(awsTableName);
|
//截取ids每50条执行一次
|
int skip = 0;
|
int sub = 50;
|
List<String> collect = null;
|
String idColumn = "AWS_Data_Id__c";
|
while (collect==null||collect.size()>=sub) {
|
collect = ids.stream().skip(skip).limit(sub).map(String::valueOf).collect(Collectors.toList());
|
skip += sub;
|
String sql = "select Id,"+idColumn+" from "+sfTableName+" where "+idColumn+" in "+collect.stream().collect(Collectors.joining("','", "('", "')"));
|
JSONArray jsonArray = sfService.querySFData(sql);
|
if (jsonArray != null && jsonArray.size() != 0) {
|
for (int i = 0;i<jsonArray.size();i++) {
|
JSONObject data = jsonArray.getJSONObject(i);
|
String sfId = data.getString("Id");
|
String awsId = data.getString(idColumn);
|
Db.update("update `"+awsTableName+"` set sf_record_id = '"+ sfId +"' where id = "+awsId);
|
log.info("更新{} sfid,id:{},sfid:{}",awsTableName,awsId,sfId);
|
}
|
}
|
}
|
}
|
}
|
} catch (Exception e) {
|
log.error("同步contact sfid异常:"+e.getMessage(), e);
|
}finally {
|
if(masterisLock){
|
distributedLock.releaseLock("guaranteedLock-syncSfid-master");
|
}
|
if (isLock){
|
distributedLock.releaseLock("guaranteedLock-syncSfid-slave");
|
}
|
//释放数据锁
|
if(CollUtil.isNotEmpty(lockKeys)){
|
for (String awsTableName:lockKeys) {
|
distributedLock.releaseLock("guaranteed-syncSfid:"+awsTableName+":lock");
|
}
|
}
|
}
|
}
|
}
|
|
|
public Map<String,Map<String,String>> queryTransactionStatus(List<String> sortkeysets){
|
SFTokenDto tokenDto = sfService.getToken();
|
String accessToken = tokenDto.getAccessToken();
|
HttpHeaders header = new HttpHeaders();
|
header.add("Authorization", "Bearer " + accessToken);
|
StringBuilder idString = new StringBuilder();
|
// 拼接URL
|
String ids = sortkeysets.stream().collect(Collectors.joining("','", "('", "')"));
|
idString.append(baseUrl).append("services/data/v53.0/query/?q=Select Status__c,TransId__c,SFRecordId__c from Transaction_Log__c Where TransId__c in ").append(ids);
|
// 请求数据
|
JSONObject jsonObject = RestUtil.get(idString.toString(),header);
|
Map<String, Map<String,String>> map = new HashMap<>();
|
List<JSONObject> objects = new ArrayList<>();
|
Object totalSize = jsonObject.get("totalSize");
|
Boolean done =(Boolean) jsonObject.get("done");
|
String nextRecordsUrl = jsonObject.getString("nextRecordsUrl");
|
JSONArray jsonArray = jsonObject.getJSONArray("records");
|
|
if (totalSize == null) {
|
log.info("请求事务确认数据失败,result:{}",jsonObject);
|
throw new BizException("请求事务确认数据失败");
|
}
|
for (int i = 0; i <jsonArray.size() ; i++) {
|
JSONObject data = jsonArray.getJSONObject(i);
|
String status = data.getString("Status__c");
|
String transId = data.getString("TransId__c");
|
String sfRecordId = data.getString("SFRecordId__c");
|
Map<String, String> mapStatus = new HashMap<>();
|
mapStatus.put("status",status);
|
mapStatus.put("sfRecordId",sfRecordId);
|
map.put(transId,mapStatus);
|
}
|
return map;
|
}
|
|
public Map<String, String> queryNextRecords(String nextRecordsUrl,Map<String,String> map){
|
SFTokenDto tokenDto = sfService.getToken();
|
String accessToken = tokenDto.getAccessToken();
|
HttpHeaders header = new HttpHeaders();
|
header.add("Authorization", "Bearer " + accessToken);
|
JSONObject jsonObject = RestUtil.get(nextRecordsUrl,header);
|
Boolean done =(Boolean) jsonObject.get("done");
|
JSONArray jsonArray = jsonObject.getJSONArray("records");
|
Object totalSize = jsonObject.get("totalSize");
|
if (totalSize == null) {
|
return map;
|
}
|
for (int i = 0; i <jsonArray.size() ; i++) {
|
JSONObject data = jsonArray.getJSONObject(i);
|
String status = data.getString("Status__c");
|
String transId = data.getString("TransId__c");
|
map.put(transId,status);
|
}
|
return map;
|
}
|
}
|