手把手教你如何从Tushare库下载股票数据,并保存在硬盘当中。第三篇多线程
前言
我之前发过一片关于通过爬虫技术从网易金融上获取股票数据的文章,大家反响还是挺强烈的。但是吧,那个作品只能说是我早期学习爬虫和量化的一个缩影。我现在要和大家分享的是非常Dry的干货。基本上可以说是我个人的最终版的股票数据下载文本,甚至可以说是Tushare下载数据的一个基石。我将分为三个部分来讲述:
一、Tushare是什么?
在国内做量化基本上都或多或少知道Tushare这个库的,大佬们基本上都是用Tushare来分享自己的作品。作为一个相对价格友好的金融数据库是非常良心的,价格相比于Wind,力度还是非常大的。
我们来看看Tushare在pypi上给自己的功能介绍是什么:
- easy to use as most of the data returned are pandas DataFrame objects
- can be easily saved as csv, excel or json files
- can be inserted into MySQL or Mongodb
Tushare的目标人群:
- financial market analyst of China
- learners of financial data analysis with pandas/NumPy
- people who are interested in China financial data
行了咱们也扯得差不多了,开始整正事儿吧。
二、代码
首先我得声明一下:
- 我今天分享的代码不能用Jupyter来运行,但是可以用Jupyter来调试Debug,希望大家理解。
- 用了代码记得帮兄弟我宣传推广一下哈,嘿嘿。
- Tushare是需要花钱的,我用的是500元年费的,据说200元年费就可以有很丰富的功能了。说句实话如果你真的很想往量化方面发展研究的话,我觉得入手200元的还是个挺不错的选择。具体200元能干什么请自己去浏览哈。200元档位是2000积分,我大致观察了一下,基本上覆盖所有功能了。
1.引入库
代码如下(示例):
import os
import time
from datetime import datetime as dt
from datetime import timedelta
import threading
import pandas as pd
import numpy as np
import requests
import tushare as ts
token = '这个地方放你的Tushare秘钥'
ts.set_token(token)
pro = ts.pro_api(token)
2.多线程函数比例切割
前面两讲我们实现了单线程的对于股票数据的下载。我们下面就来实现多线程的讲解。首先我们要知道我们的天花板在哪里:对方的服务器的限制,和自己的权限限制。我一共给自己开了5条线程,因为我试过6~8条,都会被弹回来,因为我一分钟request得太频繁了。5000分的会员也不是为所欲为的哦,哈哈。
首先呢我们需要创建一个线程List:
def DuoXianCheng():
threads = []
我刚刚说过我要开启5条线程,而且呢,我要把我的股票候选名单按照平均分配20%的比例来切割。所以我们需要一个numpy.array:
a = np.arange(0, 1, 0.2)
b = a + 0.2
mtx = np.vstack([a,b]).T # Transpose的原因是我们要把2x5的换成5x2来取值更合理
或者我们也可以手动来创建一个numpy.array:
mtx = np.array([[0., 0.2],
[0.2, 0.4],
[0.4, 0.6],
[0.6, 0.8],
[0.8, 1.]])
这俩没有任何区别选择那种都行。
3.多线程的loop逻辑
我们有了切割比例,那就设计一个for loop来进行赋值:
row, col = np.shape(mtx) # 获取行数值和列数值,用np.shape函数
print('\r' + "线程进入系统,开始运行")
# 将所有的候选人分成五个池子,看你的matrix有几行(row)
for r in range(row):
# 数据有两列,标识为[0]和[1]
for c in [mtx[r]]:
t = threading.Thread(target=worker,
args=(begin_date, # 日期自己赋值,我目前基本上都是从18年19年起。
last_trading_day(), # 这个函数请看第一讲的帖子。
round(total*c[0]), # 第一列取值
round(total*c[1]), # 第二列取值
r+1)) # 线程数从1开始,0开始看着别扭
t.start() # 开一条线程,启动一条线程
threads.append(t) # 将每一条线程开启后加入到threads那个list里面
最后呢,我们要根据线程的特点,把这五条打包好的线程同时并发起来。
for xc in threads:
xc.join()
4.我们把函数都拼装起来就完成了
记得看我最开始的声明,你可以用Jupyter去一行一行Debug我的代码。但这个代码必须用IDE运行,因为Jupyter不能跑多进程多线程。所以不要用Jupyter跑这个py代码脚本。
def last_trading_day():
data = pro.query('trade_cal',
start_date='20200101',
end_date=as_of_today,
is_open='1')
trading_dates = data['cal_date']
d0 = dt.now()
trading_dates_list = trading_dates.tolist()
if as_of_today in trading_dates.values:
if d0.hour >= 16:
today_index = trading_dates_list.index(as_of_today)
latest_trading_date = trading_dates_list[int(today_index)]
return latest_trading_date
else:
previous_trading_date = trading_dates.values[-2]
return previous_trading_date
else:
previous_trading_date = trading_dates.values[-1]
return previous_trading_date
def get_code_list():
data = pro.daily_basic(trade_date=last_trading_day())
x1 = data.close < 200 # 收盘价小于200元
x2 = data.pe < 100 # 市盈率低于100倍
x3 = data.pb < 10 # 市净率低于10倍
x4 = data.turnover_rate > 1 # 换手率大于1
x = x1 & x2 & x3 & x4
stock_list_1 = data[x].ts_code.values.tolist()
data2 = pro.query('stock_basic')
AS_of_Today = int(dt.now().strftime('%Y%m%d'))
data2 = data2[data2['list_date'].apply(int).values < (AS_of_Today-360)]
data2 = data2[-data2.name.str.startswith('*')]
data2 = data2[-data2.industry.isin(['银行','保险','房地产','区域地产',])
stock_list_2 = data2.ts_code.values.tolist()\
stock_list = [value for value in stock_list_1 if value in stock_list_2] + stock_list_3 + stock_list_4
stock_list = list(dict.fromkeys(stock_list))
def worker(begin, today, begin_range, end_range, process, ):
for stock_codes in Tickers[begin_range : end_range]
try:
df = ts.pro_bar(ts_code='{}'.format(stock_codes), adj='qfq', freq='D',
start_date=begin, end_date=today)
save_csv(stock_codes, df)
except requests.exceptions.ProxyError:
continue
def save_csv(ticker, csv_file):
paths = {
"stdev": root_path + '\\stdev\\',
"tester": root_path + '\\tester\\',
"csv_tester": '.\\csv_tester\\'
}
for name, path in paths.items():
csv_file.to_csv(path + "%s.csv" % ticker,
encoding="GBK", index=False)
csv_file.to_csv(root_path + '\\tester2\\' + "%s.csv" % os.path.splitext(ticker)[0],
encoding="GBK", index=False)
def DuoXianCheng():
threads = []
# 建立一个比例分配的矩阵,5条线程 x 高低比例
a = np.arange(0, 1, 0.2)
b = a + 0.2
mtx = np.vstack([a,b]).T # Transpose的原因是我们要把2x5的换成5x2来取值更合理
# 你也可以手动来建立这个matrix. 如下:
mtx = np.array([[0., 0.2],
[0.2, 0.4],
[0.4, 0.6],
[0.6, 0.8],
[0.8, 1.]])
row, col = np.shape(mtx) # 获取行数值和列数值,用np.shape函数
print('\r' + "线程进入系统,开始运行")
# 将所有的候选人分成五个池子,看你的matrix有几行(row)
for r in range(row):
# 数据有两列,标识为[0]和[1]
for c in [mtx[r]]:
t = threading.Thread(target=worker,
args=(begin_date,
last_trading_day(),
round(total*c[0]),
round(total*c[1]),
r+1)) # 线程数从1开始,0开始看着别扭
t.start()
threads.append(t)
for xc in threads:
xc.join()
if __name__ == "__main__":
start = time.time()
root_path = "D:\\BaiduNetdiskWorkspace\\database\\__stock__" # 这个看你想存在硬盘的什么位置,自己设置。
begin_date = str(20180101)
as_of_today = str(dt.now().strftime('%Y%m%d'))
AS_of_Today = int(dt.now().strftime('%Y%m%d'))
tickers = get_code_list()
total = len(get_code_list())
DuoXianCheng() # 或者给它起名叫main()也行,毕竟它要执行worker()这个函数。
print("=☆=★=运行时间为:%.2f 分钟=☆=★=" % (int(time.time() - start)/60))
总结
通过这三章的讲解我已经将我的这个“多线程下载股票信息并存储.py”文件分享给大家了。如果您觉得有用,请帮我点个赞收藏吧!您的鼓励就是我的动力!!有什么不懂的或者不清楚不对的地方请留言或者加私聊魏鑫沟通。Last, but not least, 推一下我Python及量化导师的博客:CuteHand