BSC链上监听钱包是否收到USDT转账

使用 Django 的 management command(自定义管理命令)监听 BSC 链上钱包的收款事件

class Command(BaseCommand):
    """Management command that polls BSC for token transfers into one wallet.

    A ``Transfer`` event filter is created on the token contract configured in
    ``settings.TOKEN_CONTRACT_ADDRESS``, restricted to transfers whose ``to``
    argument is the wallet stored under the ``node_wallet_address`` setting.
    Each matching event is converted to a two-decimal token amount and handed
    to ``wallet_get_transfer`` together with the sender address.
    """

    help = 'Listen for incoming transactions to a Binance Smart Chain wallet'

    # Raw on-chain values are scaled down by 10**18, the divisor this command
    # has always applied. NOTE(review): confirm it matches the token's decimals().
    TOKEN_DECIMALS = 18
    # Seconds to wait between two polls of the event filter.
    POLL_INTERVAL_SECONDS = 2

    def handle(self, *args, **kwargs):
        """Connect to the RPC node, create the filter and poll it forever.

        Returns early (after writing a message to stderr) when the wallet
        address is not configured, the node is unreachable, or the contract
        ABI cannot be loaded. Otherwise runs until interrupted.
        """
        web3 = Web3(Web3.HTTPProvider(settings.BSC_RPC_URL, request_kwargs={'timeout': 60}))

        # Fail with a readable message instead of a checksum-conversion
        # traceback when the wallet setting has not been filled in.
        wallet_address = get_setting('node_wallet_address', '')
        if not wallet_address:
            self.stderr.write("node_wallet_address is not configured")
            return
        wallet_address = web3.to_checksum_address(wallet_address)
        token_contract_address = Web3.to_checksum_address(settings.TOKEN_CONTRACT_ADDRESS)

        if not web3.is_connected():
            self.stderr.write("Failed to connect to BSC network")
            return
        self.stdout.write('connect to BSC network success')

        erc20_abi = load_abi(settings.CONTRACT_ABI_PATH)
        if not erc20_abi:
            # There is no fallback ABI, so the command cannot continue.
            self.stderr.write(f"错误: {settings.CONTRACT_ABI_PATH} 未找到,无法加载合约ABI。")
            return

        token_contract = web3.eth.contract(address=token_contract_address, abi=erc20_abi)

        def create_filter():
            # Only Transfer events whose recipient is the watched wallet.
            return token_contract.events.Transfer.create_filter(
                from_block='latest',
                argument_filters={'to': wallet_address}
            )

        event_filter = create_filter()

        self.stdout.write(self.style.SUCCESS('Listening... Press Ctrl+C to stop.'))
        while True:
            try:
                new_entries = event_filter.get_new_entries()
            except Exception as exc:
                # Top-level boundary of a long-running listener: a failed poll
                # (RPC error, filter dropped by the node) must not end the
                # process. The filter is rebuilt from 'latest', so transfers
                # mined while polling was failing are NOT replayed.
                # NOTE(review): narrow this to the exception the installed
                # web3 version raises for a missing filter once confirmed.
                self.stderr.write(f"Polling failed, recreating filter: {exc}")
                time.sleep(self.POLL_INTERVAL_SECONDS)
                try:
                    event_filter = create_filter()
                except Exception as retry_exc:
                    self.stderr.write(f"Could not recreate filter: {retry_exc}")
                continue

            for event in new_entries:
                self._handle_event(event)
            time.sleep(self.POLL_INTERVAL_SECONDS)

    def _handle_event(self, event):
        """Convert one Transfer event and forward it to ``wallet_get_transfer``."""
        from_address = event.args['from']
        amount = self._to_token_amount(event.args['value'])
        self.stdout.write(str(amount))
        wallet_get_transfer(from_address, amount)

    @classmethod
    def _to_token_amount(cls, raw_value):
        """Return ``raw_value`` scaled by ``10**TOKEN_DECIMALS`` as a 2-decimal Decimal.

        The division is done entirely in ``Decimal``; dividing the integer
        with float ``/`` first would introduce binary floating-point error
        before rounding.
        """
        with decimal.localcontext() as ctx:
            # Enough precision for any 256-bit integer (at most 78 digits).
            ctx.prec = 80
            scaled = decimal.Decimal(raw_value) / (decimal.Decimal(10) ** cls.TOKEN_DECIMALS)
            return scaled.quantize(decimal.Decimal("0.00"))

使用 systemd 服务来运行 Django management command

在 /etc/systemd/system/ 下创建 listen_bsc.service 文件

[Unit]
Description=Django Management Command Listen BSC
After=network.target

[Service]
User=root
Group=root
WorkingDirectory=/www/wwwroot/site_name
Environment="PATH=/root/webenv/bin"
# Disable Python output buffering so the listener's messages reach the journal immediately
Environment="PYTHONUNBUFFERED=1"
ExecStart=/root/webenv/bin/python manage.py listen_bsc
# Bring the listener back up if it exits with an error
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target

然后运行以下命令,重新加载 systemd 配置并启动 listen_bsc 服务(如需开机自启,可再执行 systemctl enable listen_bsc):

systemctl daemon-reload

systemctl start listen_bsc

文档信息

版权声明:可自由转载(请注明转载出处)-非商用-非衍生

发表时间:2025年7月1日 10:46