爬虫笔记20——票星球抢票脚本的实现

以下内容仅供交流学习使用!!!

思路分析

前面的爬虫笔记一步一步走过来我们的技术水平也有了较大的提升了,现在我们来进行一下票星球抢票实战项目,实现票星球的自动抢票。

  1. 我们打开票星球的移动端页面,分析每一步操作需要用到的接口。
  2. 先手动操作抢票找到抢票流程的每一个接口后,判断接口请求要求,请求方式,以及是否需要传递什么参数。
  3. 接下来就是每个接口一步一步操作请求响应,测试成功返回200状态码即可。
  4. 实现过程我主要是创建一个类对象,初始化方法配置需要用到的属性,然后把每一步操作的请求封装成每一个函数。
  5. 最后定义一个主函数运行实现逻辑。
  6. 当然,下面是代码的大体内容,主要讲解如何操作,需要全部代码可以找一下我发布的资源下载(目前文件还在审核中,通过后即可直接下载)。

示例代码

首先我们要用到下面三个包,pip下载即可

#pip install requests fake_useragent datetime
#请求
import requests
#随机UA
from fake_useragent import UserAgent
#时间
import datetime

然后我们创建一个类对象,初始化方法里面创建需要用到的属性变量,token属性需要自行登录找到自己的token复制过来即可:

class PXQ:
    def __init__(self):
        # 输入自己的token
        self.token = '这里要打开网页开发者工具找到你自己的token'
        # 演唱会项目id,必填,最简单就是在url上可以看到,下面这个是要操作抢购的演唱会的show_id
        # 或者也是打开网页开发者工具中的网络也是可以找到
        self.show_id = '665708481d06bc0001627d83'
        # 指定场次id,不指定则默认从第一场开始遍历,查找同理
        self.session_id = '665708656a025300012ae72a'
        # 购票数量,自行设置要购买的数量,但一定要看购票须知,不要超过上限
        self.buy_count = 1
        # 指定观演人,观演人序号从0开始,人数需与票数保持一致
        self.audience_idx = [0]
        # audience_idx = [0]
        # 门票类型,不确定则可以不填,让系统自行判断。快递送票:EXPRESS,电子票:E_TICKET/ID_CARD,现场取票:VENUE,电子票或现场取票:VENUE_E,目前只发现这四种,如有新发现可补充
        self.deliver_method = ''
        # 获取想要购买的票价id
        self.seat_plan_id = ''
        self.session_id_exclude = []  # 被排除掉的场次
        self.price: int = 0
        # 抢购的票价名称
        self.seatPlanName = ''
        self.threadsLists = []
        # 判断是否对应价位有票数,默认是无
        self.flag = False
        # 设置抢票开始时间,自行设置开抢时间,分别是年月日时分秒
        self.startTime = datetime.datetime(2024, 7, 3, 2, 0, 0)
        # 随机UA
        self.user_agent = UserAgent()

接下来就是操作流程每个请求接口设置,封装成方法,依次是:

  • 根据项目id获取所有场次和在售状态
    # 根据项目id获取所有场次和在售状态
    def get_sessions(self, useragent):
        headers = {
            'User-Agent': useragent,
            'Content-Type': 'application/json'
        }
        url = "https://m.piaoxingqiu.com/cyy_gatewayapi/show/pub/v3/show/" + self.show_id + "/sessions_dynamic_data"
        response = requests.get(url=url, headers=headers).json()
        if response["statusCode"] == 200:
            # 获取场次的id
            # print(response)
            return response["data"]["sessionVOs"]
        else:
            print("get_sessions异常:" + str(response))
        return None

  • 根据场次id获取座位信息,箭头后面表示函数返回数据类型
    def get_seat_plans(self, useragent) -> list:
        headers = {
            'User-Agent': useragent,
            'Content-Type': 'application/json'
        }
        url = "https://m.piaoxingqiu.com/cyy_gatewayapi/show/pub/v3/show/" + self.show_id + "/show_session/" + self.session_id + "/seat_plans_static_data"
        response = requests.get(url=url, headers=headers).json()
        if response["statusCode"] == 200:
            # 查看票价的id
            # print(response)
            return response["data"]["seatPlans"]
        else:
            raise Exception("get_seat_plans异常:" + str(response))
  • 获取座位余票
# 获取座位余票
    def get_seat_count(self, useragent) -> list:
        headers = {
            'User-Agent': useragent,
            'Content-Type': 'application/json'
        }
        url = "https://m.piaoxingqiu.com/cyy_gatewayapi/show/pub/v3/show/" + self.show_id + "/show_session/" + self.session_id + "/seat_plans_dynamic_data"
        response = requests.get(url=url, headers=headers).json()
        if response["statusCode"] == 200:
            return response["data"]["seatPlans"]
        else:
            raise Exception("get_seat_count异常:" + str(response))
  • 获取门票类型(快递送票EXPRESS,电子票E_TICKET,现场取票VENUE,电子票或现场取票VENUE_E)
# 获取门票类型(快递送票EXPRESS,电子票E_TICKET,现场取票VENUE,电子票或现场取票VENUE_E)
    def get_deliver_method(self, useragent, qty: int) -> str:
        headers = {
            'User-Agent': useragent,
            'Content-Type': 'application/json',
            'access-token': self.token
        }
        data = {
            "items": [
                {
                    "skus": [
                        {
                            "seatPlanId": self.seat_plan_id,  
                            "sessionId": self.session_id,  
                            "showId": self.show_id,  
                            "skuId": self.seat_plan_id,
                            "skuType": "SINGLE",
                            "ticketPrice": self.price, 
                            "qty": qty  #数量
                        }
                    ],
                    "spu": {
                        "id": self.show_id,
                        "spuType": "SINGLE"
                    }
                }
            ]
        }
        url = "https://m.piaoxingqiu.com/cyy_gatewayapi/trade/buyer/order/v3/pre_order"
        response = requests.post(url=url, headers=headers, json=data).json()
        if response["statusCode"] == 200:
            # 这里的print可以去掉
            # print(response["data"]["supportDeliveries"][0])
            return response["data"]["supportDeliveries"][0]["name"]
        else:
            raise Exception("获取门票类型异常:" + str(response))
  • 获取观演人信息
# 获取观演人信息
    # def get_audiences() -> list | None:
    def get_audiences(self, useragent):
        headers = {
            'User-Agent': useragent,
            'Content-Type': 'application/json',
            'access-token': self.token
        }
        url = "https://m.piaoxingqiu.com/cyy_gatewayapi/user/buyer/v3/user_audiences"
        response = requests.get(url=url, headers=headers).json()
        if response["statusCode"] == 200:
            # 可以获取观影人的信息查看是否正确
            return response["data"]
        else:
            print("get_audiences异常:" + str(response))
        return None

  • 提交订单需要做的判断比较多,要判断演出票是什么票型(快递送票EXPRESS,电子票E_TICKET/ID_Card,现场取票VENUE,电子票或现场取票VENUE_E)
# 提交订单(快递送票EXPRESS,电子票E_TICKET/ID_Card,现场取票VENUE,电子票或现场取票VENUE_E)
    def create_order(self, useragent, qty: int, express_fee: int,
                     receiver, cellphone, address_id, detail_address, location_city_id, audience_ids: list):
        headers = {
            'User-Agent': useragent,
            'Content-Type': 'application/json',
            'access-token': self.token
        }
        if self.deliver_method == "EXPRESS":
            data = {
                "priceItemParam": [
                    {
                        "applyTickets": [],
                        "priceItemName": "票款总额",
                        "priceItemVal": self.price * qty,
                        "priceItemType": "TICKET_FEE",
                        "priceItemSpecies": "SEAT_PLAN",
                        "direction": "INCREASE",
                        "priceDisplay": "¥" + str(self.price * qty)
                    },
                    {
                        "applyTickets": [],
                        "priceItemName": "快递费",
                        "priceItemVal": express_fee,
                        "priceItemId": self.show_id,
                        "priceItemSpecies": "SEAT_PLAN",
                        "priceItemType": "EXPRESS_FEE",
                        "direction": "INCREASE",
                        "priceDisplay": "¥" + str(express_fee)
                    }
                ],
                "items": [
                    {
                        "skus": [
                            {
                                "seatPlanId": self.seat_plan_id,
                                "sessionId": self.session_id,
                                "showId": self.show_id,
                                "skuId": self.seat_plan_id,
                                "skuType": "SINGLE",
                                "ticketPrice": self.price,
                                "qty": qty,
                                "deliverMethod": self.deliver_method
                            }
                        ],
                        "spu": {
                            "id": self.show_id,
                            "spuType": "SINGLE"
                        }
                    }
                ],
                "contactParam": {
                    "receiver": receiver,  # 张三
                    "cellphone": cellphone  # 13812345678
                },

                "one2oneAudiences": [{"audienceId": i, "sessionId": self.session_id} for i in audience_ids],
                "addressParam": {
                    "address": detail_address,  # 星巴克咖啡门口
                    "district": location_city_id[4:],
                    "city": location_city_id[2:4],
                    "province": location_city_id[0:2],
                    "addressId": address_id
                }
            }
        elif self.deliver_method == "ID_CARD":
            data = {
                "priceItemParam": [
                    {
                        "applyTickets": [],
                        "priceItemName": "票款总额",
                        "priceItemVal": self.price * qty,
                        "priceItemType": "TICKET_FEE",
                        "priceItemSpecies": "SEAT_PLAN",
                        "direction": "INCREASE",
                        "priceDisplay": "¥" + str(self.price * qty),
                    }
                ],
                "items": [
                    {
                        "skus": [
                            {
                                "seatPlanId": self.seat_plan_id,
                                "sessionId": self.session_id,
                                "showId": self.show_id,
                                "skuId": self.seat_plan_id,
                                "skuType": "SINGLE",
                                "ticketPrice": self.price,
                                "qty": qty,
                                "deliverMethod": self.deliver_method,
                            }
                        ],
                        "spu": {"id": self.show_id, "spuType": "SINGLE"},
                    }
                ],
                "one2oneAudiences": [
                    {"audienceId": i, "sessionId": self.session_id} for i in audience_ids
                ],
                "many2OneAudience": {
                    "audienceId": audience_ids[0],
                    "sessionIds": [self.session_id],
                },
            }
        # 电子票已解决
        elif self.deliver_method == "E_TICKET":
            data = {
                "priceItemParam": [
                    {
                        "applyTickets": [],
                        "priceItemName": "票款总额",
                        "priceItemVal": self.price * qty,
                        "priceItemType": "TICKET_FEE",
                        "priceItemSpecies": "SEAT_PLAN",
                        "direction": "INCREASE",
                        "priceDisplay": "¥" + str(self.price * qty)
                    }
                ],
                "items": [
                    {
                        "skus": [
                            {
                                "seatPlanId": self.seat_plan_id,
                                "sessionId": self.session_id,
                                "showId": self.show_id,
                                "skuId": self.seat_plan_id,
                                "skuType": "SINGLE",
                                "ticketPrice": self.price,
                                "qty": qty,
                                "deliverMethod": self.deliver_method
                            }
                        ],
                        "spu": {
                            "id": self.show_id,
                            "spuType": "SINGLE"
                        }
                    }
                ],
                "many2OneAudience": {
                    "audienceId": audience_ids[0],
                    "sessionIds": [
                        self.session_id
                    ]
                }
            }
        elif self.deliver_method == "VENUE":
            data = {
                "priceItemParam": [
                    {
                        "applyTickets": [],
                        "priceItemName": "票款总额",
                        "priceItemVal": self.price * qty,
                        "priceItemType": "TICKET_FEE",
                        "priceItemSpecies": "SEAT_PLAN",
                        "direction": "INCREASE",
                        "priceDisplay": "¥" + str(self.price * qty)
                    }
                ],
                "items": [
                    {
                        "skus": [
                            {
                                "seatPlanId": self.seat_plan_id,
                                "sessionId": self.session_id,
                                "showId": self.show_id,
                                "skuId": self.seat_plan_id,
                                "skuType": "SINGLE",
                                "ticketPrice": self.price,
                                "qty": qty,
                                "deliverMethod": self.deliver_method
                            }
                        ],
                        "spu": {
                            "id": self.show_id,
                            "spuType": "SINGLE"
                        }
                    }
                ],
                "one2oneAudiences": [{"audienceId": i, "sessionId": self.session_id} for i in audience_ids]
            }
        elif self.deliver_method == "VENUE_E":
            data = {
                "priceItemParam": [
                    {
                        "applyTickets": [],
                        "priceItemName": "票款总额",
                        "priceItemVal": self.price * qty,
                        "priceItemType": "TICKET_FEE",
                        "priceItemSpecies": "SEAT_PLAN",
                        "direction": "INCREASE",
                        "priceDisplay": "¥" + str(self.price * qty)
                    }
                ],
                "items": [
                    {
                        "skus": [
                            {
                                "seatPlanId": self.seat_plan_id,
                                "sessionId": self.session_id,
                                "showId": self.show_id,
                                "skuId": self.seat_plan_id,
                                "skuType": "SINGLE",
                                "ticketPrice": self.price,
                                "qty": qty,
                                "deliverMethod": self.deliver_method
                            }
                        ],
                        "spu": {
                            "id": self.show_id,
                            "spuType": "SINGLE"
                        }
                    }
                ]
            }
        else:
            raise Exception("不支持的deliver_method:" + str(self.deliver_method))

        url = "https://m.piaoxingqiu.com/cyy_gatewayapi/trade/buyer/order/v3/create_order"
        response = requests.post(url=url, headers=headers, json=data).json()
        if response["statusCode"] == 200:
            print("下单成功!请尽快支付!")
        else:
            raise Exception("下单异常:" + str(response))
  • 主函数,前面都是函数定义,主要是在主函数执行操作抢票。
  • 我们前面已经知道哪个演唱会show_id,以及哪个场次session_id,现在要在主函数里面的seat_plan_id进行自行赋值,也可以开头初始化方法里面赋值/默认为空,seat_plan_id可以通过前面get_seat_plans函数里的接口地址获取。
# 主函数
    def main(self):
        # 创建随机的useragent
        useragent = self.user_agent.random
        while True:
            try:
                # 获取座位余票信息,默认从最低价开始
                seat_plans = self.get_seat_plans(
上一篇:Rust 组织管理


下一篇:如何给gitlab其他访问者创建账号并增加权限