Hummingbot 第5章:脚本开发入门
Script 模式是 Hummingbot V2 的核心功能之一,允许用户使用 Python 编写自定义交易逻辑,比 V1 策略模板更加灵活。
脚本模式概述
什么是 Script 模式
Script 模式让开发者用 Python 直接控制交易机器人的行为。与 V1 策略配置文件不同,Script 可以:
- 完全自定义逻辑:不受策略模板限制
- 实时数据处理:直接访问市场数据和 K 线
- 复杂决策:实现机器学习、统计分析等高级逻辑
- 多数据源:同时使用多个交易所的数据
Script 与传统策略对比
| 特性 | V1 策略模板 | V2 Script |
|---|---|---|
| 配置方式 | YAML 文件 | Python 代码 |
| 灵活性 | 固定参数调整 | 完全可编程 |
| 学习曲线 | 低 | 中 |
| 数据处理 | 有限 | 完全控制 |
| 实时干预 | 有限 | 完全控制 |
| 扩展性 | 低 | 高 |
Script 基础结构
最小 Script 模板
from hummingbot.strategy_v2.scripts import ScriptBase
from hummingbot.core.data_type.common import OrderType, TradeType
class SimpleScript(ScriptBase):
"""最简单的交易脚本"""
def __init__(self, connectors: list, markets: dict):
super().__init__(connectors, markets)
self.counter = 0
async def on_interval(self):
"""定时执行策略逻辑(默认每秒触发一次)"""
self.counter += 1
if self.counter % 60 == 0:
self.logger().info(f"Script running... counter: {self.counter}")
async def on_tick(self):
"""每个 tick 触发一次"""
pass
Script 核心方法
| 方法 | 触发时机 | 说明 |
|---|---|---|
on_tick() | 每秒 | 高频执行,适合实时监控 |
on_interval() | 可配置 | 定时执行,适合策略逻辑 |
on_events() | 事件发生时 | 处理交易所推送的事件 |
on_start() | 启动时 | 初始化资源 |
on_stop() | 停止时 | 清理资源 |
SimpleMM 脚本示例
以下是一个完整的纯做市脚本示例:
"""
simple_mm.py - 简单做市脚本
在指定交易对上进行双向报价做市。
"""
import statistics
from decimal import Decimal
from hummingbot.strategy_v2.scripts import ScriptBase
from hummingbot.core.data_type.common import OrderType, TradeType
from hummingbot.core.data_type.order_candidate import OrderCandidate
from hummingbot.connector.connector_base import ConnectorBase
class SimpleMarketMaker(ScriptBase):
"""简单做市脚本"""
def __init__(self, connectors: list, markets: dict):
super().__init__(connectors, markets)
# 策略参数
self.exchange = "binance"
self.trading_pair = "BTC-USDT"
self.bid_spread = Decimal("0.005") # 0.5%
self.ask_spread = Decimal("0.005") # 0.5%
self.order_amount = Decimal("0.01")
self.min_spread = Decimal("0.001")
# 状态变量
self.active_orders = []
self.last_price = Decimal("0")
async def on_start(self):
"""脚本启动时的初始化"""
self.logger().info(f"Starting Simple Market Maker on {self.exchange}:{self.trading_pair}")
self.logger().info(f"Bid Spread: {self.bid_spread * 100}%")
self.logger().info(f"Ask Spread: {self.ask_spread * 100}%")
async def on_tick(self):
"""每秒执行一次"""
try:
# 获取当前价格
connector = self.connectors[self.exchange]
mid_price = connector.get_mid_price(self.trading_pair)
if mid_price <= 0:
return
self.last_price = Decimal(str(mid_price))
# 计算订单价格
bid_price = self.last_price * (Decimal("1") - self.bid_spread)
ask_price = self.last_price * (Decimal("1") + self.ask_spread)
# 创建订单候选
buy_candidate = OrderCandidate(
trading_pair=self.trading_pair,
is_maker=True,
order_type=OrderType.LIMIT,
order_side=TradeType.BUY,
price=bid_price,
amount=self.order_amount
)
sell_candidate = OrderCandidate(
trading_pair=self.trading_pair,
is_maker=True,
order_type=OrderType.LIMIT,
order_side=TradeType.SELL,
price=ask_price,
amount=self.order_amount
)
# 验证并调整订单(检查余额等)
candidates = await connector.budget_checker.check_candidates(
self.trading_pair,
[buy_candidate, sell_candidate]
)
# 取消现有订单并下达新订单
await self.cancel_all_orders(connector, self.trading_pair)
for candidate in candidates:
if candidate.amount > 0:
await connector.place_order(
order_candidate=candidate
)
self.logger().info(
f"Placed {candidate.order_side.name} order: "
f"{candidate.amount} @ {candidate.price}"
)
except Exception as e:
self.logger().error(f"Error in on_tick: {str(e)}")
async def cancel_all_orders(self, connector: ConnectorBase, trading_pair: str):
"""取消指定交易对的所有活跃订单"""
orders = await connector.get_orders(trading_pair)
for order in orders:
if order.is_open:
await connector.cancel(trading_pair, order.client_order_id)
async def on_stop(self):
"""脚本停止时执行"""
self.logger().info("Stopping Simple Market Maker...")
await self.cancel_all_orders(
self.connectors[self.exchange],
self.trading_pair
)
自定义脚本开发
使用 K 线数据
from hummingbot.data_feed.candles_feed.candles_factory import CandlesFactory
class CandleBasedScript(ScriptBase):
"""基于 K 线数据的脚本"""
def __init__(self, connectors: list, markets: dict):
super().__init__(connectors, markets)
# 初始化 K 线数据源
self.candles = CandlesFactory.get_candles(
connector="binance",
trading_pair="BTC-USDT",
interval="1m", # 1 分钟 K 线
max_records=100
)
async def on_start(self):
"""启动 K 线数据源"""
self.candles.start()
async def on_tick(self):
"""使用 K 线数据做决策"""
if not self.candles.ready:
return
# 获取收盘价序列
closes = self.candles.close_prices
# 计算简单移动平均线
sma_short = sum(closes[-5:]) / 5
sma_long = sum(closes[-20:]) / 20
# 金叉/死叉策略
if sma_short > sma_long:
self.logger().info("Golden cross detected - bullish signal")
elif sma_short < sma_long:
self.logger().info("Death cross detected - bearish signal")
使用多个交易所
class MultiExchangeScript(ScriptBase):
"""多交易所套利脚本"""
def __init__(self, connectors: list, markets: dict):
super().__init__(connectors, markets)
self.exchange_a = "binance"
self.exchange_b = "okx"
self.trading_pair = "BTC-USDT"
async def on_tick(self):
"""检查跨交易所价差"""
conn_a = self.connectors[self.exchange_a]
conn_b = self.connectors[self.exchange_b]
price_a = conn_a.get_mid_price(self.trading_pair)
price_b = conn_b.get_mid_price(self.trading_pair)
if price_a > 0 and price_b > 0:
spread = abs(price_a - price_b) / min(price_a, price_b) * 100
self.logger().info(
f"Price A: {price_a:.2f} | Price B: {price_b:.2f} | Spread: {spread:.3f}%"
)
# 当价差大于阈值时触发套利
if spread > 0.5:
self.logger().info("Arbitrage opportunity detected!")
脚本调试
日志输出
# 使用内置日志器
self.logger().info("信息日志")
self.logger().warning("警告日志")
self.logger().error("错误日志")
# 配置日志级别
# conf_client.yml 中设置 log_level: DEBUG
运行脚本
# 将脚本放入 scripts 目录
cp simple_mm.py ~/hummingbot/scripts/
# 在 Hummingbot 中运行
>>> script simple_mm.py
# 停止脚本
>>> stop
常见调试技巧
- 逐步增加复杂度:先跑通基础框架,再添加复杂逻辑
- 使用模拟数据:在脚本中模拟市场数据进行测试
- 充分日志:关键决策点都加上日志输出
- 异常捕获:使用
try/except包裹核心逻辑,避免脚本崩溃