量化研究----网格模型研究,提供源代码
最近打算在策略模型里面加入网格,做分时的止盈止损,网格合适区间波动的阶段,单边下跌,上涨都不合适,但是可以考虑分时网格,我在实盘系统中加入这个模块
"条件单分时网格":{
"函数名称":"conditional_single_time_sharing_grid(name='条件单分时网格',x1=1,x2=-40)",
"是否开启":"是",
"资金模型":"金额",
"卖出值":2000,
"买入值":2000,
"持有值":2000,
"数据类型":"高频行情"
},
底层的源代码
def conditional_single_time_sharing_grid(self,name='条件单分时网格',x1=0.3,x2=-0.3):
'''
条件单分时网格
stock_type=自定义/持股
'''
stock=str(self.stock)
df=self.df
if False:
pass
else:
df['价格']=pd.to_numeric(df['价格'])
close_list=df['价格'].tolist()
base_price=close_list[0]
price=close_list[-1]
if self.log.shape[0]>0:
log=self.log[self.log['模块名称']==name]
log=log[log['证券代码']==stock]
if log.shape[0]>0:
pre_price=log['触发的价格'].tolist()[-1]
zdf=((price-pre_price)/pre_price)*100
else:
zdf=((price-base_price)/base_price)*100
else:
zdf=((price-base_price)/base_price)*100
if zdf>=x1:
print('{} 模块{} 卖出{} 目前涨跌幅{} 大于目前标涨跌幅{} '.format(self.now_date,name,stock,zdf,x1))
return 'sell'
elif zdf<=x2:
print('{} 模块{} 买入{} 目前涨跌幅{} 小于目前标涨跌幅{} '.format(self.now_date,name,stock,zdf,x1))
return 'buy'
else:
print('{} 模块{} 不符合交易{} 目前涨跌幅{} 目前标涨跌幅{} '.format(self.now_date,name,stock,zdf,x1))
return ''
整理外面重要研究一下日线网格,利用算法自动生成网格大小
网格大小计算的源代码
df['网格']=(((df['close'].rolling(self.n).max()-df['close'].rolling(self.n).min())/df['close'].rolling(self.n).min())*2)/self.n
网格大小计算原理,最近10个交易日,最大,最小值为上下边界,
grid=(max_value-min_value)/n
max_value 10日最大值
min_value 10日最小值
n 交易日
同时也做了一定的网格增强
外面随便输入一个标的比如513100 纳斯达克看看网格回测的效果,我自己研发的回测框架
回测的结果
外面可以发现问题,在上升强势中没有足够的持股来止盈
也发现在下降的过程中不断买入,
但是在区间效果就非常不错
外面换一个标的比如创业板etf 159810
外面可以改进模型,利用趋势加网格,这个模块我在完善
可以看策略的报告
网格回测的全部源代码
from xgtrader_backtrader.backtrader import backtrader
from xgtrader_backtrader.user_def_data.user_def_data import user_def_data
from xgtrader_backtrader.trader_tool import tdx_indicator
import pandas as pd
from xgtrader_backtrader.trader_tool import jsl_data
from xgtrader_backtrader.trader_tool.dfcf_etf_data import dfcf_etf_data
from xgtrader_backtrader.trader_tool.ths_rq import ths_rq
from tqdm import tqdm
from xgtrader_backtrader.trader_tool.tdx_trader_function import tdx_trader_function
import pandas as pd
class my_backtrader:
'''
网格交易模块
'''
def __init__(self,start_date='20230101',end_date='20500101',data_type='D',
starting_cash=150000,cash=150000,commission=0.001,index_stock='000300'):
self.start_date=start_date
self.end_date=end_date
self.data_type=data_type
self.starting_cash=starting_cash
self.commission=commission
self.index_stock=index_stock
#这里输入代码就可以
#导入自定义股票池
self.stock_list=['159810']
self.trader=backtrader(start_date=self.start_date,end_date=self.end_date,
data_type=self.data_type,starting_cash=self.starting_cash,
commission=self.commission,cash=cash,index_stock=self.index_stock)
self.data=user_def_data(start_date=self.start_date,end_date=self.end_date,data_type=self.data_type)
self.log={}
#记录模块
#log={'证券代码':{'证券代码':[''],"交易类型":[''],'触发价格':[''],"交易数量":[],'持有数量':[']}
#网格参数指定计算网格参数
self.n=10
#网页加强参数
self.add_n=2
def add_all_data(self):
'''
多线程加载数据
'''
self.data.get_thread_add_data(stock_list=self.stock_list)
self.hist=self.data.hist
return self.hist
#单线程加载数据,避免数据不成功
def add_all_data_1(self):
'''
单线程加载数据,避免数据不成功
'''
for i in tqdm(range(len(self.stock_list))):
stock=self.stock_list[i]
stock=str(stock)
self.data.get_add_data(stock=stock)
self.hist=self.data.hist
return self.hist
def get_cacal_all_indicator(self):
data=pd.DataFrame()
hist=self.add_all_data_1()
for stock in self.stock_list:
df=hist[hist['stock']==stock]
df['网格']=(((df['close'].rolling(self.n).max()-df['close'].rolling(self.n).min())/df['close'].rolling(self.n).min())*2)/self.n
df['网格']=df['网格']*100*self.add_n
df=df[df['date']<=self.end_date]
df=df[df['date']>=self.start_date]
df['base_price']=df['close'].tolist()[0]
df=df[self.n:]
data=pd.concat([data,df],ignore_index=True)
return data
def run_backtrader(self):
'''
运行回测
'''
trader_list=self.trader.get_trader_date_list()
trader_info=self.get_cacal_all_indicator()
print(trader_info)
for date in trader_list:
df=trader_info[trader_info['date']==date]
stock_list=df['stock'].tolist()
for stock in stock_list:
df1=df[df['stock']==stock]
close_list=df1['close'].tolist()
price=close_list[-1]
price=float(price)
base_price=df1['base_price'].tolist()[-1]
stock_list=list(self.log.keys())
if stock not in stock_list:
zdf=((price -base_price)/base_price)*100
else:
base_price=self.log[stock]['触发价格'][-1]
zdf=((price -base_price)/base_price)*100
grid=df1['网格'].tolist()[-1]
if abs(zdf)>grid and zdf<0:
self.trader.buy(date=date,stock=stock,price=price,amount=100)
print('买入{} 时间{} 目标涨跌幅{}下跌大于单元格{}'.format(date,stock,zdf,grid ))
if len(self.log.keys())>0:
log=self.log[stock]
if len(log.keys())>0:
log1={}
code_list=log['证券代码']
code_list.append(stock)
log1['证券代码']=code_list
date_list=log['时间']
date_list.append(date)
log1['时间']=date_list
trader_type=log['交易类型']
trader_type.append('buy')
log1['交易类型']=trader_type
price_list=log['触发价格']
price_list.append(price)
log1['触发价格']=price_list
amount_list=log['交易数量']
amount_list.append(100)
log1['交易数量']=amount_list
log=log1
else:
log={}
log['证券代码']=[stock]
log['时间']=[date]
log['交易类型']=['buy']
log['触发价格']=[price]
log['交易数量']=[100]
else:
log={}
log['证券代码']=[stock]
log['时间']=[date]
log['交易类型']=['buy']
log['触发价格']=[price]
log['交易数量']=[100]
self.log[stock]=log
self.log[stock]=log
elif abs(zdf)>grid and zdf>0:
self.trader.sell(date=date,stock=stock,price=price,amount=100)
print('卖出{} 时间{} 目标涨跌幅{}上涨大于单元格{}'.format(date,stock,zdf,grid ))
#log={'证券代码':{'证券代码':[''],"时间":[''],"交易类型":[''],'触发价格':[''],"交易数量":[],'持有数量':[']}
if len(self.log.keys())>0:
log=self.log[stock]
if len(log.keys())>0:
log1={}
code_list=log['证券代码']
code_list.append(stock)
log1['证券代码']=code_list
date_list=log['时间']
date_list.append(date)
log1['时间']=date_list
trader_type=log['交易类型']
trader_type.append('sell')
log1['交易类型']=trader_type
price_list=log['触发价格']
price_list.append(price)
log1['触发价格']=price_list
amount_list=log['交易数量']
amount_list.append(100)
log1['交易数量']=amount_list
log=log1
else:
log={}
log['证券代码']=[stock]
log['时间']=[date]
log['交易类型']=['sell']
log['触发价格']=[price]
log['交易数量']=[100]
else:
log={}
log['证券代码']=[stock]
log['时间']=[date]
log['交易类型']=['sell']
log['触发价格']=[price]
log['交易数量']=[100]
self.log[stock]=log
else:
self.trader.settle(date=date,stock=stock,price=price)
print('持有{} 时间{} 目标涨跌幅{} 单元格{}'.format(date,stock,zdf,grid ))
if __name__=='__main__':
trader=my_backtrader(data_type='D')
trader.run_backtrader()
#获取全部的交易报告
trader.trader.get_poition_all_trader_report_html()
#获取策略报告
trader.trader.get_portfolio_trader_report_html()
#显示个股的交易图
trader.trader.get_plot_all_trader_data_figure(limit=1000)
#显示策略数据
df=trader.trader.get_portfolio_trader_data_figure(limit=100000)54