package com.deloitte.system.service; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.common.annotation.Table; import com.common.aws.util.S3Util; import com.common.core.constant.Constants; import com.common.core.constant.GlobalConst; import com.common.core.domain.BaseModel; import com.common.core.enums.ResultCodeEnum; import com.common.core.enums.YesNoEnum; import com.common.core.exception.BizException; import com.common.core.utils.DistributedLock; import com.common.core.utils.KitClassUtils; import com.common.core.utils.ModelUtils; import com.common.core.utils.StringUtils; import com.common.redis.util.RedisUtil; import com.deloitte.system.request.TxConfirmDto; import com.jfinal.plugin.activerecord.Db; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @Slf4j @Service public class ConfirmTxService { @Autowired private RedisUtil redisUtil; @Autowired private DistributedLock distributedLock; @Value("${aws.s3.bucketName}") private String bucketName; public void confirm(TxConfirmDto dto) { String txid=dto.getTxId(); String sfRecordId = dto.getSfRecordId(); if(null== redisUtil.get(GlobalConst.PIPL_UNCONFIRM_INSERT + txid) && null == redisUtil.get(GlobalConst.PIPL_UNCONFIRM_UPDATE + txid)){ throw new BizException("不存在的事务ID(txid)"); } //遍历insert Object object = redisUtil.get(GlobalConst.PIPL_UNCONFIRM_INSERT + txid); if(object!=null && StringUtils.isNotEmpty(object.toString()) && distributedLock.getLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid,3600 * 1000)) { try { log.info(object.toString()); if(YesNoEnum.YES.getCode().equals(dto.getIsSuccess())) { List models = generateModelInfo(object, sfRecordId,dto.getIdList(), true); Db.tx(() -> { for (BaseModel model : models) { if(!model.save()) return false; } return true; }); } redisUtil.deleteKey(GlobalConst.PIPL_UNCONFIRM_INSERT + txid); }catch (Exception e){ log.error(e.getMessage(), e); throw new BizException(ResultCodeEnum.RT_ERROR); }finally { distributedLock.releaseLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid); } } //遍历update object = redisUtil.get(GlobalConst.PIPL_UNCONFIRM_UPDATE + txid); if(object!=null && StringUtils.isNotEmpty(object.toString()) && distributedLock.getLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid,3600 * 1000)) { try { log.info(object.toString()); if(YesNoEnum.YES.getCode().equals(dto.getIsSuccess())) { List models = generateModelInfo(object,sfRecordId,dto.getIdList(),false); Db.tx(() -> { for (BaseModel model : models) { if(!model.update()) return false; } return true; }); } redisUtil.deleteKey(GlobalConst.PIPL_UNCONFIRM_UPDATE + txid); }catch (Exception e){ log.error(e.getMessage(), e); throw new BizException(ResultCodeEnum.RT_ERROR); }finally { distributedLock.releaseLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid); } } } public void confirmFile(TxConfirmDto dto) { String txid = dto.getTxId(); if(null== redisUtil.get(GlobalConst.PIPL_UNCONFIRM_FILE + txid)){ throw new BizException("不存在的事务ID(txid)"); } Object object = redisUtil.get(GlobalConst.PIPL_UNCONFIRM_FILE + txid); if(object!=null && StringUtils.isNotEmpty(object.toString()) && distributedLock.getLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid,3600 * 1000)) { try { if(YesNoEnum.NO.getCode().equals(dto.getIsSuccess())) { List keys = JSONArray.parseArray(object.toString(),String.class); S3Util.deleteByKeys(keys,bucketName); } redisUtil.deleteKey(GlobalConst.PIPL_UNCONFIRM_FILE + txid); }catch (Exception e){ log.error(e.getMessage(), e); throw new BizException(ResultCodeEnum.RT_ERROR); }finally { distributedLock.releaseLock(GlobalConst.PIPL_UNCONFIRM_LOCK +txid); } } } private List generateModelInfo(Object object, String sfRecordId, List idList,boolean isCreate) throws InstantiationException, IllegalAccessException { JSONObject jsonobject = JSONObject.parseObject(object.toString()); JSONArray dataArray = jsonobject.getJSONArray("data"); String table_name = jsonobject.getString("tableName"); List models = new ArrayList<>(); Map idMap= null; if(CollectionUtil.isNotEmpty(idList)){ idMap=idList.stream().collect(Collectors.toMap(TxConfirmDto.ConfimrListDto::getAwsId, TxConfirmDto.ConfimrListDto::getSfRecordId)); } for (Class clazz : KitClassUtils.tableclass) { Table table = clazz.getAnnotation(Table.class); String tablename = table.tableName(); if (tablename.equals(table_name)) { Class> classmodel = table.clazz(); for (int i = 0; i < dataArray.size(); i++) { JSONObject jsonObject = dataArray.getJSONObject(i); BaseModel baseModel = ModelUtils.JsonToModel(jsonObject, classmodel.newInstance()); if(idMap!=null && CollectionUtil.isNotEmpty(idMap)){ baseModel.set("sf_record_id",idMap.get(baseModel.getStr("id"))); }else { if (StrUtil.isNotBlank(sfRecordId)) { baseModel.set("sf_record_id", sfRecordId); } } if(isCreate){ ModelUtils.generateCommonInfo(baseModel, Constants.ACTION_CREATE); }else { ModelUtils.generateCommonInfo(baseModel, Constants.ACTION_UPDATE); } models.add(baseModel); } } } return models; } }