2021-04-28

【Python】下载Tushare平台股票数据至MongoDB和简单条件查询,持续更新中...

 

本人是大二学生,对股票感兴趣想做一个类似同花顺的i问财产品。目前正在慢慢学习python中,给出的思路肯定有很多不足之处。如果对你有帮助的话,那最好了~

这个产品预计要做半年以上,我后面持续更新...

 

一、首先给出产品规划

2021-04-28

数据输入部分财务数据主要通过Tushare平台的API下载获取,十分好用的一个平台推荐给大家~

Tushare大数据社区 (waditu.com)

我主要是下载A股的财务数据,大家可以按照自己的需求去下载对应数据。

2021-04-28

  1. Tushare财务数据下载,思路:A股4000多家公司的多张财务数据表,需要分多次下载。这里我采取协程的方式,每个财务表下载开启一个微线程。
    1. 检查数据是否已下载,若未下载则提交到队列中——生产者;
    2. 从队列中获取数据,下载Tushare数据并写入MongoDB——消费者。
# 更新所有的财务数据
    def update_fina(self, period=''):
        # 财务数据接口列表
        api_fina_list = [
            'income',
            'balancesheet',
            'cashflow',
            'forecast',
            'express',
            'dividend',
            'fina_indicator',
            'fina_audit',
            'fina_mainbz',
        ]

        # 创建队列
        q = JoinableQueue(10)

        g_ls = []

        # 生产者——微线程,多任务检查更新各集合数据
        for i in api_fina_list:
            g = gevent.spawn(common.update_coll, **{'coll': i, 'period': period, 'q': q, 'db': self.db})
            g_ls.append(g)

        # 消费者——微线程,下载数据写入集合
        g_down = gevent.spawn(common.down(self.pro, q, self.db))
        g_ls.append(g_down)

        gevent.joinall(g_ls)

        return 200
# 更新指定集合
def update_coll(coll='', period='', q=object, db=object):
    # 获取所有股票的ts_code
    coll_stock_basic = db['stock_basic']
    stock_list = coll_stock_basic.find({}, {'_id': 0, 'ts_code': 1}).batch_size(5)

    # 要更新的集合
    coll_update = db[coll]

    for j in stock_list:
        # 数据已存在的话则无需下载
        res = coll_update.find({'ts_code': j['ts_code'], 'end_date': period})
        if res.count() == 0:
            # 提交到队列去下载数据并写入mongodb
            q.put({'api_name': coll, 'period': period, 'ts_code': j['ts_code'], })
            print(f"微线程{coll}把{j['ts_code']}放入队列\n")

    print(f'微线程{coll}已生成完')
    q.join()
# 下载指定接口的数据并写入指定集合中
def down(pro, q, db):
    while 1:
        dic = q.get()
        api_name = dic['api_name']
        period = dic['period']
        ts_code = dic['ts_code']

        try:
            # 3次重试机制
            num = 1
            while num:
                data = pro.query(api_name, period=period, ts_code=ts_code)
                # pro的接口每分钟最多请求50次
                time.sleep(1.3)
                data_json = json.loads(data.T.to_json())
                if data_json:
                    coll = db[api_name]
                    coll.insert_one(data_json['0'])
                    print(f"\033[0;33;32m\t{ts_code}写入{api_name}成功\n\033[0m")
                    break
                else:
                    num -= 1
            else:
                print(f"\033[0;33;33m\t{ts_code}的{api_name}接口无数据\n\033[0m")
        except Exception:
            pass

        q.task_done()

这里有个Tips:Tushare下载数据连续请求接口,达到一定时间后就会提示timeout。所以我加了一个try ,如果出现报错的话直接pass。不影响继续下载~

看一下运行效果:

2021-04-28

2021-04-28

未完待续……

下一步给公司添加自定义指标(eg:细分行业、概念),这样就完成了数据输入的部分~

上一篇:SIMMR模型中geese_data的解读


下一篇:python定时任务模块-crontab