ETH充值到账提醒程序
/**
 * Base class for a polling block watcher.
 *
 * <p>{@link #run()} calls {@link #check()} once per {@code checkInterval}. Each check asks the
 * subclass for the network height, replays at most {@code step} not-yet-processed blocks that
 * have at least {@code confirmation} confirmations, passes every returned {@link Deposit} to
 * {@code depositEvent} and then stores the new height through {@code watcherLogService}.
 *
 * <p>{@code depositEvent}, {@code coin}, {@code watcherLogService} and the starting
 * {@code currentBlockHeight} are not initialised here; they must be set (through the accessors
 * generated by {@code @Data}) before the watcher is started.
 */
@Data
public abstract class Watcher implements Runnable {
    private Logger logger = LoggerFactory.getLogger(Watcher.class);
    // volatile: the flag is written by another thread (setStop) and polled by run()
    private volatile boolean stop = false;
    // Interval between two checks in milliseconds; the default of 2000 ms is 2 seconds
    // (the original comment claimed 20 seconds, which did not match the value)
    private Long checkInterval = 2000L;
    // Height of the last block that has been replayed successfully
    private Long currentBlockHeight = 0L;
    // Maximum number of blocks replayed by a single check
    private int step = 5;
    // Number of confirmations a block needs before it is replayed
    private int confirmation = 3;
    private DepositEvent depositEvent;
    private Coin coin;
    private WatcherLogService watcherLogService;

    /**
     * Replays the next batch of confirmed blocks, if there is one.
     *
     * <p>{@code currentBlockHeight} is advanced only after {@link #replayBlock(Long, Long)}
     * returned a non-null list and every deposit was handed to {@code depositEvent}. When the
     * replay returns {@code null} or anything throws, the height is left unchanged so the same
     * range is retried by the next check instead of being skipped. Exceptions are logged and
     * never propagated.
     */
    public void check() {
        try {
            // Highest block that already has the required number of confirmations
            Long networkBlockNumber = getNetworkBlockHeight() - confirmation + 1;
            if (currentBlockHeight >= networkBlockNumber) {
                logger.info("Already latest height: {}, networkBlockHeight: {},nothing to do!", currentBlockHeight, networkBlockNumber);
                return;
            }
            long startBlockNumber = currentBlockHeight + 1;
            long endBlockNumber = (networkBlockNumber - currentBlockHeight > step)
                    ? currentBlockHeight + step
                    : networkBlockNumber;
            logger.info("replay block from {} to {}", startBlockNumber, endBlockNumber);
            List<Deposit> deposits = replayBlock(startBlockNumber, endBlockNumber);
            if (deposits == null) {
                // Scan failed: keep currentBlockHeight so the range is replayed again
                logger.warn("扫块失败!!!");
                return;
            }
            deposits.forEach(deposit -> depositEvent.onConfirmed(deposit));
            currentBlockHeight = endBlockNumber;
            // Record the processed height
            watcherLogService.update(coin.getName(), currentBlockHeight);
        } catch (Exception e) {
            logger.error("check failed, current block height stays at {}", currentBlockHeight, e);
        }
    }

    /**
     * Scans the blocks {@code startBlockNumber..endBlockNumber} (both inclusive).
     *
     * @return the deposits found in the range, or {@code null} when the range could not be
     *         scanned and has to be retried
     */
    public abstract List<Deposit> replayBlock(Long startBlockNumber, Long endBlockNumber);

    /**
     * @return the current height of the network as seen by the implementation
     */
    public abstract Long getNetworkBlockHeight();

    /**
     * Polls {@link #check()} every {@code checkInterval} milliseconds until the thread is
     * interrupted or {@code stop} is set to {@code true}.
     */
    @Override
    public void run() {
        stop = false;
        long nextCheck = 0;
        while (!(Thread.interrupted() || stop)) {
            if (nextCheck <= System.currentTimeMillis()) {
                try {
                    nextCheck = System.currentTimeMillis() + checkInterval;
                    logger.info("check...");
                    check();
                } catch (Exception ex) {
                    logger.error("check threw an unexpected exception", ex);
                }
            } else {
                try {
                    // Sleep at least 100 ms so the loop never spins
                    Thread.sleep(Math.max(nextCheck - System.currentTimeMillis(), 100));
                } catch (InterruptedException ex) {
                    logger.info("watcher interrupted while sleeping, stopping");
                    // sleep() cleared the interrupt flag; restore it so the loop condition sees it
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}
//////////////////////////////////////////////////////////////
/**
 * {@link Watcher} implementation that scans Ethereum blocks through web3j.
 *
 * <p>A transaction counts as a deposit when its recipient is known to {@code accountService}
 * and its sender is not the coin's configured "ignore from" address. The balance of every known
 * address that receives or sends a transaction is re-synchronised through {@code ethService}.
 */
@Component
public class EthWatcher extends Watcher {
    private static final Logger logger = LoggerFactory.getLogger(EthWatcher.class);
    @Autowired
    private Web3j web3j;
    @Autowired
    private EthService ethService;
    @Autowired
    private AccountService accountService;
    @Autowired
    private DepositEvent depositEvent;

    /**
     * Scans the blocks {@code startBlockNumber..endBlockNumber} (both inclusive) for deposits.
     *
     * @return the deposits found, or {@code null} when any block of the range could not be
     *         scanned; {@code Watcher.check()} treats {@code null} as "retry this range", so a
     *         partially scanned range is never reported as complete
     */
    @Override
    public List<Deposit> replayBlock(Long startBlockNumber, Long endBlockNumber) {
        List<Deposit> deposits = new ArrayList<>();
        try {
            for (long height = startBlockNumber; height <= endBlockNumber; height++) {
                // getTransactions() is declared with a raw element type by web3j
                for (EthBlock.TransactionResult result : fetchBlock(height).getTransactions()) {
                    Transaction transaction = ((EthBlock.TransactionObject) result).get();
                    Deposit deposit = toDeposit(transaction);
                    if (deposit != null) {
                        deposits.add(deposit);
                        // Sync the balance of the receiving address
                        syncBalance(deposit.getAddress());
                    }
                    syncSenderBalance(transaction);
                }
            }
        } catch (Exception e) {
            logger.error("replay block from {} to {} failed", startBlockNumber, endBlockNumber, e);
            return null;
        }
        return deposits;
    }

    /**
     * Scans the blocks {@code startBlockNumber..endBlockNumber} (both inclusive) and passes
     * every deposit straight to the injected {@code depositEvent}.
     *
     * @return the number of deposits that were passed to {@code depositEvent}
     * @throws IOException when a block cannot be fetched from the node
     */
    public synchronized int replayBlockInit(Long startBlockNumber, Long endBlockNumber) throws IOException {
        int count = 0;
        for (long height = startBlockNumber; height <= endBlockNumber; height++) {
            // getTransactions() is declared with a raw element type by web3j
            for (EthBlock.TransactionResult result : fetchBlock(height).getTransactions()) {
                Transaction transaction = ((EthBlock.TransactionObject) result).get();
                Deposit deposit = toDeposit(transaction);
                if (deposit != null) {
                    depositEvent.onConfirmed(deposit);
                    count++;
                    // Sync the balance of the receiving address
                    syncBalance(deposit.getAddress());
                }
                syncSenderBalance(transaction);
            }
        }
        return count;
    }

    /**
     * @return the latest block number reported by the node, or {@code 0} when the request fails
     */
    @Override
    public Long getNetworkBlockHeight() {
        try {
            EthBlockNumber blockNumber = web3j.ethBlockNumber().send();
            return blockNumber.getBlockNumber().longValue();
        } catch (Exception e) {
            logger.error("failed to query network block height", e);
            return 0L;
        }
    }

    /** Fetches a block with full transaction objects; fails when the node returns no block. */
    private EthBlock.Block fetchBlock(long height) throws IOException {
        EthBlock response = web3j.ethGetBlockByNumber(new DefaultBlockParameterNumber(height), true).send();
        EthBlock.Block block = response.getBlock();
        if (block == null) {
            throw new IOException("block " + height + " was not returned by the node");
        }
        return block;
    }

    /** Builds a Deposit for a transaction sent to a known address, or returns null otherwise. */
    private Deposit toDeposit(Transaction transaction) {
        String to = transaction.getTo();
        boolean isDeposit = StringUtils.isNotEmpty(to)
                && accountService.isAddressExist(to)
                && !transaction.getFrom().equalsIgnoreCase(getCoin().getIgnoreFromAddress());
        if (!isDeposit) {
            return null;
        }
        Deposit deposit = new Deposit();
        deposit.setTxid(transaction.getHash());
        deposit.setBlockHeight(transaction.getBlockNumber().longValue());
        deposit.setBlockHash(transaction.getBlockHash());
        deposit.setAmount(Convert.fromWei(transaction.getValue().toString(), Convert.Unit.ETHER));
        deposit.setAddress(to);
        logger.info("received coin {} at height {}", transaction.getValue(), transaction.getBlockNumber());
        return deposit;
    }

    /** Re-syncs the sender's balance when the sender is a known address (an outgoing transfer). */
    private void syncSenderBalance(Transaction transaction) {
        if (StringUtils.isNotEmpty(transaction.getTo()) && accountService.isAddressExist(transaction.getFrom())) {
            logger.info("sync address:{} balance", transaction.getFrom());
            syncBalance(transaction.getFrom());
        }
    }

    /** Best-effort balance sync: a failure is logged and does not abort the scan. */
    private void syncBalance(String address) {
        try {
            ethService.syncAddressBalance(address);
        } catch (Exception e) {
            logger.error("failed to sync balance of address {}", address, e);
        }
    }
}
全部评论