Hummingbot 第5章:脚本开发入门

Script 模式是 Hummingbot V2 的核心功能之一,允许用户使用 Python 编写自定义交易逻辑,比 V1 策略模板更加灵活。

脚本模式概述

什么是 Script 模式

Script 模式让开发者用 Python 直接控制交易机器人的行为。与 V1 策略配置文件不同,Script 可以:

  • 完全自定义逻辑:不受策略模板限制
  • 实时数据处理:直接访问市场数据和 K 线
  • 复杂决策:实现机器学习、统计分析等高级逻辑
  • 多数据源:同时使用多个交易所的数据

Script 与传统策略对比

特性V1 策略模板V2 Script
配置方式YAML 文件Python 代码
灵活性固定参数调整完全可编程
学习曲线
数据处理有限完全控制
实时干预有限完全控制
扩展性

Script 基础结构

最小 Script 模板

from hummingbot.strategy_v2.scripts import ScriptBase
from hummingbot.core.data_type.common import OrderType, TradeType


class SimpleScript(ScriptBase):
    """最简单的交易脚本"""

    def __init__(self, connectors: list, markets: dict):
        super().__init__(connectors, markets)
        self.counter = 0

    async def on_interval(self):
        """定时执行策略逻辑(默认每秒触发一次)"""
        self.counter += 1
        if self.counter % 60 == 0:
            self.logger().info(f"Script running... counter: {self.counter}")

    async def on_tick(self):
        """每个 tick 触发一次"""
        pass

Script 核心方法

方法触发时机说明
on_tick()每秒高频执行,适合实时监控
on_interval()可配置定时执行,适合策略逻辑
on_events()事件发生时处理交易所推送的事件
on_start()启动时初始化资源
on_stop()停止时清理资源

SimpleMM 脚本示例

以下是一个完整的纯做市脚本示例:

"""
simple_mm.py - 简单做市脚本

在指定交易对上进行双向报价做市。
"""
import statistics
from decimal import Decimal
from hummingbot.strategy_v2.scripts import ScriptBase
from hummingbot.core.data_type.common import OrderType, TradeType
from hummingbot.core.data_type.order_candidate import OrderCandidate
from hummingbot.connector.connector_base import ConnectorBase


class SimpleMarketMaker(ScriptBase):
    """简单做市脚本"""

    def __init__(self, connectors: list, markets: dict):
        super().__init__(connectors, markets)

        # 策略参数
        self.exchange = "binance"
        self.trading_pair = "BTC-USDT"
        self.bid_spread = Decimal("0.005")  # 0.5%
        self.ask_spread = Decimal("0.005")  # 0.5%
        self.order_amount = Decimal("0.01")
        self.min_spread = Decimal("0.001")

        # 状态变量
        self.active_orders = []
        self.last_price = Decimal("0")

    async def on_start(self):
        """脚本启动时的初始化"""
        self.logger().info(f"Starting Simple Market Maker on {self.exchange}:{self.trading_pair}")
        self.logger().info(f"Bid Spread: {self.bid_spread * 100}%")
        self.logger().info(f"Ask Spread: {self.ask_spread * 100}%")

    async def on_tick(self):
        """每秒执行一次"""
        try:
            # 获取当前价格
            connector = self.connectors[self.exchange]
            mid_price = connector.get_mid_price(self.trading_pair)

            if mid_price <= 0:
                return

            self.last_price = Decimal(str(mid_price))

            # 计算订单价格
            bid_price = self.last_price * (Decimal("1") - self.bid_spread)
            ask_price = self.last_price * (Decimal("1") + self.ask_spread)

            # 创建订单候选
            buy_candidate = OrderCandidate(
                trading_pair=self.trading_pair,
                is_maker=True,
                order_type=OrderType.LIMIT,
                order_side=TradeType.BUY,
                price=bid_price,
                amount=self.order_amount
            )

            sell_candidate = OrderCandidate(
                trading_pair=self.trading_pair,
                is_maker=True,
                order_type=OrderType.LIMIT,
                order_side=TradeType.SELL,
                price=ask_price,
                amount=self.order_amount
            )

            # 验证并调整订单(检查余额等)
            candidates = await connector.budget_checker.check_candidates(
                self.trading_pair,
                [buy_candidate, sell_candidate]
            )

            # 取消现有订单并下达新订单
            await self.cancel_all_orders(connector, self.trading_pair)
            for candidate in candidates:
                if candidate.amount > 0:
                    await connector.place_order(
                        order_candidate=candidate
                    )
                    self.logger().info(
                        f"Placed {candidate.order_side.name} order: "
                        f"{candidate.amount} @ {candidate.price}"
                    )

        except Exception as e:
            self.logger().error(f"Error in on_tick: {str(e)}")

    async def cancel_all_orders(self, connector: ConnectorBase, trading_pair: str):
        """取消指定交易对的所有活跃订单"""
        orders = await connector.get_orders(trading_pair)
        for order in orders:
            if order.is_open:
                await connector.cancel(trading_pair, order.client_order_id)

    async def on_stop(self):
        """脚本停止时执行"""
        self.logger().info("Stopping Simple Market Maker...")
        await self.cancel_all_orders(
            self.connectors[self.exchange],
            self.trading_pair
        )

自定义脚本开发

使用 K 线数据

from hummingbot.data_feed.candles_feed.candles_factory import CandlesFactory


class CandleBasedScript(ScriptBase):
    """基于 K 线数据的脚本"""

    def __init__(self, connectors: list, markets: dict):
        super().__init__(connectors, markets)

        # 初始化 K 线数据源
        self.candles = CandlesFactory.get_candles(
            connector="binance",
            trading_pair="BTC-USDT",
            interval="1m",  # 1 分钟 K 线
            max_records=100
        )

    async def on_start(self):
        """启动 K 线数据源"""
        self.candles.start()

    async def on_tick(self):
        """使用 K 线数据做决策"""
        if not self.candles.ready:
            return

        # 获取收盘价序列
        closes = self.candles.close_prices

        # 计算简单移动平均线
        sma_short = sum(closes[-5:]) / 5
        sma_long = sum(closes[-20:]) / 20

        # 金叉/死叉策略
        if sma_short > sma_long:
            self.logger().info("Golden cross detected - bullish signal")
        elif sma_short < sma_long:
            self.logger().info("Death cross detected - bearish signal")

使用多个交易所

class MultiExchangeScript(ScriptBase):
    """多交易所套利脚本"""

    def __init__(self, connectors: list, markets: dict):
        super().__init__(connectors, markets)
        self.exchange_a = "binance"
        self.exchange_b = "okx"
        self.trading_pair = "BTC-USDT"

    async def on_tick(self):
        """检查跨交易所价差"""
        conn_a = self.connectors[self.exchange_a]
        conn_b = self.connectors[self.exchange_b]

        price_a = conn_a.get_mid_price(self.trading_pair)
        price_b = conn_b.get_mid_price(self.trading_pair)

        if price_a > 0 and price_b > 0:
            spread = abs(price_a - price_b) / min(price_a, price_b) * 100
            self.logger().info(
                f"Price A: {price_a:.2f} | Price B: {price_b:.2f} | Spread: {spread:.3f}%"
            )

            # 当价差大于阈值时触发套利
            if spread > 0.5:
                self.logger().info("Arbitrage opportunity detected!")

脚本调试

日志输出

# 使用内置日志器
self.logger().info("信息日志")
self.logger().warning("警告日志")
self.logger().error("错误日志")

# 配置日志级别
# conf_client.yml 中设置 log_level: DEBUG

运行脚本

# 将脚本放入 scripts 目录
cp simple_mm.py ~/hummingbot/scripts/

# 在 Hummingbot 中运行
>>> script simple_mm.py

# 停止脚本
>>> stop

常见调试技巧

  • 逐步增加复杂度:先跑通基础框架,再添加复杂逻辑
  • 使用模拟数据:在脚本中模拟市场数据进行测试
  • 充分日志:关键决策点都加上日志输出
  • 异常捕获:使用 try/except 包裹核心逻辑,避免脚本崩溃