1.安装web3 包
pip install web3
2.代码
目录结构
bsc_usdt_abi.json
[{
"inputs": [],
"payable": false,
"stateMutability": "nonpayable",
"type": "constructor"
},
{
"anonymous": false,
"inputs": [{
"indexed": true,
"internalType": "address",
"name": "owner",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "spender",
"type": "address"
},
{
"indexed": false,
"internalType": "uint256",
"name": "value",
"type": "uint256"
}
],
"name": "Approval",
"type": "event"
}, {
"anonymous": false,
"inputs": [{
"indexed": true,
"internalType": "address",
"name": "previousOwner",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "newOwner",
"type": "address"
}
],
"name": "OwnershipTransferred",
"type": "event"
},
{
"anonymous": false,
"inputs": [{
"indexed": true,
"internalType": "address",
"name": "from",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "to",
"type": "address"
},
{
"indexed": false,
"internalType": "uint256",
"name": "value",
"type": "uint256"
}
],
"name": "Transfer",
"type": "event"
}, {
"constant": true,
"inputs": [],
"name": "_decimals",
"outputs": [{
"internalType": "uint8",
"name": "",
"type": "uint8"
}],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "_name",
"outputs": [{
"internalType": "string",
"name": "",
"type": "string"
}],
"payable": false,
"stateMutability": "view",
"type": "function"
}, {
"constant": true,
"inputs": [],
"name": "_symbol",
"outputs": [{
"internalType": "string",
"name": "",
"type": "string"
}],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [{
"internalType": "address",
"name": "owner",
"type": "address"
},
{
"internalType": "address",
"name": "spender",
"type": "address"
}
],
"name": "allowance",
"outputs": [{
"internalType": "uint256",
"name": "",
"type": "uint256"
}],
"payable": false,
"stateMutability": "view",
"type": "function"
}, {
"constant": false,
"inputs": [{
"internalType": "address",
"name": "spender",
"type": "address"
},
{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
}
],
"name": "approve",
"outputs": [{
"internalType": "bool",
"name": "",
"type": "bool"
}],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": true,
"inputs": [{
"internalType": "address",
"name": "account",
"type": "address"
}],
"name": "balanceOf",
"outputs": [{
"internalType": "uint256",
"name": "",
"type": "uint256"
}],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": false,
"inputs": [{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
}],
"name": "burn",
"outputs": [{
"internalType": "bool",
"name": "",
"type": "bool"
}],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "decimals",
"outputs": [{
"internalType": "uint8",
"name": "",
"type": "uint8"
}],
"payable": false,
"stateMutability": "view",
"type": "function"
}, {
"constant": false,
"inputs": [{
"internalType": "address",
"name": "spender",
"type": "address"
},
{
"internalType": "uint256",
"name": "subtractedValue",
"type": "uint256"
}
],
"name": "decreaseAllowance",
"outputs": [{
"internalType": "bool",
"name": "",
"type": "bool"
}],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "getOwner",
"outputs": [{
"internalType": "address",
"name": "",
"type": "address"
}],
"payable": false,
"stateMutability": "view",
"type": "function"
}, {
"constant": false,
"inputs": [{
"internalType": "address",
"name": "spender",
"type": "address"
},
{
"internalType": "uint256",
"name": "addedValue",
"type": "uint256"
}
],
"name": "increaseAllowance",
"outputs": [{
"internalType": "bool",
"name": "",
"type": "bool"
}],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": false,
"inputs": [{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
}],
"name": "mint",
"outputs": [{
"internalType": "bool",
"name": "",
"type": "bool"
}],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "name",
"outputs": [{
"internalType": "string",
"name": "",
"type": "string"
}],
"payable": false,
"stateMutability": "view",
"type": "function"
}, {
"constant": true,
"inputs": [],
"name": "owner",
"outputs": [{
"internalType": "address",
"name": "",
"type": "address"
}],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": false,
"inputs": [],
"name": "renounceOwnership",
"outputs": [],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "symbol",
"outputs": [{
"internalType": "string",
"name": "",
"type": "string"
}],
"payable": false,
"stateMutability": "view",
"type": "function"
},
{
"constant": true,
"inputs": [],
"name": "totalSupply",
"outputs": [{
"internalType": "uint256",
"name": "",
"type": "uint256"
}],
"payable": false,
"stateMutability": "view",
"type": "function"
}, {
"constant": false,
"inputs": [{
"internalType": "address",
"name": "recipient",
"type": "address"
},
{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
}
],
"name": "transfer",
"outputs": [{
"internalType": "bool",
"name": "",
"type": "bool"
}],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
}, {
"constant": false,
"inputs": [{
"internalType": "address",
"name": "sender",
"type": "address"
},
{
"internalType": "address",
"name": "recipient",
"type": "address"
},
{
"internalType": "uint256",
"name": "amount",
"type": "uint256"
}
],
"name": "transferFrom",
"outputs": [{
"internalType": "bool",
"name": "",
"type": "bool"
}],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
},
{
"constant": false,
"inputs": [{
"internalType": "address",
"name": "newOwner",
"type": "address"
}],
"name": "transferOwnership",
"outputs": [],
"payable": false,
"stateMutability": "nonpayable",
"type": "function"
}
]
config.yaml
# 节点地址
provider:
bsc: "https://bsc-dataseed.binance.org/"
# 私钥
private_key:
bsc: ""
# 合约地址
contract:
bsc:
usdt: "0x55d398326f99059ff775485246999027b3197955"
# abi文件路径
abi:
usdt:
bsc: "/abi/usdt/bsc_usdt_abi.json"
logging.conf
[loggers]
keys=root
[handlers]
keys=consoleHandler,fileHandler
[formatters]
keys=generic
[logger_root]
level=DEBUG
handlers=consoleHandler,fileHandler
[handler_consoleHandler]
class=StreamHandler
level=DEBUG
formatter=generic
args=(sys.stdout,)
[handler_fileHandler]
class=logging.handlers.TimedRotatingFileHandler
level=DEBUG
formatter=generic
args=('logs/app.log', 'midnight', 1, 30)
[formatter_generic]
format=%(asctime)s - %(levelname)s - %(funcName)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S
EnumConstant.py
from enum import Enum
class BaseResponseEnum(Enum):
def __init__(self, code, msg, data):
self.code = code
self.msg = msg
self.data = data
def __str__(self):
return f"Code: {self.code}, Msg: {self.msg}, Data: {self.data}"
class ErrorResponseEnum(BaseResponseEnum):
BALANCE_INSUFFICIENT = (5501, "The account balance is insufficient.", None)
CHAIN_NOT_EXIST = (5502, "There is no chain type you have selected.", None)
SYSTEM_ABNORMAL = (5503, "The system is abnormal.", None)
RespEntity.py
from constant.EnumConstant import BaseResponseEnum
class RespEntity:
def __init__(self, enum: BaseResponseEnum = None):
if enum:
self.code = enum.code
self.msg = enum.msg
self.data = enum.data
else:
self.code = 0
self.msg = ""
self.data = None
def get_succeed(self, data):
self.code = 200
self.msg = "succeed"
self.data = data
return self
@classmethod
def create_succeed_response(cls, data):
instance = cls()
instance.code = 200
instance.msg = "succeed"
instance.data = data
return instance
FilePathUtil.py
import yaml
import os
class FilePathUtil:
@staticmethod
def get_config_value(*args):
try:
current_dir = FilePathUtil.get_home_directory_path()
if current_dir is None:
raise Exception("Home directory not found")
config_file_path = os.path.join(current_dir, 'config', 'config.yaml')
if not os.path.exists(config_file_path):
raise FileNotFoundError(f"Config file {config_file_path} not found")
# 打开并加载配置文件
with open(config_file_path, "r") as file:
config = yaml.safe_load(file)
if not config:
raise Exception("Config file is empty or not valid YAML")
# 逐层获取配置值
for key in args:
if isinstance(config, dict) and key in config:
config = config[key]
else:
raise KeyError(f"Config key '{key}' not found")
return config
except Exception as e:
print(f"Error while reading config: {e}")
return None
@staticmethod
def get_home_directory_path(*args, max_depth=10):
if len(args) > 0:
path = args[0]
else:
path = 'app.py'
current_dir = os.path.dirname(os.path.abspath(__file__))
depth = 0
while current_dir and depth < max_depth:
if path in os.listdir(current_dir):
return current_dir
current_dir = os.path.dirname(current_dir)
depth += 1
return None
Web3Utils.py
import time
import logging
logger = logging.getLogger()
class Web3Utils:
# Get a transaction receipt and check the number of confirmations
def check_transaction_confirmation(self, tx_hash, required_confirmations=32):
while True:
time.sleep(10)
receipt = self.eth.get_transaction_receipt(tx_hash)
if receipt:
# Gets the height of the current block
block_number = receipt['blockNumber']
# Get the block where the transaction is located
latest_block = self.eth.block_number
# Calculate the number of confirmations
confirmations = latest_block - block_number + 1
logging.info(f"hash:{tx_hash.hex()},Number of confirmations:{confirmations}")
if confirmations >= required_confirmations:
logging.info(f"hash:{tx_hash.hex()},Number of confirmations:{confirmations},"
f"The transaction is confirmed")
return tx_hash.hex()
else:
logging.info(f"The transaction has not yet been packed by the miner, "
f"please wait for the transaction hash:{tx_hash.hex()}")
app.py
import logging
import logging.config
import os
import sys
from fastapi import FastAPI
from pydantic import BaseModel
from TransferStrategyFactory import TransferStrategyFactory
from constant.EnumConstant import ErrorResponseEnum
from entity.RespEntity import RespEntity
from utils.FilePathUtil import FilePathUtil
app = FastAPI()
factory = TransferStrategyFactory()
logger = logging.getLogger()
@app.exception_handler(Exception)
async def global_exception_handler(request, exc):
logger.error(f"Unhandled exception: {exc}", exc_info=True)
return RespEntity(ErrorResponseEnum.SYSTEM_ABNORMAL)
def handle_uncaught_exception(exc_type, exc_value, exc_traceback):
if issubclass(exc_type, KeyboardInterrupt):
sys.__excepthook__(exc_type, exc_value, exc_traceback)
return
logger.error("Uncaught exception", exc_info=(exc_type, exc_value, exc_traceback))
sys.excepthook = handle_uncaught_exception
def setup_logger():
if getattr(sys, 'frozen', False):
path = sys._MEIPASS
else:
path = FilePathUtil.get_home_directory_path()
path = os.path.join(path, 'config', 'logging.conf')
logging.config.fileConfig(path)
setup_logger()
class TransferRequests(BaseModel):
to_address: str = None
amount: float = None
chain_type: str = None
@app.post("/transfer")
async def initiate_transfer(request: TransferRequests):
req = request.model_dump()
return await transfer_handle(req)
async def transfer_handle(req: dict):
to_address = req.get("to_address", False)
amount = req.get("amount", False)
chain_type = req.get("chain_type", False)
strategy = factory.get_strategy(chain_type)
if strategy is None:
return RespEntity(ErrorResponseEnum.CHAIN_NOT_EXIST)
return strategy.transfer(to_address, amount)
requirements.txt
fastapi==0.115.6
pydantic==2.10.4
PyYAML==6.0.2
uvicorn==0.32.1
web3==7.6.1
gunicorn==23.0.0
Strategy.py
import json
import logging
from web3 import Web3
from web3.exceptions import ContractLogicError
from constant.EnumConstant import ErrorResponseEnum
from entity.RespEntity import RespEntity
from utils import Web3Utils, FilePathUtil
logger = logging.getLogger()
fileUtil = FilePathUtil.FilePathUtil
w3Util = Web3Utils.Web3Utils
class BSCStrategy:
def __init__(self, provider, private_key):
self.w3 = Web3(Web3.HTTPProvider(provider))
self.private_key = private_key
def transfer(self, to_address, amount):
path = fileUtil.get_home_directory_path()
path += fileUtil.get_config_value("abi", "usdt", "bsc")
with open(path, 'r') as file:
usdt_abi = json.load(file)
usdt_address = Web3.to_checksum_address(fileUtil.get_config_value("contract", "bsc", "usdt"))
to_address = Web3.to_checksum_address(to_address)
contract = self.w3.eth.contract(address=usdt_address, abi=usdt_abi)
# 获取发送方地址
sender_address = self.w3.eth.account.from_key(self.private_key).address
nonce = self.w3.eth.get_transaction_count(sender_address)
decimals = 6
amount_in_wei = int(amount * (1000 ** decimals))
logging.info(f"Strategy.BSCStrategy.transfer amount:{amount}, amount_in_wei:{amount_in_wei}")
try:
gas_estimate = contract.functions.transfer(to_address, amount_in_wei).estimate_gas({
'from': sender_address,
})
except ContractLogicError as e:
error_message = str(e)
logger.error("Transaction failed: %s", error_message)
return RespEntity(ErrorResponseEnum.BALANCE_INSUFFICIENT)
logging.info(f"Strategy.BSCStrategy.transfer gas: gas_estimate:{gas_estimate}")
# Increase the margin of safety
gas_estimate = int(gas_estimate * 1.1)
logging.info(f"gas_estimate:{gas_estimate}")
# Set Gas Price (optional: get the current default gas price)
# gas_price = self.w3.toWei('20', 'gwei')
# Or use self.w3.eth.gasPrice to get real-time gas prices
gas_price = self.w3.eth.gas_price
# Structuring the deal
transaction = contract.functions.transfer(to_address, amount_in_wei).build_transaction({
'gas': gas_estimate,
'gasPrice': gas_price,
'nonce': nonce,
})
# Signature Transactions
signed_txn = self.w3.eth.account.sign_transaction(transaction, self.private_key)
# Send the transaction and return the transaction hash
tx_hash = self.w3.eth.send_raw_transaction(signed_txn.raw_transaction)
logging.info(
f"Strategy.BSCStrategy.transfer tx_hash:{tx_hash.hex()},The payment address is:{to_address},The amount to be paid is:{amount},gas:{gas_estimate}")
return RespEntity.create_succeed_response(w3Util.check_transaction_confirmation(self.w3, tx_hash))
TransferStrategyFactory.py
from Strategy import BSCStrategy
import logging
from utils.FilePathUtil import FilePathUtil
logger = logging.getLogger()
class TransferStrategyFactory:
def __init__(self):
self.bsc_provider = FilePathUtil.get_config_value("provider", "bsc")
self.bsc_private_key = FilePathUtil.get_config_value("private_key", "bsc")
def get_strategy(self, chain_type):
logging.info(f"chain type: {chain_type}")
match chain_type:
case "bsc":
return BSCStrategy(self.bsc_provider, self.bsc_private_key)
case _:
logging.error(f"Unknown chain type: {chain_type}")
return None
启动命令:
linux:gunicorn -w 4 -k uvicorn.workers.UvicornWorker app:app -b 127.0.0.1:9880
windows:uvicorn app:app --host 127.0.0.1 --port 9880 --workers 4