package com.deloitte.system.service;
|
|
import cn.hutool.core.collection.CollectionUtil;
|
import cn.hutool.core.util.StrUtil;
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.common.annotation.Table;
|
import com.common.aws.util.S3Util;
|
import com.common.core.constant.Constants;
|
import com.common.core.constant.GlobalConst;
|
import com.common.core.domain.BaseModel;
|
import com.common.core.enums.ResultCodeEnum;
|
import com.common.core.enums.YesNoEnum;
|
import com.common.core.exception.BizException;
|
import com.common.core.utils.DistributedLock;
|
import com.common.core.utils.KitClassUtils;
|
import com.common.core.utils.ModelUtils;
|
import com.common.core.utils.StringUtils;
|
import com.common.redis.util.RedisUtil;
|
import com.deloitte.system.request.TxConfirmDto;
|
import com.jfinal.plugin.activerecord.Db;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.stereotype.Service;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.stream.Collectors;
|
|
@Slf4j
|
@Service
|
public class ConfirmTxService {
|
|
@Autowired
|
private RedisUtil redisUtil;
|
|
@Autowired
|
private DistributedLock distributedLock;
|
@Value("${aws.s3.bucketName}")
|
private String bucketName;
|
|
public void confirm(TxConfirmDto dto) {
|
String txid=dto.getTxId();
|
String sfRecordId = dto.getSfRecordId();
|
if(null== redisUtil.get(GlobalConst.PIPL_UNCONFIRM_INSERT + txid) && null == redisUtil.get(GlobalConst.PIPL_UNCONFIRM_UPDATE + txid)){
|
throw new BizException("不存在的事务ID(txid)");
|
}
|
//遍历insert
|
Object object = redisUtil.get(GlobalConst.PIPL_UNCONFIRM_INSERT + txid);
|
if(object!=null && StringUtils.isNotEmpty(object.toString()) && distributedLock.getLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid,3600 * 1000)) {
|
try {
|
log.info(object.toString());
|
if(YesNoEnum.YES.getCode().equals(dto.getIsSuccess())) {
|
List<BaseModel> models = generateModelInfo(object, sfRecordId,dto.getIdList(), true);
|
Db.tx(() -> {
|
for (BaseModel model : models) {
|
if(!model.save()) return false;
|
}
|
return true;
|
});
|
}
|
redisUtil.deleteKey(GlobalConst.PIPL_UNCONFIRM_INSERT + txid);
|
}catch (Exception e){
|
log.error(e.getMessage(), e);
|
throw new BizException(ResultCodeEnum.RT_ERROR);
|
}finally {
|
distributedLock.releaseLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid);
|
}
|
}
|
|
//遍历update
|
object = redisUtil.get(GlobalConst.PIPL_UNCONFIRM_UPDATE + txid);
|
if(object!=null && StringUtils.isNotEmpty(object.toString()) && distributedLock.getLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid,3600 * 1000)) {
|
try {
|
log.info(object.toString());
|
if(YesNoEnum.YES.getCode().equals(dto.getIsSuccess())) {
|
List<BaseModel> models = generateModelInfo(object,sfRecordId,dto.getIdList(),false);
|
Db.tx(() -> {
|
for (BaseModel model : models) {
|
if(!model.update()) return false;
|
}
|
return true;
|
});
|
}
|
redisUtil.deleteKey(GlobalConst.PIPL_UNCONFIRM_UPDATE + txid);
|
}catch (Exception e){
|
log.error(e.getMessage(), e);
|
throw new BizException(ResultCodeEnum.RT_ERROR);
|
}finally {
|
distributedLock.releaseLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid);
|
}
|
}
|
}
|
|
public void confirmFile(TxConfirmDto dto) {
|
String txid = dto.getTxId();
|
if(null== redisUtil.get(GlobalConst.PIPL_UNCONFIRM_FILE + txid)){
|
throw new BizException("不存在的事务ID(txid)");
|
}
|
Object object = redisUtil.get(GlobalConst.PIPL_UNCONFIRM_FILE + txid);
|
if(object!=null && StringUtils.isNotEmpty(object.toString()) && distributedLock.getLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid,3600 * 1000)) {
|
try {
|
if(YesNoEnum.NO.getCode().equals(dto.getIsSuccess())) {
|
List<String> keys = JSONArray.parseArray(object.toString(),String.class);
|
S3Util.deleteByKeys(keys,bucketName);
|
}
|
redisUtil.deleteKey(GlobalConst.PIPL_UNCONFIRM_FILE + txid);
|
}catch (Exception e){
|
log.error(e.getMessage(), e);
|
throw new BizException(ResultCodeEnum.RT_ERROR);
|
}finally {
|
distributedLock.releaseLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid);
|
}
|
}
|
}
|
|
private List<BaseModel> generateModelInfo(Object object, String sfRecordId, List<TxConfirmDto.ConfimrListDto> idList,boolean isCreate) throws InstantiationException, IllegalAccessException {
|
JSONObject jsonobject = JSONObject.parseObject(object.toString());
|
JSONArray dataArray = jsonobject.getJSONArray("data");
|
String table_name = jsonobject.getString("tableName");
|
List<BaseModel> models = new ArrayList<>();
|
Map<String,String> idMap= null;
|
if(CollectionUtil.isNotEmpty(idList)){
|
idMap=idList.stream().collect(Collectors.toMap(TxConfirmDto.ConfimrListDto::getAwsId, TxConfirmDto.ConfimrListDto::getSfRecordId));
|
}
|
for (Class<?> clazz : KitClassUtils.tableclass) {
|
Table table = clazz.getAnnotation(Table.class);
|
String tablename = table.tableName();
|
if (tablename.equals(table_name)) {
|
Class<? extends BaseModel<?>> classmodel = table.clazz();
|
for (int i = 0; i < dataArray.size(); i++) {
|
JSONObject jsonObject = dataArray.getJSONObject(i);
|
BaseModel baseModel = ModelUtils.JsonToModel(jsonObject, classmodel.newInstance());
|
if(idMap!=null && CollectionUtil.isNotEmpty(idMap)){
|
baseModel.set("sf_record_id",idMap.get(baseModel.getStr("id")));
|
}else {
|
if (StrUtil.isNotBlank(sfRecordId)) {
|
baseModel.set("sf_record_id", sfRecordId);
|
}
|
}
|
if(isCreate){
|
ModelUtils.generateCommonInfo(baseModel, Constants.ACTION_CREATE);
|
}else {
|
ModelUtils.generateCommonInfo(baseModel, Constants.ACTION_UPDATE);
|
}
|
models.add(baseModel);
|
}
|
}
|
}
|
return models;
|
}
|
}
|