Hummingbot 第12章:高级策略开发
本章介绍 Hummingbot V2 框架的高级特性,包括自定义 Controller 开发、多机器人协作和性能优化。
V2 Controllers 开发
V2 Controller 是 Hummingbot 新一代策略开发的核心抽象,提供了比 V1 策略模板更灵活的开发方式。
Controller 架构
┌──────────────────────────────────────────┐
│ Script 脚本层 │
│ ├── 初始化和启动 Controller │
│ └── 管理多个 Controller 的协调 │
├──────────────────────────────────────────┤
│ Controller 层 │
│ ├── 策略逻辑(何时买卖,价格计算) │
│ ├── 参数验证和更新 │
│ └── 数据源管理(K 线、Ticker 等) │
├──────────────────────────────────────────┤
│ Executor 层 │
│ ├── 订单生命周期管理 │
│ ├── 订单状态追踪 │
│ └── 风险控制 │
└──────────────────────────────────────────┘
Controller 基类
from hummingbot.strategy_v2.controllers import ControllerBase
from hummingbot.core.data_type.common import OrderType, TradeType
from pydantic import BaseModel, Field
from typing import Optional
from decimal import Decimal
class AdvancedMMConfig(BaseModel):
"""高级做市控制器配置"""
controller_name: str = "advanced_mm"
connector_name: str = "binance"
trading_pair: str = "BTC-USDT"
# 做市参数
bid_spread: float = Field(default=0.5, ge=0.1, le=5.0)
ask_spread: float = Field(default=0.5, ge=0.1, le=5.0)
order_amount: float = Field(default=0.01, gt=0.0)
# 动态调整
dynamic_spread_enabled: bool = True
volatility_lookback: int = 20 # 波动率回看周期
# 趋势跟踪
trend_following_enabled: bool = False
trend_ema_short: int = 9
trend_ema_long: int = 21
# 风控
max_order_age: int = 300
inventory_target_pct: float = 50.0
max_daily_loss: float = 100.0
class AdvancedMMController(ControllerBase):
"""高级做市控制器"""
def __init__(self, config: AdvancedMMConfig):
super().__init__(config)
self.config = config
self.daily_pnl = 0.0
async def update_strategy(self, strategy) -> None:
"""更新策略状态(主循环调用)"""
connector = strategy.connectors[self.config.connector_name]
# 获取市场数据
mid_price = connector.get_mid_price(self.config.trading_pair)
if not mid_price or mid_price <= 0:
return
# 获取 K 线数据计算指标
candles = strategy.get_candles(
self.config.connector_name,
self.config.trading_pair
)
# 计算动态价差
bid_spread, ask_spread = self._calculate_dynamic_spreads(candles)
# 计算订单价格
bid_price = mid_price * (1 - bid_spread / 100)
ask_price = mid_price * (1 + ask_spread / 100)
# 库存管理
if self.config.inventory_target_pct != 50.0:
bid_price, ask_price = self._apply_inventory_skew(
connector, bid_price, ask_price, mid_price
)
# 创建订单
await self._place_orders(strategy, connector, bid_price, ask_price)
def _calculate_dynamic_spreads(self, candles) -> tuple:
"""计算动态价差"""
if not self.config.dynamic_spread_enabled or candles is None:
return self.config.bid_spread, self.config.ask_spread
# 计算波动率
closes = candles.close_prices
if len(closes) < self.config.volatility_lookback:
return self.config.bid_spread, self.config.ask_spread
returns = []
for i in range(1, len(closes)):
if closes[i-1] > 0:
returns.append((closes[i] - closes[i-1]) / closes[i-1])
if not returns:
return self.config.bid_spread, self.config.ask_spread
import statistics
volatility = statistics.stdev(returns) * 100 # 转为百分比
# 根据波动率调整价差
vol_factor = 1 + (volatility * 2)
vol_factor = max(0.5, min(3.0, vol_factor)) # 限制范围
bid_spread = self.config.bid_spread * vol_factor
ask_spread = self.config.ask_spread * vol_factor
return bid_spread, ask_spread
def _apply_inventory_skew(
self, connector, bid_price: float, ask_price: float, mid_price: float
) -> tuple:
"""应用库存偏斜"""
balance = connector.get_balance(self.config.trading_pair.split("-")[0])
total_value = connector.get_total_balance_value()
if total_value <= 0:
return bid_price, ask_price
current_pct = balance / total_value * 100
target_pct = self.config.inventory_target_pct
skew_factor = (current_pct - target_pct) / 100
# 偏离目标越多,调整幅度越大
bid_adjustment = bid_price * (1 - skew_factor * 0.1)
ask_adjustment = ask_price * (1 - skew_factor * 0.1)
return bid_adjustment, ask_adjustment
async def _place_orders(
self, strategy, connector, bid_price: float, ask_price: float
):
"""下达做市订单"""
# 取消现有挂单
await strategy.cancel_all_orders(
connector, self.config.trading_pair
)
# 创建买单
strategy.place_order(
connector_name=self.config.connector_name,
trading_pair=self.config.trading_pair,
order_type=OrderType.LIMIT,
side=TradeType.BUY,
price=bid_price,
amount=self.config.order_amount
)
# 创建卖单
strategy.place_order(
connector_name=self.config.connector_name,
trading_pair=self.config.trading_pair,
order_type=OrderType.LIMIT,
side=TradeType.SELL,
price=ask_price,
amount=self.config.order_amount
)
注册并运行自定义 Controller
# advanced_mm_script.py
from hummingbot.strategy_v2.scripts import ScriptBase
class AdvancedMMScript(ScriptBase):
"""使用自定义 Controller 的脚本"""
def __init__(self, connectors: list, markets: dict):
super().__init__(connectors, markets)
# 初始化控制器
self.controller = AdvancedMMController(
AdvancedMMConfig(
connector_name="binance",
trading_pair="BTC-USDT",
bid_spread=0.5,
ask_spread=0.5,
order_amount=0.01,
dynamic_spread_enabled=True,
inventory_target_pct=50.0
)
)
async def on_tick(self):
"""每秒执行策略更新"""
await self.controller.update_strategy(self)
自定义控制器
技术指标控制器
class MACrossoverController(ControllerBase):
"""移动平均线交叉策略控制器"""
def __init__(self, config):
super().__init__(config)
self.fast_ma = []
self.slow_ma = []
self.position = 0 # 1=做多, -1=做空, 0=空仓
async def update_strategy(self, strategy):
candles = strategy.get_candles(
self.config.connector_name,
self.config.trading_pair
)
if not candles or len(candles.close_prices) < self.config.slow_period:
return
closes = candles.close_prices
fast_ma = sum(closes[-self.config.fast_period:]) / self.config.fast_period
slow_ma = sum(closes[-self.config.slow_period:]) / self.config.slow_period
price = closes[-1]
# 金叉信号
if fast_ma > slow_ma and self.position <= 0:
self.logger().info(f"金叉信号: FA={fast_ma:.2f} > SA={slow_ma:.2f}")
# 平空仓,开多仓
strategy.place_order(
connector_name=self.config.connector_name,
trading_pair=self.config.trading_pair,
order_type=OrderType.MARKET,
side=TradeType.BUY,
amount=self.config.order_amount
)
self.position = 1
# 死叉信号
elif fast_ma < slow_ma and self.position >= 0:
self.logger().info(f"死叉信号: FA={fast_ma:.2f} < SA={slow_ma:.2f}")
strategy.place_order(
connector_name=self.config.connector_name,
trading_pair=self.config.trading_pair,
order_type=OrderType.MARKET,
side=TradeType.SELL,
amount=self.config.order_amount
)
self.position = -1
数据源集成
from hummingbot.data_feed.candles_feed.candles_factory import CandlesFactory
class DataDrivenController(ControllerBase):
"""多数据源控制器"""
def __init__(self, config):
super().__init__(config)
# 初始化多个 K 线数据源
self.candles_1m = CandlesFactory.get_candles(
connector=config.connector_name,
trading_pair=config.trading_pair,
interval="1m"
)
self.candles_1h = CandlesFactory.get_candles(
connector=config.connector_name,
trading_pair=config.trading_pair,
interval="1h"
)
self.candles_1d = CandlesFactory.get_candles(
connector=config.connector_name,
trading_pair=config.trading_pair,
interval="1d"
)
async def start(self):
"""启动所有数据源"""
self.candles_1m.start()
self.candles_1h.start()
self.candles_1d.start()
async def stop(self):
"""停止所有数据源"""
self.candles_1m.stop()
self.candles_1h.stop()
self.candles_1d.stop()
多机器人协作策略
主从机器人模式
# master_bot.py - 主机器人:分析和决策
class MasterBot(ScriptBase):
"""主机器人:市场分析+信号分发"""
def __init__(self, connectors, markets):
super().__init__(connectors, markets)
self.slave_bots = [
"bot-btc-maker",
"bot-eth-maker",
"bot-sol-maker"
]
async def analyze_market(self) -> dict:
"""分析整体市场状况"""
signals = {}
for pair in ["BTC-USDT", "ETH-USDT", "SOL-USDT"]:
connector = self.connectors["binance"]
price = connector.get_mid_price(pair)
candles = self.get_candles("binance", pair, "1h")
if candles and len(candles.close_prices) > 20:
trend = self._calculate_trend(candles.close_prices)
signals[pair] = {
"price": price,
"trend": trend,
"action": "normal" if abs(trend) < 1 else "reduce"
}
return signals
def _calculate_trend(self, prices: list) -> float:
"""计算趋势强度(%)"""
if len(prices) < 2:
return 0.0
return (prices[-1] - prices[-20]) / prices[-20] * 100
分布式机器人通信
# 通过共享文件或 Redis 进行通信
import json
import redis
class DistributedStrategy:
"""分布式策略协调器"""
def __init__(self):
self.redis_client = redis.Redis(
host="localhost",
port=6379,
db=0
)
self.channel = "hummingbot:signals"
def publish_signal(self, pair: str, signal: dict):
"""发布交易信号"""
message = json.dumps({
"pair": pair,
"signal": signal,
"timestamp": time.time()
})
self.redis_client.publish(self.channel, message)
def subscribe_signals(self):
"""订阅交易信号"""
pubsub = self.redis_client.pubsub()
pubsub.subscribe(self.channel)
return pubsub
策略性能分析
性能指标
| 指标 | 说明 | 计算方法 |
|---|---|---|
| 收益率 | 总收益百分比 | (期末净值 - 期初净值) / 期初净值 |
| 年化收益率 | 年化后收益 | (1 + 总收益率) ^ (365 / 天数) - 1 |
| 夏普比率 | 风险调整收益 | (收益率 - 无风险利率) / 波动率 |
| 最大回撤 | 最大资金回撤 | max(峰值 - 谷值) / 峰值 |
| Calmar 比率 | 收益回撤比 | 年化收益率 / 最大回撤 |
性能优化建议
- 减少 API 调用:缓存价格数据,避免频繁查询
- 使用 WebSocket:实时数据使用 WebSocket 而非 REST
- 批量操作:合并小订单为批量操作
- 异步处理:确保策略代码使用异步模式
- 内存管理:定期清理历史数据缓存