测试用户
2023-04-13 43393f2bb11cbf9e6af40077bbc5284660e8a754
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
package com.deloitte.system.job;
 
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.common.core.constant.GlobalConst;
import com.common.core.enums.TableNameEnum;
import com.common.core.enums.YesNoEnum;
import com.common.core.exception.BizException;
import com.common.core.service.SFService;
import com.common.core.utils.DistributedLock;
import com.common.core.utils.RestUtil;
import com.common.redis.util.RedisUtil;
import com.deloitte.system.request.SFTokenDto;
import com.deloitte.system.request.TxConfirmDto;
import com.deloitte.system.service.ConfirmTxService;
import com.deloitte.system.service.FileService;
import com.jfinal.plugin.activerecord.Db;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpHeaders;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
 
import java.util.*;
import java.util.stream.Collectors;
 
/**
 * @author 廖振钦
 * @company deloitte
 * @date 2022-01-26
 * @descrition:
 *     兜底机制定时任务TODO
 * */
@Slf4j
@Component
public class GuaranteedTask {
    @Autowired
    private FileService fileService;
    @Autowired
    private SFService sfService;
 
    @Autowired
    private ConfirmTxService confirmTxService;
 
    @Autowired
    private RedisUtil redisUtil;
 
    @Autowired
    private DistributedLock distributedLock;
    @Value("${salesforce.baseUrl}")
    private String baseUrl;
    /**
     * 每分钟跑一次
     */
    @Scheduled(cron = "0 * * * * ?")
    public void execute() {
        boolean isLock = false;
        boolean masterisLock = distributedLock.getLock("guaranteedLock-master",3600 * 1000);
        if(!masterisLock){
            isLock = distributedLock.getLock("guaranteedLock-slave",3600 * 1000);
        }
        if(masterisLock || isLock) {  //确保两台服务器都能同时跑又不会重复跑
            try {
                List<String> sets = new ArrayList<>();
                Set<String> insertsets = redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_INSERT + "*");
                Set<String> updatesets = redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_UPDATE + "*");
                sets.addAll(insertsets);
                sets.addAll(updatesets);
                Map<String, String> hashmap = new HashMap<>();
                for (String key : sets) {
                    String[] keys = key.split(":");
                    hashmap.put(keys[keys.length - 1], key);
                }
                List<String> sortkeysets = hashmap.keySet().stream().sorted(new Comparator<String>(){
                    @Override
                    public int compare(String o1, String o2) {
                        Long o1L = Long.parseLong(o1);
                        Long o2L = Long.parseLong(o2);
                        Long v = o2L-o1L; //倒序
//                        Long v = o1L-o2L; //顺序
                        if(v > 0){
                            return 1;
                        }else if(v < 0){
                            return -1;
                        }else {
                            return 0;
                        }
                    }
                }).limit(50).collect(Collectors.toList());
                if (CollUtil.isEmpty(sortkeysets)) {
                    return;
                }
                // 调用SF查询事务状态的接口,传入txId List,将查询的状态结果放在map,key为txId,value为处理状态
                Map<String, Map<String,String>> statusMap = queryTransactionStatus(sortkeysets);
//                Map<String, Map<String, String>> statusMap = query();
                for (String key : sortkeysets) {
                    boolean isExecute = distributedLock.getLock("guaranteed:"+key+":lock",3600 * 1000);//判断是否有机器在执行事务
                    //如果redis里面不存在这个key与没有机器在执行该事务确认,就执行确认事务,防止多台服务器重复跑
                    if((redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_INSERT + "*").contains(GlobalConst.PIPL_UNCONFIRM_INSERT + key)
                    || redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_UPDATE + "*").contains(GlobalConst.PIPL_UNCONFIRM_UPDATE + key))
                    && isExecute){
                        String txid = hashmap.get(key);
                        try {
                            // 获取该TXID的处理状态,如果成功,则确认事务,如果失败则删除key
                            List<String> collect = statusMap.keySet().stream().filter(i -> i.equals(key)).collect(Collectors.toList());
                            if (CollUtil.isNotEmpty(collect)) {
                                String tId = collect.get(0);
                                Map<String, String> stringStringMap = statusMap.get(tId);
                                String status = stringStringMap.get("status");
                                String sfRecordId = stringStringMap.get("sfRecordId");
                                if ("success".equals(status)) {
                                    TxConfirmDto dto=new TxConfirmDto(key,sfRecordId, YesNoEnum.YES.getCode(),null);
                                    confirmTxService.confirm(dto);
                                }else {
                                    redisUtil.deleteKey(txid);
                                }
                            }
                        } catch (BizException e) {
                            log.error(e.getMessage(), e);
                        }finally {
                            distributedLock.releaseLock("guaranteed:"+key+":lock");
                        }
                    }
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }finally {
                if(masterisLock){
                    distributedLock.releaseLock("guaranteedLock-master");
                }
                if (isLock){
                    distributedLock.releaseLock("guaranteedLock-slave");
                }
            }
        }
    }
    /**
     * 每分钟跑一次
     */
    @Scheduled(cron = "0 * * * * ?")
    public void executeFile() {
        boolean isLock = false;
        boolean masterisLock = distributedLock.getLock("guaranteedLock-master-file",3600 * 1000);
        if(!masterisLock){
            isLock = distributedLock.getLock("guaranteedLock-slave-file",3600 * 1000);
        }
        if(masterisLock || isLock) {  //确保两台服务器都能同时跑又不会重复跑
            try{
                List<String> sets = new ArrayList<>();
                Set<String> fileKeys = redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_FILE + "*");
                sets.addAll(fileKeys);
                Map<String, String> hashmap = new HashMap<>();
                for (String key : sets) {
                    String[] keys = key.split(":");
                    hashmap.put(keys[keys.length - 1], key);
                }
                List<String> sortkeysets = hashmap.keySet().stream().sorted(new Comparator<String>(){
                    @Override
                    public int compare(String o1, String o2) {
                        Long o1L = Long.parseLong(o1);
                        Long o2L = Long.parseLong(o2);
                        Long v = o2L-o1L; //倒序
//                        Long v = o1L-o2L; //顺序
                        if(v > 0){
                            return 1;
                        }else if(v < 0){
                            return -1;
                        }else {
                            return 0;
                        }
                    }
                }).limit(50).collect(Collectors.toList());
                if (CollUtil.isEmpty(sortkeysets)) {
                    return;
                }
                Map<String, Map<String, String>> statusMap = queryTransactionStatus(sortkeysets);
//                Map<String, String> statusMap = query();
                for (String key : sortkeysets) {
                    boolean isExecute = distributedLock.getLock("guaranteed:"+key+":lock",3600 * 1000);//判断是否有机器在执行事务
                    //如果redis里面不存在这个key与没有机器在执行该事务确认,就执行确认事务,防止多台服务器重复跑
                    if((redisUtil.getKeys(GlobalConst.PIPL_UNCONFIRM_FILE + "*").contains(GlobalConst.PIPL_UNCONFIRM_FILE + key))
                            && isExecute) {
                        String txid = hashmap.get(key);
                        try {
                            //获取该TXID的处理状态,如果成功,则确认事务,如果失败则删除key
                            List<String> collect = statusMap.keySet().stream().filter(i -> i.equals(key)).collect(Collectors.toList());
                            if (CollUtil.isNotEmpty(collect)) {
                                String tId = collect.get(0);
                                Map<String, String> stringStringMap = statusMap.get(tId);
                                String status = stringStringMap.get("status");
                                String sfRecordId = stringStringMap.get("sfRecordId");
                                if ("success".equals(status)) {
//                                    confirmTxService.confirmFile(key);
                                    redisUtil.deleteKey(txid);
                                }else {
                                    List<String> keyList = redisUtil.getObjects(txid, String.class);
                                    try {
                                        fileService.cleanS3File(keyList);
                                    }catch (Exception e){
                                        log.error(e.getMessage(), e);
                                        throw new BizException("clean s3 file fail");
                                    }
                                    redisUtil.deleteKey(txid);
                                }
                            }
                        } catch (BizException e) {
                            log.error(e.getMessage(), e);
                        }finally {
                            distributedLock.releaseLock("guaranteed:"+key+":lock");
                        }
                    }
                }
            }catch (Exception e){
                log.error(e.getMessage(), e);
            }finally {
                if(masterisLock){
                    distributedLock.releaseLock("guaranteedLock-master-file");
                }
                if (isLock){
                    distributedLock.releaseLock("guaranteedLock-slave-file");
                }
            }
        }
    }
 
 
    /**
     * 同步表中的sfid(每天两点执行)
     */
    @Scheduled(cron = "0 0 2 * * ?")
    public void executeSyncSfid() {
        boolean isLock = false;
        boolean masterisLock = distributedLock.getLock("guaranteedLock-syncSfid-master",3600 * 1000);
        if(!masterisLock){
            isLock = distributedLock.getLock("guaranteedLock-syncSfid-slave",3600 * 1000);
        }
        if(masterisLock || isLock) {  //确保两台服务器都能同时跑又不会重复跑
            ArrayList<String> lockKeys = new ArrayList<>();
            try {
                //aws tableName与sf tableName的映射
                List<String> allAwsTableName = TableNameEnum.getAllAwsTableName();
                for (String awsTableName:allAwsTableName) {
                    List<Long> ids = Db.query("select id from `"+awsTableName+"` where is_delete = '0' and create_time>date_add(now(),interval -2 MONTH) and sf_record_id is null");
                    log.info("同步表{},查询到sf_id为空的数据共{}条",awsTableName,ids.size());
                    if (CollUtil.isEmpty(ids)) {
                        continue;
                    }
                    //加表名锁,阻止多个服务同时更新一张表
                    //判断是否有机器在执行事务
                    boolean isExecute = distributedLock.getLock("guaranteed-syncSfid:"+awsTableName+":lock",3600 * 1000);
                    if(isExecute) {
                        lockKeys.add(awsTableName);
                        String sfTableName = TableNameEnum.getSfTableNameByAwsTableName(awsTableName);
                        //截取ids每50条执行一次
                        int skip = 0;
                        int sub = 50;
                        List<String> collect = null;
                        String idColumn = "AWS_Data_Id__c";
                        while (collect==null||collect.size()>=sub) {
                            collect = ids.stream().skip(skip).limit(sub).map(String::valueOf).collect(Collectors.toList());
                            skip += sub;
                            String sql = "select Id,"+idColumn+"  from "+sfTableName+" where "+idColumn+" in "+collect.stream().collect(Collectors.joining("','", "('", "')"));
                            JSONArray jsonArray = sfService.querySFData(sql);
                            if (jsonArray != null && jsonArray.size() != 0) {
                                for (int i = 0;i<jsonArray.size();i++) {
                                    JSONObject data = jsonArray.getJSONObject(i);
                                    String sfId = data.getString("Id");
                                    String awsId = data.getString(idColumn);
                                    Db.update("update `"+awsTableName+"` set sf_record_id = '"+ sfId +"' where id = "+awsId);
                                    log.info("更新{} sfid,id:{},sfid:{}",awsTableName,awsId,sfId);
                                }
                            }
                        }
                    }
                }
            } catch (Exception e) {
                log.error("同步contact sfid异常:"+e.getMessage(), e);
            }finally {
                if(masterisLock){
                    distributedLock.releaseLock("guaranteedLock-syncSfid-master");
                }
                if (isLock){
                    distributedLock.releaseLock("guaranteedLock-syncSfid-slave");
                }
                //释放数据锁
                if(CollUtil.isNotEmpty(lockKeys)){
                    for (String awsTableName:lockKeys) {
                        distributedLock.releaseLock("guaranteed-syncSfid:"+awsTableName+":lock");
                    }
                }
            }
        }
    }
 
 
    public Map<String,Map<String,String>> queryTransactionStatus(List<String> sortkeysets){
        SFTokenDto tokenDto = sfService.getToken();
        String accessToken = tokenDto.getAccessToken();
        HttpHeaders header = new HttpHeaders();
        header.add("Authorization", "Bearer " + accessToken);
        StringBuilder idString = new StringBuilder();
        // 拼接URL
        String ids = sortkeysets.stream().collect(Collectors.joining("','", "('", "')"));
        idString.append(baseUrl).append("services/data/v53.0/query/?q=Select Status__c,TransId__c,SFRecordId__c from Transaction_Log__c Where TransId__c in ").append(ids);
        // 请求数据
        JSONObject jsonObject = RestUtil.get(idString.toString(),header);
        Map<String, Map<String,String>> map = new HashMap<>();
        List<JSONObject> objects = new ArrayList<>();
        Object totalSize = jsonObject.get("totalSize");
        Boolean done =(Boolean) jsonObject.get("done");
        String nextRecordsUrl = jsonObject.getString("nextRecordsUrl");
        JSONArray jsonArray = jsonObject.getJSONArray("records");
 
        if (totalSize == null) {
            log.info("请求事务确认数据失败,result:{}",jsonObject);
            throw new BizException("请求事务确认数据失败");
        }
        for (int i = 0; i <jsonArray.size() ; i++) {
            JSONObject data = jsonArray.getJSONObject(i);
            String status = data.getString("Status__c");
            String transId = data.getString("TransId__c");
            String sfRecordId = data.getString("SFRecordId__c");
            Map<String, String> mapStatus = new HashMap<>();
            mapStatus.put("status",status);
            mapStatus.put("sfRecordId",sfRecordId);
            map.put(transId,mapStatus);
        }
        return map;
    }
 
    public Map<String, String> queryNextRecords(String nextRecordsUrl,Map<String,String> map){
        SFTokenDto tokenDto = sfService.getToken();
        String accessToken = tokenDto.getAccessToken();
        HttpHeaders header = new HttpHeaders();
        header.add("Authorization", "Bearer " + accessToken);
        JSONObject jsonObject = RestUtil.get(nextRecordsUrl,header);
        Boolean done =(Boolean) jsonObject.get("done");
        JSONArray jsonArray = jsonObject.getJSONArray("records");
        Object totalSize = jsonObject.get("totalSize");
        if (totalSize == null) {
            return map;
        }
        for (int i = 0; i <jsonArray.size() ; i++) {
            JSONObject data = jsonArray.getJSONObject(i);
            String status = data.getString("Status__c");
            String transId = data.getString("TransId__c");
            map.put(transId,status);
        }
        return map;
    }
}