BSC链上监听钱包是否收到USDT转账
使用Django 的Management方式监听BSC链上钱包的收款消息
class Command(BaseCommand):
help = 'Listen for incoming transactions to a Binance Smart Chain wallet'
def handle(self, *args, **kwargs):
# 初始化Web3连接
web3 = Web3(Web3.HTTPProvider(settings.BSC_RPC_URL, request_kwargs={'timeout': 60}))
token_contract_address = Web3.to_checksum_address(settings.TOKEN_CONTRACT_ADDRESS)
wallet_address = get_setting('node_wallet_address', '')
wallet_address = web3.to_checksum_address(wallet_address) # 替换为要监听的钱包地址
# 检查连接是否成功
if not web3.is_connected():
print("Failed to connect to BSC network")
return
print('connect to BSC network success')
# 加载ERC-20 ABI
erc20_abi = load_abi(settings.CONTRACT_ABI_PATH)
if not erc20_abi:
print(f"警告: {settings.CONTRACT_ABI_PATH} 未找到。使用最小ERC20_ABI_SNIPPET。")
return
# 创建合约实例
token_contract = web3.eth.contract(address=token_contract_address, abi=erc20_abi)
# 创建事件过滤器,监听指定钱包地址的转账事件
event_filter = token_contract.events.Transfer.create_filter(
from_block='latest',
argument_filters={'to': wallet_address}
)
def handle_event(event):
# print(f"Transfer event detected: {event}")
from_address = event.args['from']
to_address = event.args['to']
# print(event.args['value'])
value_raw = decimal.Decimal(event.args['value'] / 1000000000000000000).quantize(decimal.Decimal("0.00"))
print(value_raw)
wallet_get_transfer(from_address, value_raw)
# 保持命令运行
self.stdout.write(self.style.SUCCESS('Listening... Press Ctrl+C to stop.'))
while True:
for event in event_filter.get_new_entries():
handle_event(event)
time.sleep(2)
使用服务来运行django management
在etc/systemd/system/ 下创建 listen_bsc.services 文件
[Unit]
Description=Django Management Command Listen BSC
After=network.target
[Service]
User=root
Group=root
WorkingDirectory=/www/wwwroot/site_name
Environment="PATH=/root/webenv/bin"
ExecStart=/root/webenv/bin/python manage.py listen_bsc
[Install]
WantedBy=multi-user.target
然后
运行
systemctl start listen_bsc
启动 listen_bsc 服务
文档信息
版权声明:可自由转载(请注明转载出处)-非商用-非衍生
发表时间:2025年7月1日 10:46