数据提供与指标

本章详细介绍 Freqtrade 的 DataProvider 接口,涵盖历史数据获取、TA-Lib 指标计算、自定义指标开发以及数据缓存机制,帮助读者高效获取和处理市场数据。

DataProvider 接口

DataProvider 概述

DataProvider 是 Freqtrade 中用于获取市场数据的核心接口,在策略中通过 self.dp 访问。它提供了统一的数据访问层,屏蔽了不同交易所的数据格式差异。

from freqtrade.strategy import IStrategy
from pandas import DataFrame

class DataAwareStrategy(IStrategy):
    timeframe = "5m"

    def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
        dp = self.dp

        # 检查 DataProvider 是否可用
        if not dp:
            return dataframe

        # ... 数据获取逻辑
        return dataframe

核心方法速查

方法返回值说明
get_pair_dataframe(pair, timeframe)DataFrame / None获取指定交易对的 K 线数据
get_analyzed_dataframe(pair, timeframe)(DataFrame, datetime)获取分析后的数据(含指标)
available_pairslist已下载数据的交易对列表
current_whitelist()list当前白名单交易对列表
get_pair_list()list当前 Pairlist 结果
ticker(pair)dict / None获取实时 Ticker
orderbook(pair, limit)dict / None获取订单簿深度
market(pair)dict / None获取交易对元信息
get_signal(pair, timeframe, signal)bool / None获取其他策略的信号
get_producer_pairs(producer_name)list获取生产者实例的交易对
get_producer_df(producer_name, pair, timeframe)DataFrame获取生产者实例的分析数据

获取历史数据

get_pair_dataframe 详解

def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    pair = metadata["pair"]

    # 获取当前交易对的不同时间周期数据
    df_5m = self.dp.get_pair_dataframe(pair, "5m")
    df_1h = self.dp.get_pair_dataframe(pair, "1h")
    df_4h = self.dp.get_pair_dataframe(pair, "4h")

    # 获取其他交易对的数据用于计算相关性
    btc_data = self.dp.get_pair_dataframe("BTC/USDT", "1h")
    eth_data = self.dp.get_pair_dataframe("ETH/USDT", "1h")

    if df_1h is not None and len(df_1h) > 0:
        # 将 1h 数据合并到当前周期
        dataframe["rsi_1h"] = self._calc_rsi(df_1h).reindex(dataframe.index, method="ffill")

    return dataframe

多时间帧数据合并

def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    pair = metadata["pair"]

    # 获取更高时间周期的数据
    for tf in ["15m", "1h", "4h"]:
        higher_tf_data = self.dp.get_pair_dataframe(pair, tf)
        if higher_tf_data is not None:
            # 计算高周期 RSI
            higher_tf_data["rsi"] = ta.RSI(higher_tf_data, timeperiod=14)

            # 使用 ffill 将高周期数据对齐到当前周期
            resampled_rsi = higher_tf_data["rsi"].reindex(dataframe.index, method="ffill")
            dataframe[f"rsi_{tf}"] = resampled_rsi

    return dataframe

获取分析后的数据

def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    # 获取已经过 populate_indicators 处理的数据
    analyzed_df, last_analyzed = self.dp.get_analyzed_dataframe(
        metadata["pair"], self.timeframe
    )

    if analyzed_df is not None:
        # 获取上一根 K 线的指标值
        previous_rsi = analyzed_df["rsi"].iloc[-2] if len(analyzed_df) > 1 else None
        if previous_rsi is not None:
            self.logger.info(f"上一根 K 线的 RSI: {previous_rsi:.2f}")

    return dataframe

数据获取的常见问题

def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    pair = metadata["pair"]

    # 安全获取数据:必须检查 None
    ext_data = self.dp.get_pair_dataframe("NONEXISTENT/USDT", "5m")
    if ext_data is None:
        # 数据不可用时,不进行处理
        self.logger.warning(f"无法获取外部数据,跳过跨交易对指标")
        return dataframe

    # 安全的数据长度检查
    if len(ext_data) < 50:
        self.logger.warning(f"外部数据不足,需要至少 50 条")
        return dataframe

    return dataframe

TA-Lib 指标计算

安装 TA-Lib

# 通过 pip 安装
pip install TA-Lib

# 如果在 Linux 上安装失败,先安装系统依赖
sudo apt-get install libta-lib0 libta-lib-dev
# 或使用 freqtrade 内置版本
pip install freqtrade[ta-lib]

趋势指标

import talib.abstract as ta

def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    # 移动平均线
    dataframe["sma_10"] = ta.SMA(dataframe, timeperiod=10)
    dataframe["sma_20"] = ta.SMA(dataframe, timeperiod=20)
    dataframe["sma_50"] = ta.SMA(dataframe, timeperiod=50)
    dataframe["sma_200"] = ta.SMA(dataframe, timeperiod=200)

    dataframe["ema_12"] = ta.EMA(dataframe, timeperiod=12)
    dataframe["ema_26"] = ta.EMA(dataframe, timeperiod=26)

    # MACD
    macd = ta.MACD(dataframe, fastperiod=12, slowperiod=26, signalperiod=9)
    dataframe["macd"] = macd["macd"]
    dataframe["macd_signal"] = macd["macdsignal"]
    dataframe["macd_hist"] = macd["macdhist"]

    # ADX(平均趋向指数)
    dataframe["adx"] = ta.ADX(dataframe, timeperiod=14)

    # 抛物线 SAR
    dataframe["sar"] = ta.SAR(dataframe, acceleration=0.02, maximum=0.2)

    return dataframe

动量指标

def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    # RSI
    dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
    dataframe["rsi_6"] = ta.RSI(dataframe, timeperiod=6)  # 短期 RSI

    # 随机指标
    stoch = ta.STOCH(dataframe, fastk_period=14, slowk_period=3, slowd_period=3)
    dataframe["stoch_k"] = stoch["slowk"]
    dataframe["stoch_d"] = stoch["slowd"]

    # Williams %R
    dataframe["williams_r"] = ta.WILLR(dataframe, timeperiod=14)

    # CCI(商品通道指数)
    dataframe["cci"] = ta.CCI(dataframe, timeperiod=20)

    # 动量
    dataframe["mom"] = ta.MOM(dataframe, timeperiod=10)

    # ROC(变化率)
    dataframe["roc"] = ta.ROC(dataframe, timeperiod=10)

    # Ultimate Oscillator
    dataframe["ultosc"] = ta.ULTOSC(dataframe)

    return dataframe

波动性指标

def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    # 布林带
    bb = ta.BBANDS(dataframe, timeperiod=20, nbdevup=2, nbdevdn=2)
    dataframe["bb_upper"] = bb["upperband"]
    dataframe["bb_middle"] = bb["middleband"]
    dataframe["bb_lower"] = bb["lowerband"]
    dataframe["bb_width"] = bb["upperband"] - bb["lowerband"]
    dataframe["bb_percent"] = (dataframe["close"] - bb["lowerband"]) / (bb["upperband"] - bb["lowerband"])

    # ATR(平均真实波幅)
    dataframe["atr"] = ta.ATR(dataframe, timeperiod=14)
    dataframe["natr"] = ta.NATR(dataframe, timeperiod=14)

    # 肯特纳通道
    dataframe["kc_middle"] = ta.EMA(dataframe, timeperiod=20)
    kc_atr = ta.ATR(dataframe, timeperiod=10)
    dataframe["kc_upper"] = dataframe["kc_middle"] + kc_atr * 2
    dataframe["kc_lower"] = dataframe["kc_middle"] - kc_atr * 2

    return dataframe

成交量指标

def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    # OBV(能量潮)
    dataframe["obv"] = ta.OBV(dataframe)

    # 成交量加权平均价
    dataframe["vwap"] = ta.VWAP(dataframe) if hasattr(ta, 'VWAP') else self._calc_vwap(dataframe)

    # 成交量的移动平均
    dataframe["volume_sma_10"] = ta.SMA(dataframe["volume"], timeperiod=10)
    dataframe["volume_sma_20"] = ta.SMA(dataframe["volume"], timeperiod=20)

    # 成交量比率
    dataframe["volume_ratio"] = dataframe["volume"] / dataframe["volume_sma_20"]

    # MFI(资金流量指数)
    dataframe["mfi"] = ta.MFI(dataframe, timeperiod=14)

    # AD(累积/派发线)
    dataframe["ad"] = ta.AD(dataframe)

    return dataframe

指标速查表

类别指标TA-Lib 函数常用参数
趋势SMAta.SMAtimeperiod=20
趋势EMAta.EMAtimeperiod=12
趋势MACDta.MACD12, 26, 9
趋势ADXta.ADXtimeperiod=14
趋势SARta.SAR0.02, 0.2
动量RSIta.RSItimeperiod=14
动量STOCHta.STOCH14, 3, 3
动量CCIta.CCItimeperiod=20
动量MFIta.MFItimeperiod=14
波动BBANDSta.BBANDS20, 2, 2
波动ATRta.ATRtimeperiod=14
成交量OBVta.OBV-
成交量ADta.AD-

自定义指标

自定义计算函数

def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    # 调用自定义指标函数
    dataframe = self._add_custom_indicators(dataframe, metadata)
    return dataframe

def _add_custom_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    """自定义指标集合"""

    # 1. 价格通道突破
    dataframe["highest_20"] = dataframe["high"].rolling(20).max()
    dataframe["lowest_20"] = dataframe["low"].rolling(20).min()
    dataframe["channel_pos"] = (
        (dataframe["close"] - dataframe["lowest_20"]) /
        (dataframe["highest_20"] - dataframe["lowest_20"])
    )

    # 2. 加权移动平均
    weights = [1, 2, 3, 4, 5]
    dataframe["wma_5"] = (
        dataframe["close"].rolling(5).apply(
            lambda x: sum(w * v for w, v in zip(weights, x)) / sum(weights)
        )
    )

    # 3. 自定义 RSI 背离检测
    dataframe["rsi_14"] = ta.RSI(dataframe, timeperiod=14)
    dataframe["price_ll"] = dataframe["close"].rolling(14).min()
    dataframe["rsi_ll"] = dataframe["rsi_14"].rolling(14).min()
    dataframe["bullish_div"] = (
        (dataframe["close"] <= dataframe["price_ll"].shift(1)) &
        (dataframe["rsi_14"] > dataframe["rsi_ll"].shift(1))
    ).astype(int)

    # 4. 动量的滚动统计
    returns = dataframe["close"].pct_change()
    dataframe["return_skew"] = returns.rolling(20).skew()
    dataframe["return_kurt"] = returns.rolling(20).kurt()
    dataframe["return_std"] = returns.rolling(20).std()

    # 5. 累积/派发指标
    clv = ((dataframe["close"] - dataframe["low"]) - (dataframe["high"] - dataframe["close"])) / (dataframe["high"] - dataframe["low"])
    clv = clv.fillna(0)
    dataframe["ad_custom"] = clv * dataframe["volume"]
    dataframe["ad_custom_ma"] = dataframe["ad_custom"].rolling(20).mean()

    return dataframe

使用 Pandas 进行特征工程

def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
    # 滚动窗口统计
    for window in [5, 10, 20, 50]:
        # 价格统计
        dataframe[f"close_mean_{window}"] = dataframe["close"].rolling(window).mean()
        dataframe[f"close_std_{window}"] = dataframe["close"].rolling(window).std()
        dataframe[f"close_max_{window}"] = dataframe["close"].rolling(window).max()
        dataframe[f"close_min_{window}"] = dataframe["close"].rolling(window).min()

        # 成交量统计
        dataframe[f"volume_mean_{window}"] = dataframe["volume"].rolling(window).mean()
        dataframe[f"volume_std_{window}"] = dataframe["volume"].rolling(window).std()

    # 价格位置百分比
    for window in [10, 20, 50]:
        rolling_min = dataframe["close"].rolling(window).min()
        rolling_max = dataframe["close"].rolling(window).max()
        dataframe[f"close_position_{window}"] = (
            (dataframe["close"] - rolling_min) / (rolling_max - rolling_min)
        )

    # 指数加权移动平均
    dataframe["ewm_12"] = dataframe["close"].ewm(span=12, adjust=False).mean()
    dataframe["ewm_26"] = dataframe["close"].ewm(span=26, adjust=False).mean()

    return dataframe

数据缓存

缓存工作机制

Freqtrade 使用本地文件系统缓存已下载的 K 线数据,避免每次启动都重复下载:

# 数据存储位置
ls user_data/data/
# 输出: binance/  okx/  bybit/

# 查看缓存的数据文件
ls user_data/data/binance/
# 输出: BTC_USDT-1h.json  BTC_USDT-5m.json  ETH_USDT-1h.json

手动下载数据

# 下载单个交易对的数据
freqtrade download-data \
    --exchange binance \
    --pairs BTC/USDT \
    --days 100 \
    --timeframe 5m

# 下载多个交易对
freqtrade download-data \
    --exchange binance \
    --pairs BTC/USDT ETH/USDT SOL/USDT \
    --days 365 \
    --timeframes 5m 15m 1h 4h 1d

# 下载所有 USDT 交易对
freqtrade download-data \
    --exchange binance \
    --pairs .*/USDT \
    --days 30 \
    --timeframe 1h

# 下载交易对列表文件中的交易对
freqtrade download-data \
    --exchange binance \
    --pairs-file user_data/pairs.json \
    --days 100

缓存清除与更新

# 清除所有缓存数据
rm -rf user_data/data/*

# 使用 --erase 参数强制重新下载
freqtrade download-data --exchange binance --pairs BTC/USDT --days 100 --erase

# 使用计时范围下载
freqtrade download-data \
    --exchange binance \
    --pairs BTC/USDT \
    --timerange 20240101-20240601

数据验证

# 查看已下载数据的状态
freqtrade list-data --exchange binance

# 列出所有交易对
freqtrade list-pairs --exchange binance --all

# 验证数据完整性
freqtrade backtesting --strategy MyStrategy --cache none