lovelywholesale selenium

技术栈

  • selenium,chrome-driver
  • peewee
  • xlwings

入口: justfashionnow.py


# justfashionnow.py
# Entry script: imports the CrawlerMain module from the lovelywholesale
# package and runs its CrawlerMain() function.

# Disabled xlwings experiment, kept for reference:
# app=xw.App(visible=True,add_book=False)
# # do not show Excel alert dialogs
# app.display_alerts=False
# # turn off screen updating, which can speed up macro execution
# app.screen_updating=False
# wb=app.books.open('11133.xlsx')


# app.quit()
from lovelywholesale import CrawlerMain

CrawlerMain.CrawlerMain()

CrawlerMain.py

# CrawlerMain.py

from typing import List

from xlwings import Book, App

from db import db_init
# Runs at import time, ahead of the remaining imports below.
# NOTE(review): presumably this binds the database used by db.db_entity —
# confirm whether it really has to run before that module is imported.
db_init.init_database()

from pathlib import Path

import xlwings as xw

from db.db_entity import add_product_lovelywholesale, count_product_by_id_lovelywholesale
from excel.ExcelXlwingsUtil import add_center
from log_home.my_log import logger
from lovelywholesale.CatRespDto import CatRespDto, BroList
from lovelywholesale.CrawlerAsRestful import req_cat_ls, req_product
from lovelywholesale.ProductRespDto import ProductRespDto
from util.download_image_util import download_file
from util.url_util import urlToFileName

# 65 is ord('A'); chr(colA + n) yields the letter of the n-th spreadsheet column.
colA:int= 65
# Row number of the header line (despite the name, it is used as the row part of cell addresses).
headerColIndex=1

# Workbook file that the crawler opens and saves.
fn='lovelywholesale.xlsx'
def CrawlerMain():
    """Crawl every sibling category and write its products into the workbook.

    Starts a hidden Excel instance, writes the header row, then for each
    category returned by ``req_cat_ls`` requests every product page and hands
    it to ``process1Page``. The workbook is re-opened, saved and closed once
    per page so that progress is persisted page by page.

    A failure while processing a single page is logged and the crawl goes on
    with the next page. Any other exception propagates to the caller, but the
    open workbook and the Excel instance are always released first.
    """
    app: App = xw.App(visible=False, add_book=False)
    try:
        sheetAddHeader(app)

        # Row counter handed to process1Page, which increments it before
        # writing each product row.
        rowZ: int = headerColIndex + 1

        catRespDto: CatRespDto = req_cat_ls()
        bro_cat_id_list: List[BroList] = catRespDto.data.ur_here.list[0].bro_list

        for cat_id_dto in bro_cat_id_list:
            # Page 1 is requested up front only to learn the total page count.
            productRespDtoPage1: ProductRespDto = req_product(cat_id_dto.cat_id, 1)
            # NOTE(review): the loop starts at page 0 although the probe above
            # asks for page 1 — confirm whether the API is 0- or 1-based.
            for pageI in range(productRespDtoPage1.data.total_page + 1):
                wb: Book = app.books.open(fn)
                try:
                    sheet1 = wb.sheets["sheet1"]

                    productRespDto: ProductRespDto = req_product(cat_id_dto.cat_id, pageI)
                    logger.info(f"pageI:{pageI},{productRespDto}")
                    try:
                        rowZ = process1Page(sheet1, productRespDto, rowZ, pageI)
                    except Exception as e:
                        # One bad page must not abort the whole crawl.
                        logger.error("错误", exc_info=e)

                    wb.save()
                finally:
                    # Close the workbook even when the request or save failed,
                    # so the next page can re-open the file.
                    wb.close()
    finally:
        # Never leave a hidden Excel process behind.
        app.quit()


def sheetAddHeader(app:App):
    """Write the column titles into the header row of "sheet1" and save.

    Opens the workbook ``fn`` through the given Excel ``app``, fills the cells
    of row ``headerColIndex`` starting at column A with the product field
    names, saves the workbook and closes it. The workbook is closed even when
    writing or saving raises.

    :param app: running xlwings App used to open the workbook
    """
    # Order defines the column layout: the n-th title goes to column chr(colA + n).
    header_titles = (
        "goods_id",
        "goods_sn",
        "name",
        "image",
        "usd_promote_price",
        "usd_shop_price",
        "usd_market_price",
        "cat_id",
        "category_name",
    )

    wb: Book = app.books.open(fn)
    try:
        sheet1 = wb.sheets["sheet1"]
        for offset, title in enumerate(header_titles):
            sheet1.range(f'{chr(colA + offset)}{headerColIndex}').value = title
        wb.save()
    finally:
        wb.close()

def process1Page(sheet1,productRespDto:ProductRespDto,rowZ:int,pageI:int):
    """Store one page of products in the database and in the worksheet.

    For every entry of ``productRespDto.data.search_list`` with a non-zero
    ``goods_id`` the row counter is advanced. Products already counted in the
    database are skipped; the others are inserted, written into row ``rowZ``
    (columns A..I) and, when the thumbnail file is available locally, the
    picture is placed in column J of the same row.

    :param sheet1: xlwings sheet receiving the rows
    :param productRespDto: one page of the product listing
    :param rowZ: row counter before this page
    :param pageI: page number (not used here; kept for the caller's signature)
    :return: the row counter after this page
    """
    for prodK in productRespDto.data.search_list:
        if prodK.goods_id == 0:
            continue
        goods_id = prodK.goods_id
        # The counter advances before the duplicate check, so a skipped
        # product still occupies a row number.
        rowZ += 1
        image1 = prodK.m_aws_goods_thumb
        name = prodK.goods_name
        if count_product_by_id_lovelywholesale(id=goods_id) > 0:
            logger.info(f"{goods_id}已添加,跳过")
            continue

        logger.info(f"prodK:{prodK},id:{goods_id}")

        add_product_lovelywholesale(id=goods_id, id2=prodK.goods_sn, name1=name, image1url=image1, price1=prodK.usd_promote_price,
                                    price2=prodK.usd_shop_price, price3=prodK.usd_market_price)

        # Same column order as the header row written by sheetAddHeader.
        row_values = (
            prodK.goods_id,
            prodK.goods_sn,
            name,
            image1,
            prodK.usd_promote_price,
            prodK.usd_shop_price,
            prodK.usd_market_price,
            prodK.cat_id,
            prodK.category_name,
        )
        for offset, cell_value in enumerate(row_values):
            sheet1.range(f'{chr(colA + offset)}{rowZ}').value = cell_value

        imageFilePath = Path.cwd().joinpath("work_home").joinpath(urlToFileName(image1))
        if not imageFilePath.exists():
            try:
                download_file(image1)
            except Exception as e:
                # Best effort: a missing thumbnail must not lose the data row,
                # but the failure should be visible in the log.
                logger.warning(f"图片下载失败,{image1}", exc_info=e)

        # Only embed the picture when the file is really there.
        if imageFilePath.exists():
            add_center(sheet1, f"{chr(colA + 9)}{rowZ}", imageFilePath, width=40, height=40)

    return rowZ

ExcelXlwingsUtil.py

#ExcelXlwingsUtil.py
import os

from PIL import Image


def add_center(sht, target, filePath, match=False, width=None, height=None, column_width=None, row_height=None):
  '''Insert a picture centred on a cell, sizing the picture and the cell.

  Priority of the sizing options: match > width & height > column_width & row_height.
  Using column_width and/or row_height (the maximum cell size) is recommended;
  the picture is then scaled proportionally to fit inside that size.

  :param sht: worksheet
  :param target: target cell address as a string, e.g. 'A1'
  :param filePath: path of the picture file
  :param match: use the original picture width and height unchanged
  :param width: picture width; if only one of width/height is given the other is scaled proportionally
  :param height: picture height
  :param column_width: maximum cell width, 100 when neither maximum is given
  :param row_height: maximum cell height, 75 when neither maximum is given
  :raises AssertionError: if the resulting width / 6.107 is outside 0..255 or the height is outside 0..409.5
  '''
  unit_width = 6.107 # ratio between picture width units and Excel column-width units
  rng = sht.range(target) # target cell
  name = os.path.basename(filePath) # file name, used as the picture name
  with Image.open(filePath) as img: # close the file handle as soon as the size is known
    _width, _height = img.size # original picture size
  NOT_SET = True # cell size has not been set yet
  if match: # keep the original picture size
    width, height = _width, _height
  else:
    if width or height:
      if not height: # only width given: scale the height proportionally
        height = width / _width * _height
      if not width: # only height given: scale the width proportionally
        width = height / _height * _width
    else:
      if column_width and row_height: # both cell maxima given
        width = row_height / _height * _width # width if the height filled the cell
        height = column_width / _width * _height # height if the width filled the cell
        area_width = column_width * height # area when the width is the limiting side
        area_height = row_height * width # area when the height is the limiting side
        # Keep the smaller candidate so the picture fits inside both maxima
        # (same rule as the default-size branch below).
        if area_width > area_height:
          height = row_height
        else:
          width = column_width
      elif not column_width and not row_height: # no maxima given: use the defaults
        column_width = 100
        row_height = 75
        rng.column_width = column_width / unit_width # resize the cell to the default width
        rng.row_height = row_height # resize the cell to the default height
        NOT_SET = False
        width = row_height / _height * _width # width if the height filled the cell
        height = column_width / _width * _height # height if the width filled the cell
        area_width = column_width * height # area when the width is the limiting side
        area_height = row_height * width # area when the height is the limiting side
        if area_width > area_height:
          height = row_height
        else:
          width = column_width
      else: # exactly one maximum given: it fixes one side, the other is scaled
        width = row_height / _height * _width if row_height else column_width
        height = column_width / _width * _height if column_width else row_height
  assert 0 <= width / unit_width <= 255
  assert 0 <= height <= 409.5
  if NOT_SET:
    rng.column_width = width / unit_width # resize the cell to the picture width
    rng.row_height = height # resize the cell to the picture height
  left = rng.left + (rng.width - width) / 2 # centre horizontally
  top = rng.top + (rng.height - height) / 2 # centre vertically
  try:
    sht.pictures.add(filePath, left=left, top=top, width=width, height=height, scale=None, name=name)
  except Exception: # best effort, e.g. a picture with the same name already exists; the failure is ignored
    pass

db_entity.py

#db_entity.py

import traceback

from const import Site
from log_home import my_log
from peewee import *
import datetime
# see https://github.com/coleifer/peewee
# see https://stackoverflow.com/questions/44984429/how-to-manage-a-peewee-database-in-a-separate-module

# Placeholder database object; the model below is declared against it.
# NOTE(review): presumably bound to a real database by db.db_init.init_database() — confirm.
db_proxy = Proxy()

# from common import get_db_home
# db = SqliteDatabase(f'{get_db_home()}/kuaishou.db')



class Product(Model):
    """Product row stored per site.

    Rows are identified by the composite primary key (site, id). All other
    columns are nullable.
    """
    class Meta:
        database = db_proxy
        primary_key = CompositeKey('site','id' )

    # Key columns: originating site and the product id on that site.
    site = CharField(null=False)
    id = IntegerField(null=False)

    # Secondary identifier and names.
    id2 = CharField(null=True)
    name1 = CharField(null=True)
    name2 = CharField(null=True)

    # Category, kept as integer id, string id and name.
    categoryId=IntegerField(null=True)
    categoryIdStr=CharField(null=True)
    categoryName=CharField(null=True)

    # Product image URLs
    image1url = CharField(null=True)
    image2url = CharField(null=True)
    image3url = CharField(null=True)
    image4url = CharField(null=True)
    # Product prices
    price1 = FloatField(null=True)
    price2 = FloatField(null=True)
    price3 = FloatField(null=True)
    price4 = FloatField(null=True)
    price5 = FloatField(null=True)

    # Row timestamps; both default to the time the row object is created.
    # Nothing in this class refreshes updated_date on later writes.
    created_date = DateTimeField(default=datetime.datetime.now)
    updated_date = DateTimeField(default=datetime.datetime.now)


    def __str__(self):
        # Delegates unchanged to the base Model representation.
        return super().__str__()

def add_product_lovelywholesale(id, id2=None, cat_id:int=None,category_name:str=None, name1=None, name2=None, image1url=None, image2url=None, image3url=None, image4url=None, price1=None, price2=None, price3=None, price4=None, price5=None):
    """Insert a product for the lovelywholesale site.

    Thin wrapper that forwards every argument unchanged to ``add_product``
    with ``site`` fixed to ``Site.lovelywholesale``.
    """
    product_fields = dict(
        id=id,
        id2=id2,
        cat_id=cat_id,
        category_name=category_name,
        name1=name1,
        name2=name2,
        image1url=image1url,
        image2url=image2url,
        image3url=image3url,
        image4url=image4url,
        price1=price1,
        price2=price2,
        price3=price3,
        price4=price4,
        price5=price5,
    )
    add_product(site=Site.lovelywholesale, **product_fields)

def add_product(site, id, id2=None, cat_id:int=None,category_name:str=None,name1=None, name2=None, image1url=None, image2url=None, image3url=None, image4url=None, price1=None, price2=None, price3=None, price4=None, price5=None):
    """Insert one Product row inside a transaction.

    Integrity errors are handled here and never propagate: a violation of the
    unique (site, id) key is logged as "already exists", any other integrity
    error is logged together with its traceback.

    :param site: site the product belongs to (first part of the primary key)
    :param id: product id on that site (second part of the primary key)
    :param cat_id: category id, stored both as integer and as string
    :param category_name: category name
    The remaining parameters are stored in the Product columns of the same name.
    """
    # Keep the string form NULL when there is no category id; formatting None
    # directly would store the literal text "None".
    cat_id_str = None if cat_id is None else str(cat_id)
    try:
        with db_proxy.atomic():
            # create() already performs the INSERT, so no extra save() is needed.
            v = Product.create(
                site=site, id=id, id2=id2,
                categoryId=cat_id, categoryIdStr=cat_id_str, categoryName=category_name,
                name1=name1, name2=name2,
                image1url=image1url, image2url=image2url, image3url=image3url, image4url=image4url,
                price1=price1, price2=price2, price3=price3, price4=price4, price5=price5,
            )
        my_log.logger.info(f"插入商品,{v.id}")
    except IntegrityError as e:
        if "UNIQUE constraint failed" in str(e):
            my_log.logger.info(f"该商品已经存在,{id}")
        else:
            # Unexpected integrity problem: keep the traceback in the log.
            my_log.logger.exception("其他异常")


def count_product_by_id_lovelywholesale(id):
    """Return how many stored lovelywholesale products have the given id.

    :param id: product id on the lovelywholesale site
    :return: number of matching rows as an int (0 when the product is unknown)
    """
    # Let the database count the rows instead of reading the `count` attribute
    # of an executed cursor.
    # NOTE(review): in peewee 3 that attribute only reflects rows already
    # iterated, so it is 0 right after execute() — confirm the peewee version in use.
    return (
        Product.select()
        .where(Product.site == Site.lovelywholesale, Product.id == id)
        .count()
    )

def select_product_by_id(site, id):
    """Run a SELECT for the Product rows matching ``site`` and ``id``.

    The generated SQL is logged at debug level and the query is executed
    inside a transaction.

    :return: whatever ``execute()`` returns for the select query
    """
    with db_proxy.atomic():
        query = Product.select().where(Product.site == site, Product.id == id)
        my_log.logger.debug(f"sql {query.sql()}")
        rows = query.execute()
    return rows


上一篇:mysql新建数据库语句,内含福利


下一篇:在一个窗体中调用另一个窗体的控件或方法(C#)