Hummingbot 第12章:高级策略开发

本章介绍 Hummingbot V2 框架的高级特性,包括自定义 Controller 开发、多机器人协作和性能优化。

V2 Controllers 开发

V2 Controller 是 Hummingbot 新一代策略开发的核心抽象,提供了比 V1 策略模板更灵活的开发方式。

Controller 架构

┌──────────────────────────────────────────┐
│              Script 脚本层                 │
│  ├── 初始化和启动 Controller               │
│  └── 管理多个 Controller 的协调            │
├──────────────────────────────────────────┤
│              Controller 层                 │
│  ├── 策略逻辑(何时买卖,价格计算)          │
│  ├── 参数验证和更新                        │
│  └── 数据源管理(K 线、Ticker 等)          │
├──────────────────────────────────────────┤
│              Executor 层                   │
│  ├── 订单生命周期管理                       │
│  ├── 订单状态追踪                          │
│  └── 风险控制                              │
└──────────────────────────────────────────┘

Controller 基类

from hummingbot.strategy_v2.controllers import ControllerBase
from hummingbot.core.data_type.common import OrderType, TradeType
from pydantic import BaseModel, Field
from typing import Optional
from decimal import Decimal


class AdvancedMMConfig(BaseModel):
    """高级做市控制器配置"""
    controller_name: str = "advanced_mm"
    connector_name: str = "binance"
    trading_pair: str = "BTC-USDT"

    # 做市参数
    bid_spread: float = Field(default=0.5, ge=0.1, le=5.0)
    ask_spread: float = Field(default=0.5, ge=0.1, le=5.0)
    order_amount: float = Field(default=0.01, gt=0.0)

    # 动态调整
    dynamic_spread_enabled: bool = True
    volatility_lookback: int = 20  # 波动率回看周期

    # 趋势跟踪
    trend_following_enabled: bool = False
    trend_ema_short: int = 9
    trend_ema_long: int = 21

    # 风控
    max_order_age: int = 300
    inventory_target_pct: float = 50.0
    max_daily_loss: float = 100.0


class AdvancedMMController(ControllerBase):
    """高级做市控制器"""

    def __init__(self, config: AdvancedMMConfig):
        super().__init__(config)
        self.config = config
        self.daily_pnl = 0.0

    async def update_strategy(self, strategy) -> None:
        """更新策略状态(主循环调用)"""
        connector = strategy.connectors[self.config.connector_name]

        # 获取市场数据
        mid_price = connector.get_mid_price(self.config.trading_pair)
        if not mid_price or mid_price <= 0:
            return

        # 获取 K 线数据计算指标
        candles = strategy.get_candles(
            self.config.connector_name,
            self.config.trading_pair
        )

        # 计算动态价差
        bid_spread, ask_spread = self._calculate_dynamic_spreads(candles)

        # 计算订单价格
        bid_price = mid_price * (1 - bid_spread / 100)
        ask_price = mid_price * (1 + ask_spread / 100)

        # 库存管理
        if self.config.inventory_target_pct != 50.0:
            bid_price, ask_price = self._apply_inventory_skew(
                connector, bid_price, ask_price, mid_price
            )

        # 创建订单
        await self._place_orders(strategy, connector, bid_price, ask_price)

    def _calculate_dynamic_spreads(self, candles) -> tuple:
        """计算动态价差"""
        if not self.config.dynamic_spread_enabled or candles is None:
            return self.config.bid_spread, self.config.ask_spread

        # 计算波动率
        closes = candles.close_prices
        if len(closes) < self.config.volatility_lookback:
            return self.config.bid_spread, self.config.ask_spread

        returns = []
        for i in range(1, len(closes)):
            if closes[i-1] > 0:
                returns.append((closes[i] - closes[i-1]) / closes[i-1])

        if not returns:
            return self.config.bid_spread, self.config.ask_spread

        import statistics
        volatility = statistics.stdev(returns) * 100  # 转为百分比

        # 根据波动率调整价差
        vol_factor = 1 + (volatility * 2)
        vol_factor = max(0.5, min(3.0, vol_factor))  # 限制范围

        bid_spread = self.config.bid_spread * vol_factor
        ask_spread = self.config.ask_spread * vol_factor

        return bid_spread, ask_spread

    def _apply_inventory_skew(
        self, connector, bid_price: float, ask_price: float, mid_price: float
    ) -> tuple:
        """应用库存偏斜"""
        balance = connector.get_balance(self.config.trading_pair.split("-")[0])
        total_value = connector.get_total_balance_value()

        if total_value <= 0:
            return bid_price, ask_price

        current_pct = balance / total_value * 100
        target_pct = self.config.inventory_target_pct

        skew_factor = (current_pct - target_pct) / 100

        # 偏离目标越多,调整幅度越大
        bid_adjustment = bid_price * (1 - skew_factor * 0.1)
        ask_adjustment = ask_price * (1 - skew_factor * 0.1)

        return bid_adjustment, ask_adjustment

    async def _place_orders(
        self, strategy, connector, bid_price: float, ask_price: float
    ):
        """下达做市订单"""
        # 取消现有挂单
        await strategy.cancel_all_orders(
            connector, self.config.trading_pair
        )

        # 创建买单
        strategy.place_order(
            connector_name=self.config.connector_name,
            trading_pair=self.config.trading_pair,
            order_type=OrderType.LIMIT,
            side=TradeType.BUY,
            price=bid_price,
            amount=self.config.order_amount
        )

        # 创建卖单
        strategy.place_order(
            connector_name=self.config.connector_name,
            trading_pair=self.config.trading_pair,
            order_type=OrderType.LIMIT,
            side=TradeType.SELL,
            price=ask_price,
            amount=self.config.order_amount
        )

注册并运行自定义 Controller

# advanced_mm_script.py
from hummingbot.strategy_v2.scripts import ScriptBase


class AdvancedMMScript(ScriptBase):
    """使用自定义 Controller 的脚本"""

    def __init__(self, connectors: list, markets: dict):
        super().__init__(connectors, markets)

        # 初始化控制器
        self.controller = AdvancedMMController(
            AdvancedMMConfig(
                connector_name="binance",
                trading_pair="BTC-USDT",
                bid_spread=0.5,
                ask_spread=0.5,
                order_amount=0.01,
                dynamic_spread_enabled=True,
                inventory_target_pct=50.0
            )
        )

    async def on_tick(self):
        """每秒执行策略更新"""
        await self.controller.update_strategy(self)

自定义控制器

技术指标控制器

class MACrossoverController(ControllerBase):
    """移动平均线交叉策略控制器"""

    def __init__(self, config):
        super().__init__(config)
        self.fast_ma = []
        self.slow_ma = []
        self.position = 0  # 1=做多, -1=做空, 0=空仓

    async def update_strategy(self, strategy):
        candles = strategy.get_candles(
            self.config.connector_name,
            self.config.trading_pair
        )
        if not candles or len(candles.close_prices) < self.config.slow_period:
            return

        closes = candles.close_prices
        fast_ma = sum(closes[-self.config.fast_period:]) / self.config.fast_period
        slow_ma = sum(closes[-self.config.slow_period:]) / self.config.slow_period

        price = closes[-1]

        # 金叉信号
        if fast_ma > slow_ma and self.position <= 0:
            self.logger().info(f"金叉信号: FA={fast_ma:.2f} > SA={slow_ma:.2f}")
            # 平空仓,开多仓
            strategy.place_order(
                connector_name=self.config.connector_name,
                trading_pair=self.config.trading_pair,
                order_type=OrderType.MARKET,
                side=TradeType.BUY,
                amount=self.config.order_amount
            )
            self.position = 1

        # 死叉信号
        elif fast_ma < slow_ma and self.position >= 0:
            self.logger().info(f"死叉信号: FA={fast_ma:.2f} < SA={slow_ma:.2f}")
            strategy.place_order(
                connector_name=self.config.connector_name,
                trading_pair=self.config.trading_pair,
                order_type=OrderType.MARKET,
                side=TradeType.SELL,
                amount=self.config.order_amount
            )
            self.position = -1

数据源集成

from hummingbot.data_feed.candles_feed.candles_factory import CandlesFactory


class DataDrivenController(ControllerBase):
    """多数据源控制器"""

    def __init__(self, config):
        super().__init__(config)

        # 初始化多个 K 线数据源
        self.candles_1m = CandlesFactory.get_candles(
            connector=config.connector_name,
            trading_pair=config.trading_pair,
            interval="1m"
        )
        self.candles_1h = CandlesFactory.get_candles(
            connector=config.connector_name,
            trading_pair=config.trading_pair,
            interval="1h"
        )
        self.candles_1d = CandlesFactory.get_candles(
            connector=config.connector_name,
            trading_pair=config.trading_pair,
            interval="1d"
        )

    async def start(self):
        """启动所有数据源"""
        self.candles_1m.start()
        self.candles_1h.start()
        self.candles_1d.start()

    async def stop(self):
        """停止所有数据源"""
        self.candles_1m.stop()
        self.candles_1h.stop()
        self.candles_1d.stop()

多机器人协作策略

主从机器人模式

# master_bot.py - 主机器人:分析和决策
class MasterBot(ScriptBase):
    """主机器人:市场分析+信号分发"""

    def __init__(self, connectors, markets):
        super().__init__(connectors, markets)
        self.slave_bots = [
            "bot-btc-maker",
            "bot-eth-maker",
            "bot-sol-maker"
        ]

    async def analyze_market(self) -> dict:
        """分析整体市场状况"""
        signals = {}
        for pair in ["BTC-USDT", "ETH-USDT", "SOL-USDT"]:
            connector = self.connectors["binance"]
            price = connector.get_mid_price(pair)

            candles = self.get_candles("binance", pair, "1h")
            if candles and len(candles.close_prices) > 20:
                trend = self._calculate_trend(candles.close_prices)
                signals[pair] = {
                    "price": price,
                    "trend": trend,
                    "action": "normal" if abs(trend) < 1 else "reduce"
                }

        return signals

    def _calculate_trend(self, prices: list) -> float:
        """计算趋势强度(%)"""
        if len(prices) < 2:
            return 0.0
        return (prices[-1] - prices[-20]) / prices[-20] * 100

分布式机器人通信

# 通过共享文件或 Redis 进行通信
import json
import redis

class DistributedStrategy:
    """分布式策略协调器"""

    def __init__(self):
        self.redis_client = redis.Redis(
            host="localhost",
            port=6379,
            db=0
        )
        self.channel = "hummingbot:signals"

    def publish_signal(self, pair: str, signal: dict):
        """发布交易信号"""
        message = json.dumps({
            "pair": pair,
            "signal": signal,
            "timestamp": time.time()
        })
        self.redis_client.publish(self.channel, message)

    def subscribe_signals(self):
        """订阅交易信号"""
        pubsub = self.redis_client.pubsub()
        pubsub.subscribe(self.channel)
        return pubsub

策略性能分析

性能指标

指标说明计算方法
收益率总收益百分比(期末净值 - 期初净值) / 期初净值
年化收益率年化后收益(1 + 总收益率) ^ (365 / 天数) - 1
夏普比率风险调整收益(收益率 - 无风险利率) / 波动率
最大回撤最大资金回撤max(峰值 - 谷值) / 峰值
Calmar 比率收益回撤比年化收益率 / 最大回撤

性能优化建议

  • 减少 API 调用:缓存价格数据,避免频繁查询
  • 使用 WebSocket:实时数据使用 WebSocket 而非 REST
  • 批量操作:合并小订单为批量操作
  • 异步处理:确保策略代码使用异步模式
  • 内存管理:定期清理历史数据缓存