ETH充值到账提醒程序

2021-04-18 16:08
277
0
@Data
public abstract class Watcher implements Runnable{
    private Logger logger = LoggerFactory.getLogger(Watcher.class);
    private boolean stop = false;
    //默认同步间隔20秒
    private Long checkInterval = 2000L;
    private Long currentBlockHeight = 0L;
    private int step = 5;
    //区块确认数
    private int confirmation = 3;
    private DepositEvent depositEvent;
    private Coin coin;
    private WatcherLogService watcherLogService;

    public void check(){
        try {
            Long networkBlockNumber = getNetworkBlockHeight() - confirmation + 1;
            if (currentBlockHeight < networkBlockNumber) {
                long startBlockNumber = currentBlockHeight + 1;
                currentBlockHeight = (networkBlockNumber - currentBlockHeight > step) ? currentBlockHeight + step : networkBlockNumber;
                logger.info("replay block from {} to {}", startBlockNumber, currentBlockHeight);
                List<Deposit> deposits = replayBlock(startBlockNumber, currentBlockHeight);
                if(deposits != null) {
                 deposits.forEach(deposit -> {
                       depositEvent.onConfirmed(deposit);
                   });
                   //记录日志
                 watcherLogService.update(coin.getName(), currentBlockHeight);
              }else {
                 logger.info("扫块失败!!!");
                 // 未扫描成功
                 currentBlockHeight = startBlockNumber - 1;
              }
            } else {
                logger.info("Already latest height: {}, networkBlockHeight: {},nothing to do!", currentBlockHeight, networkBlockNumber);
            }
        }
        catch (Exception e){
            e.printStackTrace();
        }
    }

    public abstract List<Deposit> replayBlock(Long startBlockNumber, Long endBlockNumber);

    public abstract Long getNetworkBlockHeight();

    @Override
    public void run() {
        stop = false;
        long nextCheck = 0;
        while(!(Thread.interrupted() || stop)) {
            if (nextCheck <= System.currentTimeMillis()) {
                try {
                    nextCheck = System.currentTimeMillis() + checkInterval;
                    logger.info("check...");
                    check();
                } catch (Exception ex) {
                    logger.info(ex.getMessage());
                }
            }
            else {
                try {
                    Thread.sleep(Math.max(nextCheck - System.currentTimeMillis(), 100));
                } catch (InterruptedException ex) {
                    logger.info(ex.getMessage());
                }
            }
        }
    }
}
//////////////////////////////////////////////////////////////
@Component
public class EthWatcher extends Watcher{
    private Logger logger = LoggerFactory.getLogger(EthWatcher.class);
    @Autowired
    private Web3j web3j;
    @Autowired
    private EthService ethService;
    @Autowired
    private AccountService accountService;

    @Autowired
    private DepositEvent depositEvent;

    @Override
    public List<Deposit>  replayBlock(Long startBlockNumber, Long endBlockNumber) {
        List<Deposit> deposits = new ArrayList<>();
        try {
            for (Long i = startBlockNumber; i <= endBlockNumber; i++) {
                EthBlock block = web3j.ethGetBlockByNumber(new DefaultBlockParameterNumber(i), true).send();

                block.getBlock().getTransactions().stream().forEach(transactionResult -> {
                    EthBlock.TransactionObject transactionObject = (EthBlock.TransactionObject) transactionResult;
                    Transaction transaction = transactionObject.get();
                    if (StringUtils.isNotEmpty(transaction.getTo())
                            && accountService.isAddressExist(transaction.getTo())
                            && !transaction.getFrom().equalsIgnoreCase(getCoin().getIgnoreFromAddress())) {
                        Deposit deposit = new Deposit();
                        deposit.setTxid(transaction.getHash());
                        deposit.setBlockHeight(transaction.getBlockNumber().longValue());
                        deposit.setBlockHash(transaction.getBlockHash());
                        deposit.setAmount(Convert.fromWei(transaction.getValue().toString(), Convert.Unit.ETHER));
                        deposit.setAddress(transaction.getTo());
                        deposits.add(deposit);
                        logger.info("received coin {} at height {}", transaction.getValue(), transaction.getBlockNumber());
                        //同步余额
                        try {
                            ethService.syncAddressBalance(deposit.getAddress());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    //如果是地址簿里转出去的地址,需要同步余额
                    if (StringUtils.isNotEmpty(transaction.getTo()) && accountService.isAddressExist(transaction.getFrom())) {
                        logger.info("sync address:{} balance", transaction.getFrom());
                        try {
                            ethService.syncAddressBalance(transaction.getFrom());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        }
        catch (Exception e){
            e.printStackTrace();
        }
        return deposits;
    }


    public synchronized int replayBlockInit(Long startBlockNumber,Long endBlockNumber) throws IOException {
        int count = 0;
        for(Long i=startBlockNumber;i <= endBlockNumber;i++){
            EthBlock block =  web3j.ethGetBlockByNumber(new DefaultBlockParameterNumber(i),true).send();

            block.getBlock().getTransactions().stream().forEach(transactionResult -> {
                EthBlock.TransactionObject transactionObject = (EthBlock.TransactionObject) transactionResult;
                Transaction transaction = transactionObject.get();
                if(StringUtils.isNotEmpty(transaction.getTo())
                        && accountService.isAddressExist(transaction.getTo())
                        && !transaction.getFrom().equalsIgnoreCase(getCoin().getIgnoreFromAddress())) {
                    Deposit deposit = new Deposit();
                    deposit.setTxid(transaction.getHash());
                    deposit.setBlockHeight(transaction.getBlockNumber().longValue());
                    deposit.setBlockHash(transaction.getBlockHash());
                    deposit.setAmount(Convert.fromWei(transaction.getValue().toString(), Convert.Unit.ETHER));
                    deposit.setAddress(transaction.getTo());
                    logger.info("received coin {} at height {}",transaction.getValue(),transaction.getBlockNumber());
                    depositEvent.onConfirmed(deposit);
                    //同步余额
                    try {
                        ethService.syncAddressBalance(deposit.getAddress());
                    }
                    catch (Exception e){
                        e.printStackTrace();
                    }
                }
                //如果是地址簿里转出去的地址,需要同步余额
                if(StringUtils.isNotEmpty(transaction.getTo()) && accountService.isAddressExist(transaction.getFrom())) {
                    logger.info("sync address:{} balance",transaction.getFrom());
                    try {
                        ethService.syncAddressBalance(transaction.getFrom());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        return count;
    }

    @Override
    public Long getNetworkBlockHeight() {
        try {
            EthBlockNumber blockNumber = web3j.ethBlockNumber().send();
            return blockNumber.getBlockNumber().longValue();
        }catch (Exception e){
            e.printStackTrace();
            return 0L;
        }
    }
}

全部评论