数据提供与指标
本章详细介绍 Freqtrade 的 DataProvider 接口,涵盖历史数据获取、TA-Lib 指标计算、自定义指标开发以及数据缓存机制,帮助读者高效获取和处理市场数据。
DataProvider 接口
DataProvider 概述
DataProvider 是 Freqtrade 中用于获取市场数据的核心接口,在策略中通过 self.dp 访问。它提供了统一的数据访问层,屏蔽了不同交易所的数据格式差异。
from freqtrade.strategy import IStrategy
from pandas import DataFrame
class DataAwareStrategy(IStrategy):
timeframe = "5m"
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
dp = self.dp
# 检查 DataProvider 是否可用
if not dp:
return dataframe
# ... 数据获取逻辑
return dataframe
核心方法速查
| 方法 | 返回值 | 说明 |
|---|---|---|
get_pair_dataframe(pair, timeframe) | DataFrame / None | 获取指定交易对的 K 线数据 |
get_analyzed_dataframe(pair, timeframe) | (DataFrame, datetime) | 获取分析后的数据(含指标) |
available_pairs | list | 已下载数据的交易对列表 |
current_whitelist() | list | 当前白名单交易对列表 |
get_pair_list() | list | 当前 Pairlist 结果 |
ticker(pair) | dict / None | 获取实时 Ticker |
orderbook(pair, limit) | dict / None | 获取订单簿深度 |
market(pair) | dict / None | 获取交易对元信息 |
get_signal(pair, timeframe, signal) | bool / None | 获取其他策略的信号 |
get_producer_pairs(producer_name) | list | 获取生产者实例的交易对 |
get_producer_df(producer_name, pair, timeframe) | DataFrame | 获取生产者实例的分析数据 |
获取历史数据
get_pair_dataframe 详解
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata["pair"]
# 获取当前交易对的不同时间周期数据
df_5m = self.dp.get_pair_dataframe(pair, "5m")
df_1h = self.dp.get_pair_dataframe(pair, "1h")
df_4h = self.dp.get_pair_dataframe(pair, "4h")
# 获取其他交易对的数据用于计算相关性
btc_data = self.dp.get_pair_dataframe("BTC/USDT", "1h")
eth_data = self.dp.get_pair_dataframe("ETH/USDT", "1h")
if df_1h is not None and len(df_1h) > 0:
# 将 1h 数据合并到当前周期
dataframe["rsi_1h"] = self._calc_rsi(df_1h).reindex(dataframe.index, method="ffill")
return dataframe
多时间帧数据合并
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata["pair"]
# 获取更高时间周期的数据
for tf in ["15m", "1h", "4h"]:
higher_tf_data = self.dp.get_pair_dataframe(pair, tf)
if higher_tf_data is not None:
# 计算高周期 RSI
higher_tf_data["rsi"] = ta.RSI(higher_tf_data, timeperiod=14)
# 使用 ffill 将高周期数据对齐到当前周期
resampled_rsi = higher_tf_data["rsi"].reindex(dataframe.index, method="ffill")
dataframe[f"rsi_{tf}"] = resampled_rsi
return dataframe
获取分析后的数据
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 获取已经过 populate_indicators 处理的数据
analyzed_df, last_analyzed = self.dp.get_analyzed_dataframe(
metadata["pair"], self.timeframe
)
if analyzed_df is not None:
# 获取上一根 K 线的指标值
previous_rsi = analyzed_df["rsi"].iloc[-2] if len(analyzed_df) > 1 else None
if previous_rsi is not None:
self.logger.info(f"上一根 K 线的 RSI: {previous_rsi:.2f}")
return dataframe
数据获取的常见问题
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
pair = metadata["pair"]
# 安全获取数据:必须检查 None
ext_data = self.dp.get_pair_dataframe("NONEXISTENT/USDT", "5m")
if ext_data is None:
# 数据不可用时,不进行处理
self.logger.warning(f"无法获取外部数据,跳过跨交易对指标")
return dataframe
# 安全的数据长度检查
if len(ext_data) < 50:
self.logger.warning(f"外部数据不足,需要至少 50 条")
return dataframe
return dataframe
TA-Lib 指标计算
安装 TA-Lib
# 通过 pip 安装
pip install TA-Lib
# 如果在 Linux 上安装失败,先安装系统依赖
sudo apt-get install libta-lib0 libta-lib-dev
# 或使用 freqtrade 内置版本
pip install freqtrade[ta-lib]
趋势指标
import talib.abstract as ta
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 移动平均线
dataframe["sma_10"] = ta.SMA(dataframe, timeperiod=10)
dataframe["sma_20"] = ta.SMA(dataframe, timeperiod=20)
dataframe["sma_50"] = ta.SMA(dataframe, timeperiod=50)
dataframe["sma_200"] = ta.SMA(dataframe, timeperiod=200)
dataframe["ema_12"] = ta.EMA(dataframe, timeperiod=12)
dataframe["ema_26"] = ta.EMA(dataframe, timeperiod=26)
# MACD
macd = ta.MACD(dataframe, fastperiod=12, slowperiod=26, signalperiod=9)
dataframe["macd"] = macd["macd"]
dataframe["macd_signal"] = macd["macdsignal"]
dataframe["macd_hist"] = macd["macdhist"]
# ADX(平均趋向指数)
dataframe["adx"] = ta.ADX(dataframe, timeperiod=14)
# 抛物线 SAR
dataframe["sar"] = ta.SAR(dataframe, acceleration=0.02, maximum=0.2)
return dataframe
动量指标
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# RSI
dataframe["rsi"] = ta.RSI(dataframe, timeperiod=14)
dataframe["rsi_6"] = ta.RSI(dataframe, timeperiod=6) # 短期 RSI
# 随机指标
stoch = ta.STOCH(dataframe, fastk_period=14, slowk_period=3, slowd_period=3)
dataframe["stoch_k"] = stoch["slowk"]
dataframe["stoch_d"] = stoch["slowd"]
# Williams %R
dataframe["williams_r"] = ta.WILLR(dataframe, timeperiod=14)
# CCI(商品通道指数)
dataframe["cci"] = ta.CCI(dataframe, timeperiod=20)
# 动量
dataframe["mom"] = ta.MOM(dataframe, timeperiod=10)
# ROC(变化率)
dataframe["roc"] = ta.ROC(dataframe, timeperiod=10)
# Ultimate Oscillator
dataframe["ultosc"] = ta.ULTOSC(dataframe)
return dataframe
波动性指标
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 布林带
bb = ta.BBANDS(dataframe, timeperiod=20, nbdevup=2, nbdevdn=2)
dataframe["bb_upper"] = bb["upperband"]
dataframe["bb_middle"] = bb["middleband"]
dataframe["bb_lower"] = bb["lowerband"]
dataframe["bb_width"] = bb["upperband"] - bb["lowerband"]
dataframe["bb_percent"] = (dataframe["close"] - bb["lowerband"]) / (bb["upperband"] - bb["lowerband"])
# ATR(平均真实波幅)
dataframe["atr"] = ta.ATR(dataframe, timeperiod=14)
dataframe["natr"] = ta.NATR(dataframe, timeperiod=14)
# 肯特纳通道
dataframe["kc_middle"] = ta.EMA(dataframe, timeperiod=20)
kc_atr = ta.ATR(dataframe, timeperiod=10)
dataframe["kc_upper"] = dataframe["kc_middle"] + kc_atr * 2
dataframe["kc_lower"] = dataframe["kc_middle"] - kc_atr * 2
return dataframe
成交量指标
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# OBV(能量潮)
dataframe["obv"] = ta.OBV(dataframe)
# 成交量加权平均价
dataframe["vwap"] = ta.VWAP(dataframe) if hasattr(ta, 'VWAP') else self._calc_vwap(dataframe)
# 成交量的移动平均
dataframe["volume_sma_10"] = ta.SMA(dataframe["volume"], timeperiod=10)
dataframe["volume_sma_20"] = ta.SMA(dataframe["volume"], timeperiod=20)
# 成交量比率
dataframe["volume_ratio"] = dataframe["volume"] / dataframe["volume_sma_20"]
# MFI(资金流量指数)
dataframe["mfi"] = ta.MFI(dataframe, timeperiod=14)
# AD(累积/派发线)
dataframe["ad"] = ta.AD(dataframe)
return dataframe
指标速查表
| 类别 | 指标 | TA-Lib 函数 | 常用参数 |
|---|---|---|---|
| 趋势 | SMA | ta.SMA | timeperiod=20 |
| 趋势 | EMA | ta.EMA | timeperiod=12 |
| 趋势 | MACD | ta.MACD | 12, 26, 9 |
| 趋势 | ADX | ta.ADX | timeperiod=14 |
| 趋势 | SAR | ta.SAR | 0.02, 0.2 |
| 动量 | RSI | ta.RSI | timeperiod=14 |
| 动量 | STOCH | ta.STOCH | 14, 3, 3 |
| 动量 | CCI | ta.CCI | timeperiod=20 |
| 动量 | MFI | ta.MFI | timeperiod=14 |
| 波动 | BBANDS | ta.BBANDS | 20, 2, 2 |
| 波动 | ATR | ta.ATR | timeperiod=14 |
| 成交量 | OBV | ta.OBV | - |
| 成交量 | AD | ta.AD | - |
自定义指标
自定义计算函数
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 调用自定义指标函数
dataframe = self._add_custom_indicators(dataframe, metadata)
return dataframe
def _add_custom_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
"""自定义指标集合"""
# 1. 价格通道突破
dataframe["highest_20"] = dataframe["high"].rolling(20).max()
dataframe["lowest_20"] = dataframe["low"].rolling(20).min()
dataframe["channel_pos"] = (
(dataframe["close"] - dataframe["lowest_20"]) /
(dataframe["highest_20"] - dataframe["lowest_20"])
)
# 2. 加权移动平均
weights = [1, 2, 3, 4, 5]
dataframe["wma_5"] = (
dataframe["close"].rolling(5).apply(
lambda x: sum(w * v for w, v in zip(weights, x)) / sum(weights)
)
)
# 3. 自定义 RSI 背离检测
dataframe["rsi_14"] = ta.RSI(dataframe, timeperiod=14)
dataframe["price_ll"] = dataframe["close"].rolling(14).min()
dataframe["rsi_ll"] = dataframe["rsi_14"].rolling(14).min()
dataframe["bullish_div"] = (
(dataframe["close"] <= dataframe["price_ll"].shift(1)) &
(dataframe["rsi_14"] > dataframe["rsi_ll"].shift(1))
).astype(int)
# 4. 动量的滚动统计
returns = dataframe["close"].pct_change()
dataframe["return_skew"] = returns.rolling(20).skew()
dataframe["return_kurt"] = returns.rolling(20).kurt()
dataframe["return_std"] = returns.rolling(20).std()
# 5. 累积/派发指标
clv = ((dataframe["close"] - dataframe["low"]) - (dataframe["high"] - dataframe["close"])) / (dataframe["high"] - dataframe["low"])
clv = clv.fillna(0)
dataframe["ad_custom"] = clv * dataframe["volume"]
dataframe["ad_custom_ma"] = dataframe["ad_custom"].rolling(20).mean()
return dataframe
使用 Pandas 进行特征工程
def populate_indicators(self, dataframe: DataFrame, metadata: dict) -> DataFrame:
# 滚动窗口统计
for window in [5, 10, 20, 50]:
# 价格统计
dataframe[f"close_mean_{window}"] = dataframe["close"].rolling(window).mean()
dataframe[f"close_std_{window}"] = dataframe["close"].rolling(window).std()
dataframe[f"close_max_{window}"] = dataframe["close"].rolling(window).max()
dataframe[f"close_min_{window}"] = dataframe["close"].rolling(window).min()
# 成交量统计
dataframe[f"volume_mean_{window}"] = dataframe["volume"].rolling(window).mean()
dataframe[f"volume_std_{window}"] = dataframe["volume"].rolling(window).std()
# 价格位置百分比
for window in [10, 20, 50]:
rolling_min = dataframe["close"].rolling(window).min()
rolling_max = dataframe["close"].rolling(window).max()
dataframe[f"close_position_{window}"] = (
(dataframe["close"] - rolling_min) / (rolling_max - rolling_min)
)
# 指数加权移动平均
dataframe["ewm_12"] = dataframe["close"].ewm(span=12, adjust=False).mean()
dataframe["ewm_26"] = dataframe["close"].ewm(span=26, adjust=False).mean()
return dataframe
数据缓存
缓存工作机制
Freqtrade 使用本地文件系统缓存已下载的 K 线数据,避免每次启动都重复下载:
# 数据存储位置
ls user_data/data/
# 输出: binance/ okx/ bybit/
# 查看缓存的数据文件
ls user_data/data/binance/
# 输出: BTC_USDT-1h.json BTC_USDT-5m.json ETH_USDT-1h.json
手动下载数据
# 下载单个交易对的数据
freqtrade download-data \
--exchange binance \
--pairs BTC/USDT \
--days 100 \
--timeframe 5m
# 下载多个交易对
freqtrade download-data \
--exchange binance \
--pairs BTC/USDT ETH/USDT SOL/USDT \
--days 365 \
--timeframes 5m 15m 1h 4h 1d
# 下载所有 USDT 交易对
freqtrade download-data \
--exchange binance \
--pairs .*/USDT \
--days 30 \
--timeframe 1h
# 下载交易对列表文件中的交易对
freqtrade download-data \
--exchange binance \
--pairs-file user_data/pairs.json \
--days 100
缓存清除与更新
# 清除所有缓存数据
rm -rf user_data/data/*
# 使用 --erase 参数强制重新下载
freqtrade download-data --exchange binance --pairs BTC/USDT --days 100 --erase
# 使用计时范围下载
freqtrade download-data \
--exchange binance \
--pairs BTC/USDT \
--timerange 20240101-20240601
数据验证
# 查看已下载数据的状态
freqtrade list-data --exchange binance
# 列出所有交易对
freqtrade list-pairs --exchange binance --all
# 验证数据完整性
freqtrade backtesting --strategy MyStrategy --cache none