import pandas as pd
from mpl_toolkits.axes_grid1 import host_subplot
import matplotlib.pyplot as plt
import matplotlib.transforms as mtransforms
from wind_helper import WindHelper
import os
from pandas.tseries.offsets import Day
import numpy as np
from datetime import datetime
import empyrical as ep
from itertools import product
import talib
from math import ceil
from numpy import sign
# Hull moving average
def HMA(price, n):
    """
    Hull moving average of a series.

    Parameters
    ----------
    price : array-like
        values to smooth; handed to talib unchanged.
    n : int
        smoothing window. 0 returns the input untouched; windows shorter
        than 4 fall back to talib.MA.

    Returns
    -------
    same type as talib returns for the input
        the input itself when n == 0, otherwise the smoothed values.
    """
    if n == 0:
        return price
    if n < 4:
        return talib.MA(price, n)
    half_window = talib.WMA(price, ceil(n / 2))
    full_window = talib.WMA(price, n)
    # talib time periods are integers, while np.sqrt returns a float:
    # truncate explicitly instead of relying on an implicit conversion.
    return talib.WMA(2 * half_window - full_window, int(np.sqrt(n)))
# Turn a position signal into per-trade P&L, summary statistics and an optional plot
def _extract_trades(position, profit, dates, compound):
    """Split a position path into trades and return (pnl array, open dates, close dates, directions)."""
    flat = 1 if compound else 0
    opens, closes, directions, pnl = [], [], [], []
    running = flat
    # NOTE(review): the last bar is never visited (range stops at len - 1), so a trade
    # still open at the end is closed on dates[-1] without the final bar's profit.
    for i in range(1, len(position) - 1):
        previous, current = position.iloc[i - 1], position.iloc[i]
        if compound:
            running *= (profit.iloc[i] + 1)
            # only a change of direction counts as a new trade, not a change of size
            previous_side, current_side = sign(previous), sign(current)
        else:
            running += profit.iloc[i]
            previous_side, current_side = previous, current
        changed = previous_side != current_side
        # the signal changed into a non-flat position: the signal day is the open date
        if (current != 0) and changed:
            opens.append(dates[i])
            directions.append(current_side)
        # a position was held yesterday and the signal changed: book the trade and
        # use the signal day as the close date
        if (previous != 0) and changed and len(opens) > 0:
            pnl.append(running - flat)
            closes.append(dates[i])
            running = flat
    # a trade still open at the end of the sample is closed on the last date
    if len(closes) < len(opens):
        closes.append(dates[-1])
        pnl.append(running - flat)
    return np.array(pnl), opens, closes, directions


def evaluation(position, prices, name, version, drawplot):
    """
    Evaluate a trading signal against a price series.

    Assets whose name starts with 'CBA' are evaluated on percentage returns
    (compounded net value starting at 1); every other asset is evaluated on
    absolute price changes (cumulative P&L starting at 0). Yesterday's
    position earns today's price move.

    Parameters
    ----------
    position : Series
        your trading signal.
    prices : Series
        price of the target asset (used as a single column; presumably
        indexed by date like position).
    name : str
        name of the target asset.
    version : str
        name of trading strategy.
    drawplot : bool
        whether to draw the plot. When true the figure is saved under ./img
        and the daily series and trade list are exported under data/.

    Returns
    -------
    Series
        overall performance measure (returns, drawdown, sharpe, calmar,
        begin_drawdown, end_drawdown, win_rate, odds, turnover).
    returns_by_year : Series
        returns in each year, annualised with 244 days per year.
    pnl_profile : DataFrame
        details pnl of each trade, expressed in multiples of the average
        losing trade.
    """
    compound = name[:3] == 'CBA'
    if compound:
        profit = prices.pct_change() * position.shift(1)
    else:
        profit = prices.diff() * position.shift(1)
    profit_list, open_list, close_list, signal_list = _extract_trades(
        position, profit, prices.index, compound)
    # calculate strategy measures
    losing_trades = profit_list[profit_list < 0]
    if len(profit_list) == 0:
        win_rate = 0
        odds = 0
        turnover = 0
    else:
        win_rate = len(profit_list[profit_list > 0]) / len(profit_list) * 100
        if losing_trades.size > 0:
            odds = -1 * np.mean(profit_list[profit_list > 0]) / np.mean(losing_trades)
        else:
            odds = np.nan
        turnover = len(profit_list) / len(profit) * 100
    sharpe = ep.sharpe_ratio(profit)
    annual_returns = ep.annual_return(profit) * 100
    if compound:
        net_value = (profit + 1).cumprod().dropna()
        baseline = 1
    else:
        net_value = profit.cumsum().dropna()
        # cumulative P&L starts from 0, so the first year is measured against 0
        # (the previous baseline of 1 understated the first year by one price unit)
        baseline = 0
    year_key = net_value.index.strftime('%Y')
    horizon_by_year = net_value.groupby(year_key).count()
    year_end = pd.concat([pd.Series(baseline), net_value.groupby(year_key).tail(1)])
    if compound:
        returns_by_year = year_end.pct_change().dropna()
        returns_by_year = np.exp(np.log(returns_by_year + 1) / horizon_by_year.values * 244) - 1
    else:
        returns_by_year = year_end.diff().dropna()
        returns_by_year = returns_by_year / horizon_by_year.values * 244
    drawdown = net_value - net_value.rolling(len(net_value), min_periods=1).max()
    max_drawdown = -1 * drawdown.min() * 100
    end_drawdown = drawdown.index[np.argmin(drawdown)]
    begin_drawdown = drawdown.index[np.argmax(net_value[:end_drawdown])]
    calmar = ep.calmar_ratio(profit)
    # R is the average losing trade; it is NaN when no trade lost money
    R = -1 * np.mean(losing_trades) if losing_trades.size > 0 else np.nan
    pnl_profile = pd.DataFrame({'profit and loss': profit_list / R, 'open date': open_list, 'close date': close_list,
                                'direction': signal_list})
    # draw the result plot
    if drawplot:
        # make sure the output folders exist before saving anything
        os.makedirs("./img", exist_ok=True)
        os.makedirs("data", exist_ok=True)
        plt.figure(figsize=(20, 8))
        ax0 = host_subplot(111)
        ax1 = ax0.twinx()
        plt.title("%s %s" % (name, version))
        net_value.plot(label=version, ax=ax0)
        if name[:3] == 'CBA':
            ((prices - prices.loc[prices.index[0]]) / 10 + 1).plot(label=name, ax=ax0)
        else:
            (prices - prices.loc[prices.index[0]]).plot(label=name, ax=ax0)
        trans = mtransforms.blended_transform_factory(ax1.transData, ax1.transAxes)
        # shade the period of the maximum drawdown
        ax1.fill_between(np.array([begin_drawdown, end_drawdown]), 0, 1,
                         facecolor='green', alpha=0.5, transform=trans)
        drawdown.plot(kind="area", label='drawdown(RHS)', color='k', ax=ax1)
        ax1.set_ylim([-1, 0])
        ax0.legend(loc="best")
        plt.savefig("./img/%s %s.png" % (name, version))
        pd.concat([position, net_value, (prices - prices.loc[prices.index[0]]), profit],
                  axis=1).to_excel("data/%s_%s.xlsx" % (name, version))
        pnl_profile.to_excel("data/pnl profile %s_%s.xlsx" % (name, version))
    return pd.Series([annual_returns, max_drawdown, sharpe, calmar, begin_drawdown,
                      end_drawdown, win_rate, odds, turnover],
                     index=['returns', 'drawdown', 'sharpe', 'calmar', 'begin_drawdown',
                            'end_drawdown', 'win_rate', 'odds', 'turnover']), returns_by_year, pnl_profile
def product_dict(**kwargs):
    """
    Cartesian product of keyword lists, one dict per combination.

    Parameters
    ----------
    **kwargs : dict
        each keyword maps to an iterable of candidate values.

    Yields
    ------
    dict
        one {keyword: value} mapping for every combination of the inputs,
        in the order produced by itertools.product.
    """
    names = list(kwargs)
    candidates = [kwargs[key] for key in names]
    for combination in product(*candidates):
        yield {key: value for key, value in zip(names, combination)}
# Parameter grid generator
def params_generator(center, step, num):
    """
    Build an evenly spaced grid of parameter values around a center.

    Parameters
    ----------
    center : float
        middle value of the grid.
    step : float
        distance between neighbouring points.
    num : int
        number of points on each side of the center.

    Returns
    -------
    ndarray
        2 * num + 1 values from center - num * step to center + num * step.
    """
    offsets = np.arange(-num, num + 1)
    return center + offsets * step
# Parameter optimizer
def params_optimizer(indicator_prices, strategy, params_setting, sample_period, measure, name='A'):
    """
    Grid-search strategy parameters and report in-sample / out-of-sample measures.

    Parameters
    ----------
    indicator_prices : Series
        price data of the target asset; passed to strategy and to evaluation.
    strategy : function
        function you want to optimize parameters, called as
        strategy(indicator_prices, **params).
    params_setting : dict
        Dict of parameter candidate lists like {param1: [x1, x2...], param2: [y1, y2...]}.
    sample_period : str
        end date of the in-sample period. Label slicing is used on both
        sides, so this date belongs to both samples.
    measure : str/list
        target measure name(s) from the evaluation result; a single name or a list.
    name : str
        target asset name.

    Returns
    -------
    params_df : DataFrame
        one row per parameter combination that could be evaluated, holding
        the parameters with more than one candidate, the in-sample measures
        and the out-of-sample measures (suffix "_os"), sorted by the
        in-sample measures in descending order.
    """
    # accept a single measure name as well as a list of names
    measures = [measure] if isinstance(measure, str) else list(measure)
    os_measures = [x + "_os" for x in measures]
    # only parameters that are actually varied become columns
    varied = [x for x in params_setting if len(params_setting[x]) > 1]
    rows = []
    for params in product_dict(**params_setting):
        position = strategy(indicator_prices, **params)
        # Monthly-sampled variant kept for reference:
        # position_m = position.groupby(position.index.strftime('%Y-%m')).head(1)
        # indicator_prices_m = indicator_prices[:sample_period].groupby(indicator_prices[:sample_period].index.strftime('%Y-%m')).head(1)
        # result, _ = evaluation(position_m, indicator_prices_m, 'A', "B", False)
        try:
            result, _, _ = evaluation(position[:sample_period], indicator_prices[:sample_period], name, "B", False)
            result_os, _, _ = evaluation(position[sample_period:], indicator_prices[sample_period:], name, "B", False)
        except Exception:
            # best effort: combinations that cannot be evaluated are left out of the table
            continue
        row = {x: params[x] for x in varied}
        row.update({x: result[x] for x in measures})
        row.update({x + "_os": result_os[x] for x in measures})
        rows.append(row)
    # build the frame in one go; DataFrame.append no longer exists in pandas 2
    params_df = pd.DataFrame(rows, columns=varied + measures + os_measures)
    return params_df.sort_values(by=measures, ascending=False)
# Fetch macro data
def fetch_macro_data(class_name, code_list):
    """
    Load EDB macro series, using data/<class_name>.xlsx as a cache.

    When the cache file is missing the series are downloaded from
    2007-01-01 to today through the module-level ``wind_help`` client
    (created in the ``__main__`` block) and written to the cache.

    Parameters
    ----------
    class_name: str
        name for target data category; also the cache file name.
    code_list: list
        list of edb data code

    Returns
    -------
    daily_price: dataframe
        DataFrame of edb data with a datetime index.
    """
    cache_path = "data/%s.xlsx" % class_name
    if os.path.exists(cache_path):
        # parse_dates replaces the former date_parser=True, which is not a
        # callable and whose keyword is deprecated in recent pandas
        daily_price = pd.read_excel(cache_path, sheet_name="Daily", index_col=0, parse_dates=True,
                                    engine='openpyxl')
    else:
        end_date = datetime.today()
        daily_price = wind_help.edb(code_list, "2007-01-01", end_date)
        daily_price.to_excel(cache_path, sheet_name='Daily')
    daily_price.index = pd.to_datetime(daily_price.index)
    return daily_price
# Fetch index data
def fetch_index_data(code, begin_date = '2010-01-01', end_date = '2021-05-01'):
    """
    fetch index price volume data, using data/asset_price/<code prefix>.xlsx as a cache

    When the cache file is missing the data is downloaded through the
    module-level ``wind_help`` client (created in the ``__main__`` block)
    and written to the cache.

    Parameters
    ----------
    code : str
        target index name. 'T.CFE' builds a series from T00/T01/T02 by
        taking, each day, the contract with the largest open interest;
        codes starting with 'CBA' fetch the close only.
    begin_date : str
        download start date; only used for 'CBA' codes ('T.CFE' starts at
        2015-03-20 and every other code at 2005-04-08).
    end_date : str
        download end date.

    Returns
    -------
    daily_price : DataFrame
        index is date, columns depend on the branch described above.
    """
    cache_path = "data/asset_price/%s.xlsx" % code.split('.')[0]
    if os.path.exists(cache_path):
        # parse_dates replaces the former date_parser=True, which is not a
        # callable and whose keyword is deprecated in recent pandas
        daily_price = pd.read_excel(cache_path, sheet_name="Daily", index_col=0,
                                    parse_dates=True, engine='openpyxl')
    else:
        if code == 'T.CFE':
            T00 = wind_help.wsd("T00.CFE", 'close, high, low, open, oi , volume, settle', "2015-03-20", end_date)
            T01 = wind_help.wsd("T01.CFE", 'close, high, low, open, oi , volume, settle', "2015-03-20", end_date)
            T02 = wind_help.wsd("T02.CFE", 'close, high, low, open, oi , volume, settle', "2015-03-20", end_date)
            daily_price = pd.DataFrame(index=T00.index, columns=['close', 'open', 'high', 'low', 'volume', 'settle'])
            OI = pd.concat([T00['oi'], T01['oi'], T02['oi']], axis=1)
            OI.columns = [0, 1, 2]
            # position (0/1/2) of the contract with the largest open interest on each day
            OI_index = OI.idxmax(axis=1)
            # fill each day from the contract selected above
            daily_price.where(OI_index != 0, T00.loc[:, ['close', 'high', 'low', 'open', 'volume', 'settle']],
                              inplace=True)
            daily_price.where(OI_index != 1, T01.loc[:, ['close', 'high', 'low', 'open', 'volume', 'settle']],
                              inplace=True)
            daily_price.where(OI_index != 2, T02.loc[:, ['close', 'high', 'low', 'open', 'volume', 'settle']],
                              inplace=True)
        elif code[:3] == 'CBA':
            daily_price = wind_help.wsd(code, 'close', begin_date, end_date, options= "TradingCalendar=NIB")
        else:
            daily_price = wind_help.wsd(code, 'close, high, low, open, volume,amt,vwap,turn', "2005-04-08", end_date)
        daily_price.dropna(how="all", inplace=True)
        daily_price.to_excel(cache_path, sheet_name='Daily')
    daily_price.index = pd.to_datetime(daily_price.index)
    return daily_price
# Plot steepness, curvature and the bullet strategy's excess return
def draw_rate(begin_date, end_date):
    """
    Plot curve steepness and curvature against the bullet excess net value.

    Steepness and curvature are built from three EDB series (cached as
    '1-5-10yr') and smoothed with a 5-period HMA. Bullet and barbell
    returns are duration-matched mixes of the four maturity buckets read
    from the local Excel files. The aggregate net value of the four
    net-price indices is drawn on the right axis.

    Parameters
    ----------
    begin_date : str or datetime
        first date shown.
    end_date : str or datetime
        last date shown.
    """
    maturity = '1-10'
    rates = fetch_macro_data('1-5-10yr', ['S0059744', 'S0059747', 'S0059749'])
    yield_diff = HMA(rates['S0059749'] - rates['S0059744'], 5).rename('steepness')
    curvature = HMA(2 * rates['S0059747'] - (rates['S0059749'] + rates['S0059744']), 5).rename('curvature')

    # bucket returns and durations from the local indicator files
    wealth_codes = {'1-3':'CBA00621.CS','3-5':'CBA00631.CS','5-7':'CBA00641.CS','7-10':'CBA00651.CS'}
    duration_columns = []
    return_columns = []
    for code in wealth_codes.values():
        frame = pd.read_excel("./哑铃子弹/指标数据2021-05-11(%s).xls"%code, index_col=0)
        frame.index = pd.to_datetime(frame.index)
        frame = frame.sort_index()
        duration_columns.append(frame['平均市值法久期(年)'])
        return_columns.append(frame['涨跌幅(%)'] / 100)
    bucket_returns = pd.concat(return_columns, axis=1)
    bucket_returns.columns = wealth_codes.keys()
    bucket_durations = pd.concat(duration_columns, axis=1)
    bucket_durations.columns = wealth_codes.keys()

    # weight of the longer leg so each two-bucket mix matches the average duration
    mean_duration = bucket_durations.mean(axis=1)
    barbell_weight = (mean_duration - bucket_durations['1-3']) / (bucket_durations['7-10'] - bucket_durations['1-3'])
    dumbell_returns = bucket_returns['7-10'] * barbell_weight + bucket_returns['1-3'] * (1 - barbell_weight)
    bullet_weight = (mean_duration - bucket_durations['3-5']) / (bucket_durations['5-7'] - bucket_durations['3-5'])
    bullet_returns = bucket_returns['5-7'] * bullet_weight + bucket_returns['3-5'] * (1 - bullet_weight)
    benchmark = bucket_returns.mean(axis=1)
    # the barbell series is computed but not drawn
    dumbell_excess_net = (1 + dumbell_returns - benchmark).cumprod().rename('dumbell excess net').loc[begin_date:end_date]
    bullet_excess_net = (1 + bullet_returns - benchmark).cumprod().rename('bullet excess net').loc[begin_date:end_date]

    # aggregate return of the four net-price indices
    net_price_codes = {'1-3':'CBA00622.CS','3-5':'CBA00632.CS','5-7':'CBA00642.CS','7-10':'CBA00652.CS'}
    net_price_returns = pd.concat(
        [fetch_index_data(code, begin_date, end_date).pct_change() for code in net_price_codes.values()],
        axis=1)
    agg_return = net_price_returns.mean(axis=1)

    plt.figure(figsize=(30, 15))
    ax0 = host_subplot(111)
    ax0.set_title('dumbell&bullet excess net %s'%maturity)
    ax1 = ax0.twinx()
    original_net = (1 + agg_return.loc[begin_date:end_date]).cumprod().rename('aggregate net value')
    original_net.plot(ax=ax1)
    bullet_excess_net.plot(ax=ax0)
    # rescale the rate indicators so they share the axis with the net value
    scaled_steepness = yield_diff.loc[begin_date:end_date]/100 + 1
    scaled_curvature = curvature.loc[begin_date:end_date]/100 + 1
    pd.concat([scaled_steepness, scaled_curvature], axis=1).plot(ax=ax0)
    plt.legend()
def win_rate(series):
    """
    Share of the non-zero entries that are positive.

    Parameters
    ----------
    series : Series
        profit and loss values; zeros are ignored.

    Returns
    -------
    float
        positive count divided by non-zero count, or 0 when there is no
        non-zero entry (instead of dividing by zero).
    """
    active = int((series != 0).sum())
    if active == 0:
        return 0.0
    return int((series > 0).sum()) / active
def odds(series):
    """
    Ratio of the average gain to the average loss.

    Parameters
    ----------
    series : Series
        profit and loss values.

    Returns
    -------
    float
        mean of the positive entries divided by the absolute mean of the
        negative entries; NaN when either side is empty.
    """
    average_gain = series.loc[series > 0].mean()
    average_loss = series.loc[series < 0].mean()
    return -average_gain / average_loss
def turnover(series):
    """
    Share of entries that are non-zero.

    Parameters
    ----------
    series : Series
        profit and loss values.

    Returns
    -------
    float
        non-zero count divided by the length, or 0 for an empty series
        (instead of dividing by zero).
    """
    total = len(series)
    if total == 0:
        return 0.0
    return int((series != 0).sum()) / total
# Price-energy strategy
def strategy_v1(prices, **kwargs):
    """
    Long/short signal from smoothed price momentum ("price energy"), with an
    optional filter that flattens the position in range-bound markets.

    Parameters
    ----------
    prices : Series
        price of the asset.
    **kwargs :
        dema: window of the DEMA used to smooth the price
        energy: look-back of the percentage change taken on the smoothed price
        long_thres: energy level at or above which the signal is long;
            below it the signal is short
        method: optional filter, one of 'Efficiency', 'Tangent' or 'CMI';
            any other value applies no filter
        efficiency: window of the filter indicator (needed with method)
        efficiency_thres: the position is zeroed where the filter indicator
            is below this value (needed with method)

    Returns
    -------
    position : Series
        1 / -1 / 0 signal named 'position'; 0 where the energy is undefined
        or the filter removed the position.
    """
    # price energy; with price data only, the combined indicator is the price energy itself
    energy = talib.DEMA(prices, kwargs['dema']).pct_change(kwargs['energy']) * 200 + 1
    threshold = kwargs['long_thres']
    position = pd.Series(index=energy.index, dtype='float64', name='position')
    position[energy >= threshold] = 1
    position.fillna(0, inplace=True)
    # everything below the threshold starts out short
    position[energy < threshold] = -1

    # optional volatility/trend filter; leave method out to apply no restriction
    method = kwargs.get('method')
    if method == 'Efficiency':
        # price efficiency: net move over the window relative to the path travelled
        window = kwargs['efficiency']
        path_length = prices.diff(1).abs().rolling(window).sum()
        indicator = 100 * prices.diff(window).abs() / path_length
    elif method == 'Tangent':
        # slope over the window, signed so that a move along the position is positive
        window = kwargs['efficiency']
        indicator = 1000 * prices.diff(window) / window
        indicator.loc[position.index] = indicator.loc[position.index] * position
    elif method == 'CMI':
        # CMI: net move over the window relative to the high-low range of the window
        window = kwargs['efficiency']
        price_range = prices.rolling(window, 1).max() - prices.rolling(window, 1).min()
        indicator = (100 * prices.diff(window).abs() / price_range).fillna(100)
    else:
        return position

    cut_value = 0
    choppy = indicator < kwargs["efficiency_thres"]
    position[choppy] = cut_value * position[choppy]
    return position
# MTM (momentum) strategy
def strategy_MT(prices, drawplot=False, **kwargs):
    """
    Momentum signal: long when momentum is positive and above its own
    average, short when it is negative and below its own average.

    Parameters
    ----------
    prices : Series
        price of the asset.
    drawplot : bool, optional
        The default is False. When true the smoothed price is plotted with
        long (green) and short (red) periods shaded and the figure is saved
        to ./img/bond portfolio MT.png.
    **kwargs : dict
        smooth: HMA window for smoothing price at first
        diff: diff window of price
        MA: HMA window applied to the momentum

    Returns
    -------
    position : Series
        1 / -1 / 0 signal.
    """
    smoothed = HMA(prices, kwargs['smooth'])
    momentum = smoothed.diff(kwargs['diff'])
    momentum_average = HMA(momentum, kwargs['MA'])
    together = pd.concat([smoothed, momentum, momentum_average], axis=1)
    together.columns = ['rates', 'MT', 'MTM']

    rising = (together['MT'] > 0) & (together['MT'] > together['MTM'])
    falling = (together['MT'] < 0) & (together['MT'] < together['MTM'])
    # start flat and mark the two directional regimes
    position = pd.Series(0.0, index=together.index)
    position[rising] = 1
    position[falling] = -1

    if drawplot:
        plt.figure(figsize=(20, 8))
        ax0 = host_subplot(111)
        ax0.set_title('MT strategy')
        smoothed.plot(ax=ax0)
        trans = mtransforms.blended_transform_factory(ax0.transData, ax0.transAxes)
        for held, colour in ((position > 0, 'green'), (position < 0, 'red')):
            ax0.fill_between(smoothed.index, 0, 1, where=held, color=colour, alpha=0.5,
                             transform=trans)
        plt.legend()
        plt.savefig("./img/bond portfolio MT.png")
    return position
# Bollinger-band breakout strategy
def strategy_BOLL_breakout(prices, drawplot=False, **kwargs):
    """
    Breakout signal on Bollinger bands.

    A long is opened when the price moves above the upper band and stays
    there for two bars, and closed when it falls back below the upper band
    for two bars; shorts mirror this on the lower band.

    Parameters
    ----------
    prices : Series
        price of the asset.
    drawplot : bool, optional
        The default is False. When true the price, the bands and the held
        periods are plotted and saved to ./img/bond portfolio boll breakout.png.
    **kwargs : dict
        smooth: HMA window for the band center
        std_window: volatility window of price (at least 5 observations are required)
        width: band width in standard deviations

    Returns
    -------
    position : Series
        1 / -1 / 0 signal.
    """
    def _crossed_up(diff):
        # True at bar i when diff was negative at i-2 and positive at i-1 and i
        flags = np.zeros(len(diff), dtype=bool)
        flags[2:] = (diff[:-2] < 0) & (diff[1:-1] > 0) & (diff[2:] > 0)
        return flags

    def _crossed_down(diff):
        # True at bar i when diff was positive at i-2 and negative at i-1 and i
        flags = np.zeros(len(diff), dtype=bool)
        flags[2:] = (diff[:-2] > 0) & (diff[1:-1] < 0) & (diff[2:] < 0)
        return flags

    center = HMA(prices, kwargs['smooth'])
    spread = prices.rolling(kwargs['std_window'], 5).std()
    upper = (center + kwargs['width'] * spread).values
    lower = (center - kwargs['width'] * spread).values
    over_upper = prices.values - upper
    over_lower = prices.values - lower

    enter_long = _crossed_up(over_upper)
    enter_short = _crossed_down(over_lower)
    exit_long = _crossed_down(over_upper)
    exit_short = _crossed_up(over_lower)

    positions = np.zeros(len(prices))
    state = 0
    for i in range(2, len(prices)):
        if state == 0:
            if enter_long[i]:
                state = 1
            elif enter_short[i]:
                state = -1
        elif (state == 1 and exit_long[i]) or (state == -1 and exit_short[i]):
            state = 0
        positions[i] = state

    if drawplot:
        plt.figure(figsize=(20, 8))
        ax0 = host_subplot(111)
        ax0.set_title('boll strategy')
        bands = [prices, center, pd.Series(upper, index=prices.index), pd.Series(lower, index=prices.index)]
        pd.concat(bands, axis=1).plot(ax=ax0)
        trans = mtransforms.blended_transform_factory(ax0.transData, ax0.transAxes)
        for held, colour in ((positions > 0, 'green'), (positions < 0, 'red')):
            ax0.fill_between(prices.index, 0, 1, where=held, color=colour, alpha=0.5,
                             transform=trans)
        plt.legend()
        plt.savefig("./img/bond portfolio boll breakout.png")
    return pd.Series(positions, index=prices.index)
# Bollinger-band reversal strategy
def strategy_BOLL_reverse(prices, drawplot=False, **kwargs):
    """
    Reversal signal on Bollinger bands.

    A long is opened when the price re-enters the bands from below (two
    bars back above the lower band) while the band center is rising; a
    short is opened when it re-enters from above while the center is
    falling. Any position is closed when the price breaks above the upper
    band or below the lower band for two bars.

    Parameters
    ----------
    prices : Series
        price of the asset.
    drawplot : bool, optional
        The default is False. When true the price, the bands and the held
        periods are plotted and saved to ./img/bond portfolio boll reverse.png.
    **kwargs : dict
        smooth: HMA window for the band center
        std_window: volatility window of price (at least 5 observations are required)
        width: band width in standard deviations

    Returns
    -------
    position : Series
        1 / -1 / 0 signal.
    """
    def _crossed_up(diff):
        # True at bar i when diff was negative at i-2 and positive at i-1 and i
        flags = np.zeros(len(diff), dtype=bool)
        flags[2:] = (diff[:-2] < 0) & (diff[1:-1] > 0) & (diff[2:] > 0)
        return flags

    def _crossed_down(diff):
        # True at bar i when diff was positive at i-2 and negative at i-1 and i
        flags = np.zeros(len(diff), dtype=bool)
        flags[2:] = (diff[:-2] > 0) & (diff[1:-1] < 0) & (diff[2:] < 0)
        return flags

    center = HMA(prices, kwargs['smooth']).values
    spread = prices.rolling(kwargs['std_window'], 5).std().values
    upper = center + kwargs['width'] * spread
    lower = center - kwargs['width'] * spread
    over_upper = prices.values - upper
    over_lower = prices.values - lower

    # direction of the band center from one bar to the next
    center_rising = np.zeros(len(center), dtype=bool)
    center_falling = np.zeros(len(center), dtype=bool)
    center_rising[1:] = center[1:] > center[:-1]
    center_falling[1:] = center[1:] < center[:-1]

    enter_long = _crossed_up(over_lower) & center_rising
    enter_short = _crossed_down(over_upper) & center_falling
    # longs and shorts share the same exit: a breakout through either band
    leave = _crossed_up(over_upper) | _crossed_down(over_lower)

    positions = np.zeros(len(prices))
    state = 0
    for i in range(2, len(prices)):
        if state == 0:
            if enter_long[i]:
                state = 1
            elif enter_short[i]:
                state = -1
        elif leave[i]:
            state = 0
        positions[i] = state

    if drawplot:
        plt.figure(figsize=(20, 8))
        ax0 = host_subplot(111)
        ax0.set_title('boll strategy reverse')
        bands = [prices, pd.Series(center, index=prices.index), pd.Series(upper, index=prices.index),
                 pd.Series(lower, index=prices.index)]
        pd.concat(bands, axis=1).plot(ax=ax0)
        trans = mtransforms.blended_transform_factory(ax0.transData, ax0.transAxes)
        for held, colour in ((positions > 0, 'green'), (positions < 0, 'red')):
            ax0.fill_between(prices.index, 0, 1, where=held, color=colour, alpha=0.5,
                             transform=trans)
        plt.legend()
        plt.savefig("./img/bond portfolio boll reverse.png")
    return pd.Series(positions, index=prices.index)
# Quantile breakout strategy
def strategy_quantile_breakout(prices, drawplot=False, **kwargs):
    """
    Breakout signal on rolling quantile bands of the smoothed price.

    A long is opened when the smoothed price moves above the upper
    quantile and stays there for two bars, and closed when it falls back
    below it for two bars; shorts mirror this on the lower quantile.

    Parameters
    ----------
    prices : Series
        price of the asset.
    drawplot : bool, optional
        The default is False. When true the smoothed price, the bands and
        the held periods are plotted (the figure is not saved).
    **kwargs : dict
        MA: HMA window for smoothing price at first
        window: rolling window of the quantiles (at least 10 observations are required)
        width: distance of each quantile from the median, as a probability

    Returns
    -------
    position : Series
        1 / -1 / 0 signal.
    """
    def _crossed_up(diff):
        # True at bar i when diff was negative at i-2 and positive at i-1 and i
        flags = np.zeros(len(diff), dtype=bool)
        flags[2:] = (diff[:-2] < 0) & (diff[1:-1] > 0) & (diff[2:] > 0)
        return flags

    def _crossed_down(diff):
        # True at bar i when diff was positive at i-2 and negative at i-1 and i
        flags = np.zeros(len(diff), dtype=bool)
        flags[2:] = (diff[:-2] > 0) & (diff[1:-1] < 0) & (diff[2:] < 0)
        return flags

    smoothed = HMA(prices, kwargs['MA'])
    upper = smoothed.rolling(kwargs['window'], 10).quantile(.5 + kwargs['width']).values
    over_upper = smoothed.values - upper
    lower = smoothed.rolling(kwargs['window'], 10).quantile(.5 - kwargs['width']).values
    over_lower = smoothed.values - lower

    enter_long = _crossed_up(over_upper)
    enter_short = _crossed_down(over_lower)
    exit_long = _crossed_down(over_upper)
    exit_short = _crossed_up(over_lower)

    positions = np.zeros(len(prices))
    state = 0
    for i in range(2, len(prices)):
        if state == 0:
            if enter_long[i]:
                state = 1
            elif enter_short[i]:
                state = -1
        elif (state == 1 and exit_long[i]) or (state == -1 and exit_short[i]):
            state = 0
        positions[i] = state

    if drawplot:
        plt.figure(figsize=(20, 8))
        ax0 = host_subplot(111)
        ax0.set_title('quantile strategy')
        bands = [smoothed, pd.Series(upper, index=prices.index), pd.Series(lower, index=prices.index)]
        pd.concat(bands, axis=1).plot(ax=ax0)
        trans = mtransforms.blended_transform_factory(ax0.transData, ax0.transAxes)
        for held, colour in ((positions > 0, 'green'), (positions < 0, 'red')):
            ax0.fill_between(prices.index, 0, 1, where=held, color=colour, alpha=0.5,
                             transform=trans)
        plt.legend()
    return pd.Series(positions, index=prices.index)
# Quantile reversal strategy
def strategy_quantile_reverse(prices, drawplot=False, **kwargs):
    """
    Reversal signal on rolling quantile bands of the smoothed price.

    A long is opened when the smoothed price re-enters the bands from
    below (two bars back above the lower quantile) while it is rising; a
    short is opened when it re-enters from above while it is falling. Any
    position is closed when the smoothed price breaks above the upper
    quantile or below the lower quantile for two bars.

    Parameters
    ----------
    prices : Series
        price of the asset.
    drawplot : bool, optional
        The default is False. When true the smoothed price, the bands and
        the held periods are plotted (the figure is not saved).
    **kwargs : dict
        MA: HMA window for smoothing price at first
        window: rolling window of the quantiles (at least 10 observations are required)
        width: distance of each quantile from the median, as a probability

    Returns
    -------
    position : Series
        1 / -1 / 0 signal.
    """
    MA_prices = HMA(prices, kwargs['MA'])
    # plain array for bar-by-bar access: integer keys on a date-indexed Series
    # rely on a deprecated positional fallback
    ma_values = MA_prices.values
    Upper_band = MA_prices.rolling(kwargs['window'], 10).quantile(.5 + kwargs['width']).values
    Upper_diff = ma_values - Upper_band
    Lower_band = MA_prices.rolling(kwargs['window'], 10).quantile(.5 - kwargs['width']).values
    Lower_diff = ma_values - Lower_band
    positions = np.zeros(len(prices))
    position = 0
    for i in range(2, len(prices)):
        # two-bar confirmed crossings of each band at bar i
        up_through_upper = (Upper_diff[i - 2] < 0) and (Upper_diff[i - 1] > 0) and (Upper_diff[i] > 0)
        down_through_upper = (Upper_diff[i - 2] > 0) and (Upper_diff[i - 1] < 0) and (Upper_diff[i] < 0)
        up_through_lower = (Lower_diff[i - 2] < 0) and (Lower_diff[i - 1] > 0) and (Lower_diff[i] > 0)
        down_through_lower = (Lower_diff[i - 2] > 0) and (Lower_diff[i - 1] < 0) and (Lower_diff[i] < 0)
        if position == 0:
            if up_through_lower and (ma_values[i] > ma_values[i - 1]):
                position = 1
            elif down_through_upper and (ma_values[i] < ma_values[i - 1]):
                position = -1
        elif up_through_upper or down_through_lower:
            # longs and shorts share the same exit: a breakout through either band
            position = 0
        positions[i] = position
    if drawplot:
        plt.figure(figsize=(20, 8))
        ax0 = host_subplot(111)
        ax0.set_title('quantile strategy')
        pd.concat([MA_prices, pd.Series(Upper_band, index=prices.index), pd.Series(Lower_band, index=prices.index)], axis=1).plot(ax=ax0)
        trans = mtransforms.blended_transform_factory(ax0.transData, ax0.transAxes)
        ax0.fill_between(prices.index, 0, 1, where=positions > 0, color='green', alpha=0.5,
                         transform=trans)
        ax0.fill_between(prices.index, 0, 1, where=positions < 0, color='red', alpha=0.5,
                         transform=trans)
        plt.legend()
    return pd.Series(positions, index=prices.index)
# Allocate between bullet and barbell from the realised steepness change (look-ahead view)
def strategy_maturity_steepness_god(begin, end):
    """
    Backtest a bullet/barbell allocation that knows each period's realised
    change in curve steepness in advance.

    The periods come from the '日期' column of 利率走势.xlsx. For each
    period the change in steepness between its first and last day is
    measured: a fall of more than 0.1 selects the barbell (1-3 and 7-10
    buckets), a rise of more than 0.1 selects the bullet (3-5 and 5-7
    buckets), otherwise the equal-weight average of the four buckets is
    held. Both two-bucket mixes are weighted to match the average duration
    of the four buckets. A figure and an Excel file are written as side
    effects.

    Parameters
    ----------
    begin : datetime
        backtest begin date.
    end : datetime
        backtest end date.

    Returns
    -------
    overall_info : dataframe
        all middle information during the backtest.
    performance : series
        performance statistics of the excess return over the bucket average.
    returns_by_year : series
        each years returns.
    """
    # flattening is signal -1, steepening is signal 1
    interest_rate = fetch_macro_data('1-5-10yr',['S0059744', 'S0059747', 'S0059749'])
    steepness = (interest_rate['S0059749'] - interest_rate['S0059744']).loc[begin:end]
    agg_dict = {'1-3':'CBA00621.CS','3-5':'CBA00631.CS','5-7':'CBA00641.CS','7-10':'CBA00651.CS'}
    agg_returns = []
    agg_durations = []
    for interval in agg_dict:
        agg_data = pd.read_excel("./哑铃子弹/指标数据2021-05-11(%s).xls"%agg_dict[interval], index_col=0)
        agg_data.index = pd.to_datetime(agg_data.index)
        agg_data = agg_data.sort_index().loc[begin:end, :]
        agg_durations.append(agg_data['平均市值法久期(年)'])
        agg_returns.append(agg_data['涨跌幅(%)'] / 100)
    agg_returns = pd.concat(agg_returns, axis=1)
    agg_durations = pd.concat(agg_durations, axis=1)
    durations_agg = agg_durations.mean(axis=1)
    agg_returns_mean = agg_returns.mean(axis=1)
    agg_returns.columns = agg_dict.keys()
    agg_durations.columns = agg_dict.keys()
    detail_date = pd.read_excel('利率走势.xlsx', sheet_name='Sheet2')
    date_list = detail_date['日期'].tolist()
    returns_list = []
    bool_list = []
    for period_start, period_end in zip(date_list[:-1], date_list[1:]):
        # each period runs from the day after one listed date up to the next listed date
        window = slice(period_start + Day(), period_end)
        segment = steepness.loc[window]
        change = segment.iloc[-1] - segment.iloc[0]
        # moves of at most 0.1 (or an undefined change) are treated as no signal
        direction = int(np.sign(change)) if abs(change) > .1 else 0
        if direction == 0:
            returns_portfolio = agg_returns_mean.loc[window]
        else:
            # flattening -> barbell (outer buckets); steepening -> bullet (inner buckets)
            short_leg, long_leg = ('1-3', '7-10') if direction == -1 else ('3-5', '5-7')
            agg_duration = durations_agg.loc[window]
            returns_short = agg_returns.loc[window, short_leg]
            duration_short = agg_durations.loc[window, short_leg]
            returns_long = agg_returns.loc[window, long_leg]
            duration_long = agg_durations.loc[window, long_leg]
            # weight of the long leg that matches the aggregate duration
            proportion_long = (agg_duration - duration_short) / (duration_long - duration_short)
            returns_portfolio = proportion_long * returns_long + (1 - proportion_long) * returns_short
        bool_list.extend([direction] * len(returns_portfolio))
        returns_list.append(returns_portfolio)
    returns_series = pd.concat(returns_list)
    # the last day carries no position
    signal = np.array(bool_list[:-1] + [0])
    excess_returns = returns_series - agg_returns_mean
    net_temp = 1
    profit_list = []
    # NOTE(review): excess_returns is aligned on the union of both indexes and may be
    # longer than signal; confirm both cover the same dates.
    for i in range(1, len(excess_returns) - 1):
        net_temp *= excess_returns.iloc[i] + 1
        # a position was held yesterday and the signal changed today: book the trade
        if (signal[i - 1] != 0) and (signal[i - 1] != signal[i]):
            profit_list.append(net_temp - 1)
            net_temp = 1
    profit_list = np.array(profit_list)
    if len(profit_list) == 0:
        win_rate = 0
        odds = 0
        turnover = 0
    else:
        win_rate = len(profit_list[profit_list > 0]) / len(profit_list) * 100
        if profit_list[profit_list < 0].size > 0:
            odds = -1 * np.mean(profit_list[profit_list > 0]) / np.mean(profit_list[profit_list < 0])
        else:
            odds = np.nan
        turnover = len(profit_list) / len(excess_returns) * 100
    sharpe = ep.sharpe_ratio(excess_returns)
    annual_returns = ep.annual_return(excess_returns) * 100
    plt.figure(figsize=(20, 8))
    ax0 = host_subplot(111)
    ax0.set_title('dumbell&bullet 1-10yr')
    ax1 = ax0.twinx()
    enhanced_net = (returns_series + 1).cumprod()
    agg_net = (agg_returns_mean + 1).cumprod()
    overall_net = pd.concat([enhanced_net, agg_net, steepness.loc[begin:end]], axis=1)
    overall_net.columns = ['Enhanced Portfolio Net Value', 'Aggregate Fortune Index Net Value', 'steepness']
    # forward-fill gaps; fillna(method='ffill') is deprecated in recent pandas
    overall_net.ffill(inplace=True)
    overall_net.plot(ax=ax0)
    excess_net = (excess_returns + 1).cumprod()
    print('excess_net is {:.4f}'.format(excess_net.iloc[-1]))
    returns_by_year = pd.concat([pd.Series(1), excess_net.groupby(excess_net.index.strftime('%Y')).tail(1)]).pct_change().dropna()
    excess_net.plot(ax=ax1, label='Excess Return')
    drawdown = excess_net - excess_net.rolling(len(excess_net), min_periods=1).max()
    max_drawdown = -1 * drawdown.min() * 100
    end_drawdown = drawdown.index[np.argmin(drawdown)]
    begin_drawdown = drawdown.index[np.argmax(excess_net[:end_drawdown])]
    calmar = ep.calmar_ratio(excess_returns)
    # always-bullet benchmark for comparison
    proportion_long = (durations_agg - agg_durations['3-5']) / (agg_durations['5-7'] - agg_durations['3-5'])
    bullet_returns = agg_returns['5-7'] * proportion_long + agg_returns['3-5'] * (1 - proportion_long)
    (bullet_returns - agg_returns_mean + 1).cumprod().plot(ax=ax1, label='Bullet Excess Return')
    ax1.set_ylim([excess_net.min() - .01, excess_net.max()+.01])
    trans = mtransforms.blended_transform_factory(ax1.transData, ax1.transAxes)
    ax1.fill_between(returns_series.index, 0, 1, where=signal > 0, color='green', alpha=0.2,
                     transform=trans)
    ax1.fill_between(returns_series.index, 0, 1, where=signal < 0, color='red', alpha=0.2,
                     transform=trans)
    plt.legend()
    plt.savefig('./img/dumbell&bullet 1-10 diff -2020 steepness god')
    overall_info = pd.concat([pd.Series(signal, index=returns_series.index, name='signal'),
                              agg_returns, 10000*(excess_net - 1).rename('excess net value'), overall_net,
                              durations_agg.rename('Aggregate duration')], axis=1)
    overall_info.to_excel("./data/overall info dumbell&bullet steepness god 1-10 diff.xlsx")
    performance = pd.Series([annual_returns, max_drawdown, sharpe, calmar, begin_drawdown,
                             end_drawdown, win_rate, odds, turnover],
                            index=['returns', 'drawdown', 'sharpe', 'calmar', 'begin_drawdown',
                                   'end_drawdown', 'win_rate', 'odds', 'turnover'])
    # pnl_profile.to_excel("./data/pnl profile dumbell&bullet %s.xlsx" % maturity)
    return overall_info, performance, returns_by_year
if __name__ == '__main__':
    # Wind client used as a module-level global by fetch_macro_data / fetch_index_data
    wind_help = WindHelper()
    # signal_weight(.5)
    # interest_rate = fetch_macro_data('1-5-10yr', ['S0059744', 'S0059747', 'S0059749'])
    # yield_diff = (interest_rate['S0059749'] - interest_rate['S0059744'])
    # curvature = 2 * interest_rate['S0059747'] - (interest_rate['S0059749'] + interest_rate['S0059744'])
    # strategy_maturity_final is not defined in this file; the steepness backtest is the
    # function with the matching (begin, end) signature and three return values.
    info, performance, returns_year = strategy_maturity_steepness_god(datetime(2008, 1, 1), datetime(2021, 5, 8))