# Bond trading strategies (债券交易策略)

import pandas as pd
from mpl_toolkits.axes_grid1 import host_subplot
import matplotlib.pyplot as plt
import matplotlib.transforms as mtransforms
from wind_helper import WindHelper
import os
from pandas.tseries.offsets import Day
import numpy as np
from datetime import datetime
import empyrical as ep
from itertools import product
import talib
from math import ceil
from numpy import sign


# Hull moving average
def HMA(price, n):
    """
    Smooth ``price`` with a Hull moving average of window ``n``.

    Parameters
    ----------
    price : Series or ndarray
        values to smooth; handed to talib unchanged.
    n : int
        smoothing window. ``0`` returns ``price`` untouched, windows
        below 4 fall back to ``talib.MA``.

    Returns
    -------
    smoothed values, as returned by talib (or ``price`` itself when n == 0).
    """
    if n == 0:
        return price
    if n < 4:
        return talib.MA(price, n)
    half_window = ceil(n / 2)
    # talib time periods must be integers; np.sqrt(n) is a float, which is
    # either rejected or only implicitly truncated depending on the
    # Python/Cython version, so truncate explicitly.
    sqrt_window = int(np.sqrt(n))
    raw = 2 * talib.WMA(price, half_window) - talib.WMA(price, n)
    return talib.WMA(raw, sqrt_window)


# Turn a position signal into performance statistics and (optionally) plots
def evaluation(position, prices, name, version, drawplot):
    """
    Evaluate a strategy from its position signal.

    Assets whose name starts with 'CBA' are evaluated on percentage
    returns that are compounded; every other asset is evaluated on price
    differences that are summed.

    Parameters
    ----------
    position : Series
        your trading signal; it is shifted by one bar before being applied.
    prices : Series
        target price.  NOTE(review): the original docstring said DataFrame,
        but the code multiplies ``prices.pct_change()`` / ``prices.diff()``
        by a Series, so a single price Series appears to be expected —
        confirm against callers.
    name : str
        name of the target asset (its first three characters select the
        return convention, see above).
    version : str
        name of trading strategy (used in plot title and output file names).
    drawplot : bool
        whether to draw the plot and write the Excel/PNG outputs.

    Returns
    -------
    Series
        overall performance measure, indexed by 'returns', 'drawdown',
        'sharpe', 'calmar', 'begin_drawdown', 'end_drawdown', 'win_rate',
        'odds', 'turnover'.
    returns_by_year : Series
        returns in each year, rescaled to 244 observations per year.
    pnl_profile : DataFrame
        per-trade details: profit and loss (divided by the mean loss R),
        open date, close date and direction.

    """
    if name[:3] == 'CBA':
        # per-bar strategy return: yesterday's position applied to today's percentage change
        profit = prices.pct_change() * position.shift(1)
        profit_list = []
        date_list = prices.index
        open_list = []
        close_list = []
        signal_list = []
        # compounded net value of the trade currently being tracked
        net_temp = 1
        # NOTE(review): the range stops at len(position) - 2, so the last bar is never visited — confirm this is intended
        for i in range(1, len(position) - 1):
            # accumulate today's return; a trade is opened below when today's signal sign differs from yesterday's
            net_temp *= (profit.iloc[i] + 1)
            if (position.iloc[i] != 0) and (sign(position.iloc[i - 1]) != sign(position.iloc[i])):
                # record the day the signal appears as the open date
                open_list.append(date_list[i])
                signal_list.append(sign(position.iloc[i]))
            # if a position was held yesterday and today's signal direction differs, close the trade
            if (position.iloc[i - 1] != 0) and (sign(position.iloc[i - 1]) != sign(position.iloc[i])) and len(open_list) > 0:
                # record the profit of the trade being closed
                profit_list.append(net_temp - 1)
                # record the day of the closing signal as the close date
                close_list.append(date_list[i])
                net_temp = 1
        # a trade still open at the end is closed on the last available date
        if len(close_list) < len(open_list):
            close_list.append(date_list[-1])
            profit_list.append(net_temp - 1)
        # calculate strategy measures
        profit_list = np.array(profit_list)
        if len(profit_list) == 0:
            win_rate = 0
            odds = 0
            turnover = 0
        else:
            # share of profitable trades, in percent
            win_rate = len(profit_list[profit_list > 0]) / len(profit_list) * 100
            if profit_list[profit_list < 0].size > 0:
                # mean winning trade divided by the magnitude of the mean losing trade
                odds = -1 * np.mean(profit_list[profit_list > 0]) / np.mean(profit_list[profit_list < 0])
            else:
                odds = np.nan
            # number of trades per bar, in percent
            turnover = len(profit_list) / len(profit) * 100

        sharpe = ep.sharpe_ratio(profit)
        annual_returns = ep.annual_return(profit) * 100
        net_value = (profit + 1).cumprod().dropna()
        # number of observations in each calendar year
        horizon_by_year = net_value.groupby(net_value.index.strftime('%Y')).count()
        # year-end net values, prefixed with 1 so the first year's return is measured from the start
        returns_by_year = pd.concat([pd.Series(1), net_value.groupby(net_value.index.strftime('%Y')).tail(1)]).pct_change().dropna()
        # geometric rescaling of each year's return to 244 observations
        returns_by_year = np.exp(np.log(returns_by_year + 1) / horizon_by_year.values * 244) - 1
    else:
        # per-bar strategy P&L: yesterday's position applied to today's price change
        profit = prices.diff() * position.shift(1)
        profit_list = []
        date_list = prices.index
        open_list = []
        close_list = []
        signal_list = []
        # summed P&L of the trade currently being tracked
        net_temp = 0
        # NOTE(review): the range stops at len(position) - 2, so the last bar is never visited — confirm this is intended
        for i in range(1, len(position) - 1):
            # accumulate today's P&L; a trade is opened below when today's signal differs from yesterday's
            net_temp += profit.iloc[i]
            if (position.iloc[i] != 0) and (position.iloc[i - 1] != position.iloc[i]):
                # record the day the signal appears as the open date
                open_list.append(date_list[i])
                signal_list.append(position.iloc[i])
            # if a position was held yesterday and today's signal differs, close the trade
            if (position.iloc[i - 1] != 0) and (position.iloc[i - 1] != position.iloc[i]) and len(open_list) > 0:
                # record the profit of the trade being closed
                profit_list.append(net_temp)
                # record the day of the closing signal as the close date
                close_list.append(date_list[i])
                net_temp = 0
        # a trade still open at the end is closed on the last available date
        if len(close_list) < len(open_list):
            close_list.append(date_list[-1])
            profit_list.append(net_temp)
        # calculate strategy measures
        profit_list = np.array(profit_list)
        if len(profit_list) == 0:
            win_rate = 0
            odds = 0
            turnover = 0
        else:
            # share of profitable trades, in percent
            win_rate = len(profit_list[profit_list > 0]) / len(profit_list) * 100
            if profit_list[profit_list < 0].size > 0:
                # mean winning trade divided by the magnitude of the mean losing trade
                odds = -1 * np.mean(profit_list[profit_list > 0]) / np.mean(profit_list[profit_list < 0])
            else:
                odds = np.nan
            # number of trades per bar, in percent
            turnover = len(profit_list) / len(profit) * 100

        sharpe = ep.sharpe_ratio(profit)
        annual_returns = ep.annual_return(profit) * 100
        net_value = profit.cumsum().dropna()
        # number of observations in each calendar year
        horizon_by_year = net_value.groupby(net_value.index.strftime('%Y')).count()
        # NOTE(review): the cumulative-sum net value starts from 0, yet the series is prefixed with 1 here
        # (as in the compounded branch), which shifts the first year's figure — confirm
        returns_by_year = pd.concat([pd.Series(1), net_value.groupby(net_value.index.strftime('%Y')).tail(1)]).diff().dropna()
        # linear rescaling of each year's P&L to 244 observations
        returns_by_year = returns_by_year / horizon_by_year.values * 244
    # distance of the net value below its running maximum (an absolute difference, not a ratio)
    drawdown = net_value - net_value.rolling(len(net_value), min_periods=1).max()
    max_drawdown = -1 * drawdown.min() * 100
    # the deepest point ends the drawdown; the highest net value up to that date starts it
    end_drawdown = drawdown.index[np.argmin(drawdown)]
    begin_drawdown = drawdown.index[np.argmax(net_value[:end_drawdown])]
    calmar = ep.calmar_ratio(profit)
    # R: magnitude of the mean losing trade, used to normalise per-trade P&L
    # (NaN when there are no losing trades, which makes the whole column NaN)
    R = -1 * np.mean(profit_list[profit_list < 0])
    pnl_profile = pd.DataFrame({'profit and loss': profit_list / R, 'open date': open_list, 'close date': close_list,
                                'direction': signal_list})
    # draw the result plot
    if drawplot:
        plt.figure(figsize=(20, 8))
        ax0 = host_subplot(111)
        ax1 = ax0.twinx()
        plt.title("%s %s" % (name, version))
        net_value.plot(label=version, ax=ax0)
        if name[:3] == 'CBA':
            # rebase the price to its first value, divide by 10 and add 1 before plotting it with the net value
            ((prices - prices.loc[prices.index[0]]) / 10 + 1).plot(label=name, ax=ax0)
        else:
            (prices - prices.loc[prices.index[0]]).plot(label=name, ax=ax0)
        # x in data coordinates, y in axes coordinates: the band spans the full plot height
        trans = mtransforms.blended_transform_factory(ax1.transData, ax1.transAxes)
        ax1.fill_between(np.array([begin_drawdown, end_drawdown]), 0, 1,
                         facecolor='green', alpha=0.5, transform=trans)
        drawdown.plot(kind="area", label='drawdown(RHS)', color='k', ax=ax1)
        ax1.set_ylim([-1, 0])
        ax0.legend(loc="best")
        # the ./img and data directories are written to as-is; nothing here creates them
        plt.savefig("./img/%s %s.png" % (name, version))
        pd.concat([position, net_value, (prices - prices.loc[prices.index[0]]), profit],
                  axis=1).to_excel("data/%s_%s.xlsx" % (name, version))
        pnl_profile.to_excel("data/pnl profile %s_%s.xlsx" % (name, version))
    return pd.Series([annual_returns, max_drawdown, sharpe, calmar, begin_drawdown,
                      end_drawdown, win_rate, odds, turnover],
                     index=['returns', 'drawdown', 'sharpe', 'calmar', 'begin_drawdown',
                            'end_drawdown', 'win_rate', 'odds', 'turnover']), returns_by_year, pnl_profile


def product_dict(**kwargs):
    """
    Iterate over the Cartesian product of keyword-named candidate lists.

    Parameters
    ----------
    **kwargs : dict
        each keyword maps to an iterable of candidate values.

    Yields
    ------
    dict
        one dict per combination, mapping every keyword to one of its
        candidate values.

    """
    names = list(kwargs)
    for combination in product(*kwargs.values()):
        yield {key: value for key, value in zip(names, combination)}


# Parameter grid generator
def params_generator(center, step, num):
    """
    Build an evenly spaced parameter grid centred on ``center``.

    Parameters
    ----------
    center : float
        middle value of the grid.
    step : float
        spacing between neighbouring grid points.
    num : int
        number of points on each side of the centre.

    Returns
    -------
    ndarray
        ``2 * num + 1`` values running from ``center - num * step`` to
        ``center + num * step``.

    """
    offsets = np.arange(-num, num + 1)
    return center + step * offsets


# Parameter optimizer (grid search)
def params_optimizer(indicator_prices, strategy, params_setting, sample_period, measure, name='A'):
    """
    Grid-search strategy parameters and report in/out-of-sample measures.

    Every combination from ``params_setting`` is passed to ``strategy``; the
    resulting position is evaluated on ``[:sample_period]`` (in sample) and
    on ``[sample_period:]`` (out of sample) with ``evaluation``.

    Parameters
    ----------
    indicator_prices : Series or DataFrame
        price data handed to ``strategy`` and, sliced, to ``evaluation``.
    strategy : function
        function you want to optimize parameters for; called as
        ``strategy(indicator_prices, **params)``.
    params_setting : dict
        dict of parameter candidate lists like
        {param1: [x1, x2...], param2: [y1, y2...]}.
    sample_period : str
        in-sample end date, used as a slice bound.
    measure : str or list of str
        name(s) of the ``evaluation`` measures to report.
    name : str
        target asset name, forwarded to ``evaluation``.

    Returns
    -------
    params_df : DataFrame
        one row per successfully evaluated combination: the parameters that
        have more than one candidate, the requested measures, and the same
        measures suffixed with "_os" for the out-of-sample part; sorted by
        the in-sample measures in descending order.

    """
    import warnings

    # accept a single measure name as well as a list of names
    measures = [measure] if isinstance(measure, str) else list(measure)
    measures_os = [x + "_os" for x in measures]
    # only parameters that actually vary are worth a column
    varying = [x for x in params_setting if len(params_setting[x]) > 1]
    rows = []
    for params in product_dict(**params_setting):
        position = strategy(indicator_prices, **params)
        try:
            result, _, _ = evaluation(position[:sample_period], indicator_prices[:sample_period], name, "B", False)
            result_os, _, _ = evaluation(position[sample_period:], indicator_prices[sample_period:], name, "B", False)
        except Exception as exc:
            # keep the search best-effort, but make skipped combinations visible
            warnings.warn("params_optimizer skipped %r: %r" % (params, exc))
            continue
        row = {x: params[x] for x in varying}
        row.update({x: result[x] for x in measures})
        row.update({x + "_os": result_os[x] for x in measures})
        rows.append(row)
    # build the frame once; DataFrame.append no longer exists in pandas >= 2.0
    params_df = pd.DataFrame(rows, columns=varying + measures + measures_os)
    return params_df.sort_values(by=measures, ascending=False)


# Fetch macro (EDB) data
def fetch_macro_data(class_name, code_list):
    """
    Load macro data from the local Excel cache, downloading it on a miss.

    The cache file is ``data/<class_name>.xlsx`` (sheet "Daily").  When it
    does not exist the data is requested through the module-level
    ``wind_help`` object from 2007-01-01 up to today and written to the cache.

    Parameters
    ----------
    class_name: str
        name for target data category (also the cache file name)
    code_list: list
        list of edb data code
    Returns
    -------
    daily_price: dataframe
        DataFrame of edb data with a datetime index
    """
    cache_path = "data/%s.xlsx" % class_name
    if os.path.exists(cache_path):
        # parse_dates=True parses the index column; the former
        # date_parser=True was not a callable and that keyword is deprecated
        daily_price = pd.read_excel(cache_path, sheet_name="Daily", index_col=0,
                                    parse_dates=True, engine='openpyxl')
    else:
        end_date = datetime.today()
        daily_price = wind_help.edb(code_list, "2007-01-01", end_date)
        daily_price.to_excel(cache_path, sheet_name='Daily')
    daily_price.index = pd.to_datetime(daily_price.index)
    return daily_price


# Fetch index price data
def fetch_index_data(code, begin_date='2010-01-01', end_date='2021-05-01'):
    """
    Fetch index price/volume data, using a local Excel cache.

    The cache file is ``data/asset_price/<code before the first dot>.xlsx``
    (sheet "Daily").  On a cache miss the data is requested through the
    module-level ``wind_help`` object and written to the cache.

    Parameters
    ----------
    code : str
        target index name.  'T.CFE' builds a series from the T00/T01/T02
        contracts by picking, per day, the one with the largest open
        interest; codes starting with 'CBA' fetch the close only.
    begin_date : str
        download start date.  Only used for 'CBA' codes; the other
        branches use fixed start dates.
    end_date : str
        download end date.

    Returns
    -------
    daily_price : DataFrame
        index is date (datetime); columns depend on the branch taken.

    """
    cache_path = "data/asset_price/%s.xlsx" % code.split('.')[0]
    if os.path.exists(cache_path):
        # parse_dates=True parses the index column; the former
        # date_parser=True was not a callable and that keyword is deprecated
        daily_price = pd.read_excel(cache_path, sheet_name="Daily", index_col=0,
                                    parse_dates=True, engine='openpyxl')
    else:
        if code == 'T.CFE':
            fields = 'close, high, low, open, oi , volume, settle'
            T00 = wind_help.wsd("T00.CFE", fields, "2015-03-20", end_date)
            T01 = wind_help.wsd("T01.CFE", fields, "2015-03-20", end_date)
            T02 = wind_help.wsd("T02.CFE", fields, "2015-03-20", end_date)
            daily_price = pd.DataFrame(index=T00.index, columns=['close', 'open', 'high', 'low', 'volume', 'settle'])
            OI = pd.concat([T00['oi'], T01['oi'], T02['oi']], axis=1)
            OI.columns = [0, 1, 2]
            # per day, the column number of the contract with the largest open interest
            OI_index = OI.idxmax(axis=1)
            price_columns = ['close', 'high', 'low', 'open', 'volume', 'settle']
            # fill each day from whichever contract had the largest open interest
            for rank, contract in enumerate((T00, T01, T02)):
                daily_price.where(OI_index != rank, contract.loc[:, price_columns],
                                  inplace=True)
        elif code[:3] == 'CBA':
            daily_price = wind_help.wsd(code, 'close', begin_date, end_date, options= "TradingCalendar=NIB")
        else:
            daily_price = wind_help.wsd(code, 'close, high, low, open, volume,amt,vwap,turn', "2005-04-08", end_date)

        daily_price.dropna(how="all", inplace=True)
        daily_price.to_excel(cache_path, sheet_name='Daily')
    # normalise the index on both paths, as fetch_macro_data does
    daily_price.index = pd.to_datetime(daily_price.index)
    return daily_price


# Plot rates, curvature, steepness and the bullet strategy's excess return
def draw_rate(begin_date, end_date):
    """
    Plot the bullet excess net value against curve steepness and curvature.

    Steepness and curvature are built from three columns of the '1-5-10yr'
    macro data set and smoothed with a 5-period HMA.  Dumbbell and bullet
    portfolios are built from four maturity-bucket indices read from local
    Excel files, each weighted so that its duration matches the mean bucket
    duration.  The bullet excess net value, steepness and curvature go on
    the left axis; the aggregate net value of a second set of bucket
    indices goes on the right axis.

    Parameters
    ----------
    begin_date, end_date :
        bounds of the plotted window (used as ``.loc`` slice bounds and
        passed to ``fetch_index_data``).
    """
    maturity = '1-10'
    rates = fetch_macro_data('1-5-10yr', ['S0059744', 'S0059747', 'S0059749'])
    steep_raw = rates['S0059749'] - rates['S0059744']
    yield_diff = HMA(steep_raw, 5).rename('steepness')
    curve_raw = 2 * rates['S0059747'] - (rates['S0059749'] + rates['S0059744'])
    curvature = HMA(curve_raw, 5).rename('curvature')

    # bucket indices read from local files: daily returns and durations
    bucket_codes = {'1-3':'CBA00621.CS','3-5':'CBA00631.CS','5-7':'CBA00641.CS','7-10':'CBA00651.CS'}
    return_columns = []
    duration_columns = []
    for index_code in bucket_codes.values():
        frame = pd.read_excel("./哑铃子弹/指标数据2021-05-11(%s).xls"%index_code, index_col=0)
        frame.index = pd.to_datetime(frame.index)
        frame = frame.sort_index()
        duration_columns.append(frame['平均市值法久期(年)'])
        return_columns.append(frame['涨跌幅(%)'] / 100)
    agg_returns = pd.concat(return_columns, axis=1)
    agg_returns.columns = bucket_codes.keys()
    agg_durations = pd.concat(duration_columns, axis=1)
    agg_durations.columns = bucket_codes.keys()

    # weights chosen so each two-bucket portfolio has the mean bucket duration
    target_duration = agg_durations.mean(axis=1)
    weight_long = (target_duration - agg_durations['1-3']) / (agg_durations['7-10'] - agg_durations['1-3'])
    dumbell_returns = agg_returns['7-10'] * weight_long + agg_returns['1-3'] * (1 - weight_long)
    weight_long = (target_duration - agg_durations['3-5']) / (agg_durations['5-7'] - agg_durations['3-5'])
    bullet_returns = agg_returns['5-7'] * weight_long + agg_returns['3-5'] * (1 - weight_long)
    benchmark = agg_returns.mean(axis=1)
    dumbell_excess_net = (1 + dumbell_returns - benchmark).cumprod().rename('dumbell excess net').loc[begin_date:end_date]
    bullet_excess_net = (1 + bullet_returns - benchmark).cumprod().rename('bullet excess net').loc[begin_date:end_date]

    # second set of bucket indices, fetched through fetch_index_data
    wealth_codes = {'1-3':'CBA00622.CS','3-5':'CBA00632.CS','5-7':'CBA00642.CS','7-10':'CBA00652.CS'}
    wealth_returns = pd.concat(
        [fetch_index_data(index_code, begin_date, end_date).pct_change() for index_code in wealth_codes.values()],
        axis=1)
    agg_return = wealth_returns.mean(axis=1)

    plt.figure(figsize=(30, 15))
    ax0 = host_subplot(111)
    ax0.set_title('dumbell&bullet excess net %s'%maturity)
    ax1 = ax0.twinx()
    original_net = (1 + agg_return.loc[begin_date:end_date]).cumprod().rename('aggregate net value')
    original_net.plot(ax=ax1)
    bullet_excess_net.plot(ax=ax0)
    # steepness and curvature are divided by 100 and shifted by 1 before sharing the left axis
    shape_lines = [yield_diff.loc[begin_date:end_date]/100 + 1, curvature.loc[begin_date:end_date]/100 + 1]
    pd.concat(shape_lines, axis=1).plot(ax=ax0)
    plt.legend()


def win_rate(series):
    """Return the number of positive entries divided by the number of non-zero entries."""
    winners = series.loc[series > 0]
    active = series.loc[series != 0]
    return len(winners) / len(active)


def odds(series):
    """Return the mean positive entry divided by the magnitude of the mean negative entry."""
    mean_gain = series.loc[series > 0].mean()
    mean_loss = series.loc[series < 0].mean()
    return -1 * mean_gain / mean_loss


def turnover(series):
    """Return the share of entries in ``series`` that are non-zero."""
    active = series.loc[series != 0]
    return len(active) / len(series)


# Price-energy strategy
def strategy_v1(prices, **kwargs):
    """
    Long/short signal from smoothed price momentum, with an optional filter.

    Parameters
    ----------
    prices : Series
        price data of the index (passed to talib.DEMA).
    **kwargs :
        dema: price smooth dema window
        energy: smoothed price momentum window
        long_thres: threshold for initiating the trade
        method: optional, one of 'Efficiency', 'Tangent', 'CMI'; method
            used to filter oscillating scenarios
        efficiency: window of the filter indicator (needed with method)
        efficiency_thres: positions are zeroed where the filter indicator
            is below this threshold (needed with method)
    Returns
    -------
    position : Series
        0 1 -1 signal of the holdings .

    """
    # price energy: percentage change of the DEMA-smoothed price, scaled by 200 and shifted by 1
    price_energy = talib.DEMA(prices, kwargs['dema']).pct_change(kwargs['energy']) * 200 + 1
    # only price information is used, so the PV indicator is the price energy alone
    P_V = price_energy
    threshold = kwargs['long_thres']
    position = pd.Series(index=P_V.index, dtype='float64', name='position')
    position[P_V >= threshold] = 1
    position.fillna(0, inplace=True)
    # everything below the threshold becomes a short position
    position[P_V < threshold] = -1

    # optional filter; without a recognised method the raw signal is returned
    method = kwargs.get('method')
    if method not in ('Efficiency', 'Tangent', 'CMI'):
        return position

    cut_value = 0
    window = kwargs['efficiency']
    if method == 'Efficiency':
        # price efficiency: net move over the window relative to the summed absolute daily moves
        path_length = prices.diff(1).abs().rolling(window).sum()
        price_efficiency = 100 * prices.diff(window).abs() / path_length
    elif method == 'Tangent':
        # slope of the price over the window, signed by the current position
        price_efficiency = 1000 * prices.diff(window) / window
        price_efficiency.loc[position.index] = price_efficiency.loc[position.index] * position
    else:
        # CMI: net move over the window relative to the window's high-low range
        Range = prices.rolling(window, 1).max() - prices.rolling(window, 1).min()
        price_efficiency = (100 * prices.diff(window).abs() / Range).fillna(100)

    weak = price_efficiency < kwargs["efficiency_thres"]
    position[weak] = cut_value * position[weak]
    return position


# MTM (momentum) strategy
def strategy_MT(prices, drawplot=False, **kwargs):
    """
    Momentum signal: long when momentum is positive and above its own
    moving average, short when it is negative and below it.

    Parameters
    ----------
    prices : Series
        price data of the index.
    drawplot : bool, optional
        The default is False. control whether to draw holdings plot
    **kwargs : dict
        smooth: HMA window for smoothing price at first
        diff: diff window of price
        MA: MA window of MT indicator

    Returns
    -------
    position : Series
        1 / -1 / 0 signal indexed like the smoothed price.

    """
    prices = HMA(prices, kwargs['smooth'])
    momentum = prices.diff(kwargs['diff'])
    momentum_ma = HMA(momentum, kwargs['MA'])
    together = pd.concat([prices, momentum, momentum_ma], axis=1)
    together.columns = ['rates', 'MT', 'MTM']

    go_long = (together['MT'] > 0) & (together['MT'] > together['MTM'])
    go_short = (together['MT'] < 0) & (together['MT'] < together['MTM'])
    # the two masks cannot overlap; everything else (including NaN rows) is flat
    position = pd.Series(np.where(go_long, 1.0, np.where(go_short, -1.0, 0.0)),
                         index=together.index, dtype='float64')

    if not drawplot:
        return position

    plt.figure(figsize=(20, 8))
    ax0 = host_subplot(111)
    ax0.set_title('MT strategy')
    prices.plot(ax=ax0)
    # x in data coordinates, y in axes coordinates: shading spans the full height
    trans = mtransforms.blended_transform_factory(ax0.transData, ax0.transAxes)
    for mask, colour in ((position > 0, 'green'), (position < 0, 'red')):
        ax0.fill_between(prices.index, 0, 1, where=mask, color=colour, alpha=0.5,
                         transform=trans)
    plt.legend()
    plt.savefig("./img/bond portfolio MT.png")
    return position


# Bollinger-band breakout strategy
def strategy_BOLL_breakout(prices, drawplot=False, **kwargs):
    """
    Enter in the direction of a confirmed break out of the Bollinger band
    and leave when price has moved back inside it.

    A crossing counts once the price has been on the new side of the band
    for two consecutive bars after having been on the other side.

    Parameters
    ----------
    prices : Series
        price data of the index.
    drawplot : bool, optional
        The default is False. control whether to draw holdings plot
    **kwargs : dict
        smooth: HMA window for smoothing price at first
        std_window: volatility window of price
        width: band width of boll bands

    Returns
    -------
    position : Series
        1 / -1 / 0 signal indexed like ``prices``.

    """
    def moved_above(diff, i):
        # below the band two bars ago, above it on the last two bars
        return (diff[i - 2] < 0) and (diff[i - 1] > 0) and (diff[i] > 0)

    def moved_below(diff, i):
        # above the band two bars ago, below it on the last two bars
        return (diff[i - 2] > 0) and (diff[i - 1] < 0) and (diff[i] < 0)

    MA_prices = HMA(prices, kwargs['smooth'])
    band = prices.rolling(kwargs['std_window'], 5).std()
    upper = (MA_prices + kwargs['width'] * band).values
    lower = (MA_prices - kwargs['width'] * band).values
    above_upper = prices.values - upper
    above_lower = prices.values - lower

    n_bars = len(prices)
    positions = np.zeros(n_bars)
    state = 0
    for i in range(2, n_bars):
        if state == 0:
            if moved_above(above_upper, i):
                state = 1
            elif moved_below(above_lower, i):
                state = -1
        elif state == 1:
            # long: exit once price is back under the upper band
            if moved_below(above_upper, i):
                state = 0
        elif state == -1:
            # short: exit once price is back over the lower band
            if moved_above(above_lower, i):
                state = 0
        positions[i] = state

    if drawplot:
        plt.figure(figsize=(20, 8))
        ax0 = host_subplot(111)
        ax0.set_title('boll strategy')
        lines = [prices, MA_prices, pd.Series(upper, index=prices.index), pd.Series(lower, index=prices.index)]
        pd.concat(lines, axis=1).plot(ax=ax0)
        trans = mtransforms.blended_transform_factory(ax0.transData, ax0.transAxes)
        for mask, colour in ((positions > 0, 'green'), (positions < 0, 'red')):
            ax0.fill_between(prices.index, 0, 1, where=mask, color=colour, alpha=0.5,
                             transform=trans)
        plt.legend()
        plt.savefig("./img/bond portfolio boll breakout.png")
    return pd.Series(positions, index=prices.index)


# Bollinger-band reversal strategy
def strategy_BOLL_reverse(prices, drawplot=False, **kwargs):
    """
    Enter when price comes back inside the Bollinger band and the smoothed
    price moves the same way; leave on a confirmed break of either band.

    A crossing counts once the price has been on the new side of the band
    for two consecutive bars after having been on the other side.

    Parameters
    ----------
    prices : Series
        price data of the index.
    drawplot : bool, optional
        The default is False. control whether to draw holdings plot
    **kwargs : dict
        smooth: HMA window for smoothing price at first
        std_window: volatility window of price
        width: band width of boll bands

    Returns
    -------
    position : Series
        1 / -1 / 0 signal indexed like ``prices``.

    """
    def moved_above(diff, i):
        # below the band two bars ago, above it on the last two bars
        return (diff[i - 2] < 0) and (diff[i - 1] > 0) and (diff[i] > 0)

    def moved_below(diff, i):
        # above the band two bars ago, below it on the last two bars
        return (diff[i - 2] > 0) and (diff[i - 1] < 0) and (diff[i] < 0)

    MA_prices = HMA(prices, kwargs['smooth']).values
    band = prices.rolling(kwargs['std_window'], 5).std().values
    upper = MA_prices + kwargs['width'] * band
    lower = MA_prices - kwargs['width'] * band
    above_upper = prices.values - upper
    above_lower = prices.values - lower

    n_bars = len(prices)
    positions = np.zeros(n_bars)
    state = 0
    for i in range(2, n_bars):
        if state == 0:
            if moved_above(above_lower, i) and (MA_prices[i] > MA_prices[i - 1]):
                # back above the lower band with a rising average: long
                state = 1
            elif moved_below(above_upper, i) and (MA_prices[i] < MA_prices[i - 1]):
                # back below the upper band with a falling average: short
                state = -1
        elif moved_above(above_upper, i) or moved_below(above_lower, i):
            # long and short share the same exit: a break of either band
            state = 0
        positions[i] = state

    if drawplot:
        plt.figure(figsize=(20, 8))
        ax0 = host_subplot(111)
        ax0.set_title('boll strategy reverse')
        lines = [prices, pd.Series(MA_prices, index=prices.index), pd.Series(upper, index=prices.index), pd.Series(lower, index=prices.index)]
        pd.concat(lines, axis=1).plot(ax=ax0)
        trans = mtransforms.blended_transform_factory(ax0.transData, ax0.transAxes)
        for mask, colour in ((positions > 0, 'green'), (positions < 0, 'red')):
            ax0.fill_between(prices.index, 0, 1, where=mask, color=colour, alpha=0.5,
                             transform=trans)
        plt.legend()
        plt.savefig("./img/bond portfolio boll reverse.png")
    return pd.Series(positions, index=prices.index)


# Quantile-band breakout strategy
def strategy_quantile_breakout(prices, drawplot=False, **kwargs):
    """
    Enter in the direction of a confirmed break of the smoothed price out
    of its rolling quantile band and leave when it has moved back inside.

    A crossing counts once the smoothed price has been on the new side of
    the band for two consecutive bars after having been on the other side.

    Parameters
    ----------
    prices : Series
        price data of the index.
    drawplot : bool, optional
        The default is False. control whether to draw holdings plot
    **kwargs : dict
        MA: HMA window for smoothing price at first
        window: quantile window of price
        width: quantile width; the bands are the 0.5 +/- width quantiles

    Returns
    -------
    position : Series
        1 / -1 / 0 signal indexed like ``prices``.

    """
    def moved_above(diff, i):
        # below the band two bars ago, above it on the last two bars
        return (diff[i - 2] < 0) and (diff[i - 1] > 0) and (diff[i] > 0)

    def moved_below(diff, i):
        # above the band two bars ago, below it on the last two bars
        return (diff[i - 2] > 0) and (diff[i - 1] < 0) and (diff[i] < 0)

    MA_prices = HMA(prices, kwargs['MA'])
    upper = MA_prices.rolling(kwargs['window'], 10).quantile(.5 + kwargs['width']).values
    above_upper = MA_prices.values - upper
    lower = MA_prices.rolling(kwargs['window'], 10).quantile(.5 - kwargs['width']).values
    above_lower = MA_prices.values - lower

    n_bars = len(prices)
    positions = np.zeros(n_bars)
    state = 0
    for i in range(2, n_bars):
        if state == 0:
            if moved_above(above_upper, i):
                state = 1
            elif moved_below(above_lower, i):
                state = -1
        elif state == 1:
            # long: exit once the smoothed price is back under the upper band
            if moved_below(above_upper, i):
                state = 0
        elif state == -1:
            # short: exit once the smoothed price is back over the lower band
            if moved_above(above_lower, i):
                state = 0
        positions[i] = state

    if drawplot:
        plt.figure(figsize=(20, 8))
        ax0 = host_subplot(111)
        ax0.set_title('quantile strategy')
        lines = [MA_prices, pd.Series(upper, index=prices.index), pd.Series(lower, index=prices.index)]
        pd.concat(lines, axis=1).plot(ax=ax0)
        trans = mtransforms.blended_transform_factory(ax0.transData, ax0.transAxes)
        for mask, colour in ((positions > 0, 'green'), (positions < 0, 'red')):
            ax0.fill_between(prices.index, 0, 1, where=mask, color=colour, alpha=0.5,
                             transform=trans)
        plt.legend()
    return pd.Series(positions, index=prices.index)


# Quantile-band reversal strategy
def strategy_quantile_reverse(prices, drawplot=False, **kwargs):
    """
    Enter when the smoothed price comes back inside its rolling quantile
    band while moving the same way; leave on a confirmed break of either band.

    A crossing counts once the smoothed price has been on the new side of
    the band for two consecutive bars after having been on the other side.

    Parameters
    ----------
    prices : Series
        price data of the index.
    drawplot : bool, optional
        The default is False. control whether to draw holdings plot
    **kwargs : dict
        MA: HMA window for smoothing price at first
        window: quantile window of price
        width: quantile width; the bands are the 0.5 +/- width quantiles

    Returns
    -------
    position : Series
        1 / -1 / 0 signal indexed like ``prices``.

    """
    def moved_above(diff, i):
        # below the band two bars ago, above it on the last two bars
        return (diff[i - 2] < 0) and (diff[i - 1] > 0) and (diff[i] > 0)

    def moved_below(diff, i):
        # above the band two bars ago, below it on the last two bars
        return (diff[i - 2] > 0) and (diff[i - 1] < 0) and (diff[i] < 0)

    MA_prices = HMA(prices, kwargs['MA'])
    # positional access goes through a plain array: integer keys on a
    # date-indexed Series are label lookups (positional fallback is deprecated)
    ma_values = MA_prices.values
    Upper_band = MA_prices.rolling(kwargs['window'], 10).quantile(.5 + kwargs['width']).values
    Upper_diff = ma_values - Upper_band
    Lower_band = MA_prices.rolling(kwargs['window'], 10).quantile(.5 - kwargs['width']).values
    Lower_diff = ma_values - Lower_band
    positions = np.zeros(len(prices))
    position = 0
    for i in range(2, len(prices)):
        if position == 0:
            if moved_above(Lower_diff, i) and (ma_values[i] > ma_values[i - 1]):
                # back above the lower band while rising: long
                position = 1
            elif moved_below(Upper_diff, i) and (ma_values[i] < ma_values[i - 1]):
                # back below the upper band while falling: short
                position = -1
        elif moved_above(Upper_diff, i) or moved_below(Lower_diff, i):
            # long and short share the same exit: a break of either band
            position = 0
        positions[i] = position
    if drawplot:
        plt.figure(figsize=(20, 8))
        ax0 = host_subplot(111)
        ax0.set_title('quantile strategy')
        pd.concat([MA_prices, pd.Series(Upper_band, index=prices.index), pd.Series(Lower_band, index=prices.index)], axis=1).plot(ax=ax0)
        trans = mtransforms.blended_transform_factory(ax0.transData, ax0.transAxes)
        ax0.fill_between(prices.index, 0, 1, where=positions > 0, color='green', alpha=0.5,
                         transform=trans)
        ax0.fill_between(prices.index, 0, 1, where=positions < 0, color='red', alpha=0.5,
                         transform=trans)
        plt.legend()
    return pd.Series(positions, index=prices.index)


# Duration-matched blend of two maturity buckets (shared by the barbell and bullet legs)
def _duration_matched_returns(returns, durations, target_duration, short_leg, long_leg):
    """
    Blend two maturity buckets so that the mix reproduces a target duration.

    The weight of the long bucket solves
    ``w * D_long + (1 - w) * D_short = target_duration`` date by date.

    Parameters
    ----------
    returns : DataFrame
        daily returns, one column per maturity bucket.
    durations : DataFrame
        durations, same columns as ``returns``.
    target_duration : Series
        duration the blend has to reproduce on each date.
    short_leg : str
        column name of the shorter bucket.
    long_leg : str
        column name of the longer bucket.

    Returns
    -------
    Series
        daily returns of the blended portfolio.

    """
    weight_long = (target_duration - durations[short_leg]) / (durations[long_leg] - durations[short_leg])
    return weight_long * returns[long_leg] + (1 - weight_long) * returns[short_leg]


# Allocate between bullet and barbell according to curve steepness (god's-eye view)
def strategy_maturity_steepness_god(begin, end):
    """
    Backtest a barbell/bullet switch driven by the realised change in steepness.

    The dates read from '利率走势.xlsx' split the sample into windows. For each
    window the change in steepness between its first and last observation
    decides the holding over that same window (hence "god's-eye view": the
    signal uses information from the end of the window it is applied to):

    * change below -0.1 (flattening, signal -1): barbell of the '1-3' and
      '7-10' buckets;
    * change above 0.1 (steepening, signal 1): bullet of the '3-5' and '5-7'
      buckets;
    * otherwise (signal 0): the equal-weighted average of the four buckets.

    Barbell and bullet are weighted so that their duration equals the average
    duration of the four buckets. A window without any steepness observation
    is treated as signal 0.

    Side effects: reads the index Excel files under './哑铃子弹/' and
    '利率走势.xlsx', prints the final excess net value, saves a figure under
    './img/' and writes the detail table under './data/'.

    Parameters
    ----------
    begin : datetime
        backtest begin date.
    end : datetime
        backtest end date.

    Returns
    -------
    overall_info : dataframe
        all middle information during the backtest.
    performance : series
        performance statistics of the excess return: 'returns' (annualised, %),
        'drawdown' (largest shortfall of the excess net value below its running
        peak, times 100), 'sharpe', 'calmar', 'begin_drawdown', 'end_drawdown',
        'win_rate' (%), 'odds' and 'turnover' (%).
    returns_by_year : series
        each years returns.

    """
    # flattening -> -1, steepening -> 1
    agg_dict = {'1-3': 'CBA00621.CS', '3-5': 'CBA00631.CS', '5-7': 'CBA00641.CS', '7-10': 'CBA00651.CS'}
    # signal -> (short bucket, long bucket): barbell when flattening, bullet when steepening
    legs_by_signal = {-1: ('1-3', '7-10'), 1: ('3-5', '5-7')}

    # presumably the 1y / 5y / 10y yields, steepness being 10y minus 1y -- TODO confirm the series codes
    interest_rate = fetch_macro_data('1-5-10yr', ['S0059744', 'S0059747', 'S0059749'])
    steepness = (interest_rate['S0059749'] - interest_rate['S0059744']).loc[begin:end]

    agg_returns = []
    agg_durations = []
    for code in agg_dict.values():
        agg_data = pd.read_excel("./哑铃子弹/指标数据2021-05-11(%s).xls" % code, index_col=0)
        agg_data.index = pd.to_datetime(agg_data.index)
        agg_data = agg_data.sort_index().loc[begin:end, :]
        agg_durations.append(agg_data['平均市值法久期(年)'])
        # the file stores returns in percent
        agg_returns.append(agg_data['涨跌幅(%)'] / 100)
    agg_returns = pd.concat(agg_returns, axis=1)
    agg_durations = pd.concat(agg_durations, axis=1)
    agg_returns.columns = list(agg_dict)
    agg_durations.columns = list(agg_dict)
    # benchmark: equal-weighted average of the four buckets
    durations_agg = agg_durations.mean(axis=1)
    agg_returns_mean = agg_returns.mean(axis=1)

    detail_date = pd.read_excel('利率走势.xlsx', sheet_name='Sheet2')
    date_list = detail_date['日期'].tolist()
    returns_list = []
    bool_list = []
    for window_begin, window_end in zip(date_list, date_list[1:]):
        # a window starts the day after the previous split date
        window = slice(window_begin + Day(), window_end)
        steepness_window = steepness.loc[window]
        if steepness_window.empty:
            # nothing to compare (e.g. split dates outside begin:end): stay on the benchmark
            window_signal = 0
        else:
            change = steepness_window.iloc[-1] - steepness_window.iloc[0]
            window_signal = int(np.sign(change)) if abs(change) > .1 else 0
        if window_signal == 0:
            returns_portfolio = agg_returns_mean.loc[window]
        else:
            short_leg, long_leg = legs_by_signal[window_signal]
            returns_portfolio = _duration_matched_returns(agg_returns.loc[window], agg_durations.loc[window],
                                                          durations_agg.loc[window], short_leg, long_leg)
        bool_list.extend([window_signal] * len(returns_portfolio))
        returns_list.append(returns_portfolio)
    returns_series = pd.concat(returns_list)
    # the last entry is forced to flat
    signal = np.array(bool_list[:-1] + [0])
    # Align the benchmark to the strategy dates: a plain subtraction would align on the
    # union of both indexes and no longer match ``signal`` position by position.
    excess_returns = returns_series - agg_returns_mean.reindex(returns_series.index)

    # NOTE(review): day i's return is compounded before the close check, so the first day
    # of a new signal is booked to the trade being closed, and a trade still open at the
    # end of the sample is never booked -- confirm this is intended.
    net_temp = 1
    profit_list = []
    for i in range(1, len(excess_returns) - 1):
        net_temp *= excess_returns.iloc[i] + 1
        # a non-flat signal that changes today closes the running trade
        if (signal[i - 1] != 0) and (signal[i - 1] != signal[i]):
            # book the pnl of the closed trade
            profit_list.append(net_temp - 1)
            net_temp = 1
    profit_list = np.array(profit_list)
    if profit_list.size == 0:
        win_rate = 0
        odds = 0
        turnover = 0
    else:
        gains = profit_list[profit_list > 0]
        losses = profit_list[profit_list < 0]
        win_rate = len(gains) / len(profit_list) * 100
        # average gain over average loss; undefined without a losing trade
        odds = -1 * np.mean(gains) / np.mean(losses) if losses.size > 0 else np.nan
        turnover = len(profit_list) / len(excess_returns) * 100

    sharpe = ep.sharpe_ratio(excess_returns)
    annual_returns = ep.annual_return(excess_returns) * 100

    plt.figure(figsize=(20, 8))
    ax0 = host_subplot(111)
    ax0.set_title('dumbell&bullet 1-10yr')
    ax1 = ax0.twinx()
    enhanced_net = (returns_series + 1).cumprod()
    agg_net = (agg_returns_mean + 1).cumprod()
    overall_net = pd.concat([enhanced_net, agg_net, steepness], axis=1)
    overall_net.columns = ['Enhanced Portfolio Net Value', 'Aggregate Fortune Index Net Value', 'steepness']
    overall_net = overall_net.ffill()
    overall_net.plot(ax=ax0)
    excess_net = (excess_returns + 1).cumprod()
    print('excess_net is {:.4f}'.format(excess_net.iloc[-1]))
    # prepend a starting net value of 1 so that the first year gets a return as well
    returns_by_year = pd.concat([pd.Series(1), excess_net.groupby(excess_net.index.strftime('%Y')).tail(1)]).pct_change().dropna()
    excess_net.plot(ax=ax1, label='Excess Return')
    # shortfall below the running peak, in net-value points (not divided by the peak)
    drawdown = excess_net - excess_net.expanding(min_periods=1).max()
    max_drawdown = -1 * drawdown.min() * 100
    end_drawdown = drawdown.index[np.argmin(drawdown)]
    begin_drawdown = drawdown.index[np.argmax(excess_net.loc[:end_drawdown])]
    calmar = ep.calmar_ratio(excess_returns)

    # always-bullet portfolio over the whole sample, plotted for comparison
    bullet_returns = _duration_matched_returns(agg_returns, agg_durations, durations_agg, '3-5', '5-7')
    (bullet_returns - agg_returns_mean + 1).cumprod().plot(ax=ax1, label='Bullet Excess Return')
    ax1.set_ylim([excess_net.min() - .01, excess_net.max() + .01])
    # x in data coordinates, y in axes coordinates: the shading spans the full height
    trans = mtransforms.blended_transform_factory(ax1.transData, ax1.transAxes)
    ax1.fill_between(returns_series.index, 0, 1, where=signal > 0, color='green', alpha=0.2,
                     transform=trans)
    ax1.fill_between(returns_series.index, 0, 1, where=signal < 0, color='red', alpha=0.2,
                     transform=trans)
    plt.legend()
    plt.savefig('./img/dumbell&bullet 1-10 diff -2020 steepness god')
    overall_info = pd.concat([pd.Series(signal, index=returns_series.index, name='signal'),
                              agg_returns, 10000 * (excess_net - 1).rename('excess net value'), overall_net,
                              durations_agg.rename('Aggregate duration')], axis=1)
    overall_info.to_excel("./data/overall info dumbell&bullet steepness god 1-10 diff.xlsx")
    performance = pd.Series([annual_returns, max_drawdown, sharpe, calmar, begin_drawdown,
                             end_drawdown, win_rate, odds, turnover],
                            index=['returns', 'drawdown', 'sharpe', 'calmar', 'begin_drawdown',
                                   'end_drawdown', 'win_rate', 'odds', 'turnover'])
    return overall_info, performance, returns_by_year
if __name__ == '__main__':
    # Created before the backtest runs; presumably sets up the Wind data session -- TODO confirm.
    wind_help = WindHelper()

    # Disabled ad-hoc diagnostics, kept for reference:
    # signal_weight(.5)
    # interest_rate = fetch_macro_data('1-5-10yr', ['S0059744', 'S0059747', 'S0059749'])
    # yield_diff = (interest_rate['S0059749'] - interest_rate['S0059744'])
    # curvature = 2 * interest_rate['S0059747'] - (interest_rate['S0059749'] + interest_rate['S0059744'])

    # Backtest window handed to the strategy.
    backtest_begin = datetime(2008, 1, 1)
    backtest_end = datetime(2021, 5, 8)
    info, performance, returns_year = strategy_maturity_final(backtest_begin, backtest_end)

上一篇:一文帮你弄懂散列是什么


下一篇:Python pandas.DataFrame.agg函数方法的使用