FTP File Transfer Service

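Last night, on a whim, I tried writing an FTP file transfer service in Python. It accepts commands and syncs a specified directory from a remote FTP server. I ended up not using it, so I've open-sourced it.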

https://github.com/jadepeng/ftp_transfer_service.git

How it works

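  • 'task_server' is a web server that accepts incoming tasks; each task it receives is written to MySQL
  • Once a task is started, 'task_server' scans the FTP file list and pushes the files onto a Redis queue
  • transfer_client is the transfer worker and can be deployed on multiple machines; it reads from the Redis queue and downloads the files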
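The scanning step in the second bullet can be sketched with the standard library's ftplib. This is a minimal sketch, not the project's actual code: it assumes the FTP server supports the MLSD command (used via `ftplib.FTP.mlsd`), and `walk_ftp`, `enqueue_tree` and `ListQueue` are hypothetical names. Anything with a `put()` method, such as the RedisQueue class shown later, can receive the paths.

```python
import posixpath
from typing import Iterator


def walk_ftp(ftp, path: str) -> Iterator[str]:
    """Recursively yield the full path of every regular file under `path`.

    Assumes the server supports MLSD (ftplib.FTP.mlsd); servers that only
    support LIST would need a different listing parser.
    """
    for name, facts in ftp.mlsd(path, facts=["type"]):
        kind = facts.get("type")
        full = posixpath.join(path, name)
        if kind == "dir":
            yield from walk_ftp(ftp, full)
        elif kind == "file":
            yield full
        # "cdir" / "pdir" entries (current and parent directory) are skipped


def enqueue_tree(ftp, server_path: str, queue) -> int:
    """Push every file path under server_path onto `queue` (anything with put())."""
    count = 0
    for file_path in walk_ftp(ftp, server_path):
        queue.put(file_path)
        count += 1
    return count


class ListQueue:
    """Hypothetical in-memory stand-in for the Redis queue, for local testing."""

    def __init__(self):
        self.items = []

    def put(self, item):
        self.items.append(item)
```

Against a real server you would open the connection with `ftplib.FTP()`, call `connect(host, port)` and `login(user, password)` using the values from the task JSON, then pass that object to `enqueue_tree` together with the task's `serverPath`.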

Usage

Configuration

Edit the .env file to set the MySQL and Redis connection settings:

REDIS_SERVER=""
REDIS_PORT=6380
REDIS_PASSWORD=""
MYSQL_HOST=""
MYSQL_PORT=3306
MYSQL_PASSWORD=""
MYSQL_USER=""
MYSQL_DB=""

Starting the services

Server side

python3 task_server.py

Transfer side (multiple instances can be deployed)

python3 transfer_client.py

Submitting a task

POST /task/

{
  "taskId": "9",
  "serverPath": "/weblog",
  "storagePath": "/data",
  "host": "ftpServer",
  "port": 21,
  "user": "user",
  "password": "password"
}
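In a FastAPI app this body would typically be declared as a pydantic model. To show the fields and their types without any third-party dependency, here is a stdlib sketch using a dataclass; `TaskRequest` and `parse_task` are hypothetical names, and the checks (non-empty strings, port between 1 and 65535) are my assumptions rather than rules taken from the project.

```python
import json
from dataclasses import dataclass


@dataclass
class TaskRequest:
    taskId: str
    serverPath: str   # directory on the FTP server to sync
    storagePath: str  # local directory the files are written to
    host: str
    port: int
    user: str
    password: str


def parse_task(body: str) -> TaskRequest:
    """Parse and minimally validate the JSON body of POST /task/."""
    data = json.loads(body)
    task = TaskRequest(**data)  # raises TypeError on missing or unexpected keys
    if not isinstance(task.port, int) or not 0 < task.port < 65536:
        raise ValueError(f"invalid port: {task.port!r}")
    for name in ("taskId", "serverPath", "storagePath", "host", "user"):
        value = getattr(task, name)
        if not isinstance(value, str) or not value:
            raise ValueError(f"{name} must be a non-empty string")
    return task
```

Note that `taskId` is a string in the example body, which matches the `String(255)` primary key in the ORM model further down.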

Starting the transfer

GET /task/{taskId}/start

Checking progress

GET /task/{taskId}/progress
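A client for these three endpoints needs nothing beyond urllib. This sketch only builds and sends the requests; `BASE_URL` is an assumption (uvicorn's default `http://127.0.0.1:8000`, assuming task_server is served by uvicorn with default settings), since the post doesn't say where task_server listens. The response format of the progress endpoint is not documented here, so `call` simply decodes whatever JSON comes back. All function names are hypothetical.

```python
import json
import urllib.request

BASE_URL = "http://127.0.0.1:8000"  # assumption: uvicorn defaults; adjust to your deployment


def submit_task_request(task: dict, base_url: str = BASE_URL) -> urllib.request.Request:
    """POST /task/ with the task JSON as the request body."""
    return urllib.request.Request(
        f"{base_url}/task/",
        data=json.dumps(task).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def start_task_request(task_id: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """GET /task/{taskId}/start"""
    return urllib.request.Request(f"{base_url}/task/{task_id}/start", method="GET")


def progress_request(task_id: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """GET /task/{taskId}/progress"""
    return urllib.request.Request(f"{base_url}/task/{task_id}/progress", method="GET")


def call(req: urllib.request.Request, timeout: float = 10.0):
    """Send the request and decode the JSON response body."""
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

A typical session would be `call(submit_task_request(task))`, then `call(start_task_request("9"))`, then polling `call(progress_request("9"))` until the transfer finishes.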

Implementation notes

This was my first time writing a web service with FastAPI, so here are a few things I found interesting.

Configuration

App configuration parameters can be defined in a settings class, and pydantic can also load a .env file to update them.

setting.py

from pydantic import BaseSettings  # pydantic v1; in pydantic v2, use: from pydantic_settings import BaseSettings


class APISettings(BaseSettings):
    mysql_host: str = "127.0.0.1"
    mysql_port: int = 3306
    mysql_password: str
    mysql_user: str
    mysql_db: str
    redis_server: str = "127.0.0.1"
    redis_port: int = 6380
    redis_password: str

    max_wait_time_count: int = 10

    class Config:
        env_file = ".env"
        env_file_encoding = 'utf-8'
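To make this behaviour concrete without installing pydantic, here is a rough stdlib approximation of what (pydantic v1) `BaseSettings` does with the class above: read `KEY=value` lines from `.env`, strip surrounding quotes, let real environment variables take precedence over the file, match names case-insensitively (so `MYSQL_HOST` fills `mysql_host`), and coerce values to the annotated type. It is a simplified sketch; `Settings`, `read_env_file` and `load_settings` are hypothetical names, and it ignores much of what pydantic really handles (escaping rules, complex types, detailed validation errors).

```python
import os
from dataclasses import dataclass, fields


@dataclass
class Settings:
    # Same fields as APISettings; fields without a default are required.
    mysql_password: str
    mysql_user: str
    mysql_db: str
    redis_password: str
    mysql_host: str = "127.0.0.1"
    mysql_port: int = 3306
    redis_server: str = "127.0.0.1"
    redis_port: int = 6380
    max_wait_time_count: int = 10


def read_env_file(path: str) -> dict:
    """Parse KEY=value lines, skipping blanks/comments and stripping surrounding quotes."""
    values = {}
    if not os.path.exists(path):
        return values
    with open(path, encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            value = value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "'\"":
                value = value[1:-1]
            values[key.strip().lower()] = value
    return values


def load_settings(env_file: str = ".env", environ=None) -> Settings:
    environ = os.environ if environ is None else environ
    file_values = read_env_file(env_file)
    env_values = {k.lower(): v for k, v in environ.items()}
    kwargs = {}
    for f in fields(Settings):
        # Real environment variables win over values from the .env file.
        raw = env_values.get(f.name, file_values.get(f.name))
        if raw is not None:
            kwargs[f.name] = f.type(raw)  # naive coercion: str(raw) or int(raw)
    return Settings(**kwargs)  # TypeError if a required field is missing
```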

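Redis queue

The queue is built on a Redis list: rpush appends to the tail, and blpop pops from the head, blocking while the list is empty.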

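import redis


class RedisQueue(object):

    def __init__(self, name, namespace='queue', **redis_kwargs):
        self.__db = redis.Redis(**redis_kwargs)
        self.key = '%s:%s' % (namespace, name)

    def qsize(self):
        return self.__db.llen(self.key)  # number of elements in the list

    def put(self, item):
        self.__db.rpush(self.key, item)  # append the new element at the right end (tail)

    def get_wait(self, timeout=None):
        # Blocks until an element is available (timeout=None blocks indefinitely);
        # blpop returns a (key, value) tuple, or None if the timeout expires
        item = self.__db.blpop(self.key, timeout=timeout)
        return item

    def get_nowait(self):
        # Non-blocking: returns the leftmost element, or None if the list is empty
        item = self.__db.lpop(self.key)
        return item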
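Two details of this class are easy to miss: `get_wait` returns blpop's `(key, value)` tuple (or `None` on timeout) rather than the bare item, and redis-py returns `bytes` unless the client is created with `decode_responses=True`. The in-memory stand-in below mimics the list semantics with a deque and a condition variable, which can be handy for unit-testing producer and worker logic without a Redis server. `InMemoryRedisQueue` is a hypothetical test double, not part of the project; unlike Redis, it keeps keys and values as plain Python objects.

```python
import threading
import time
from collections import deque


class InMemoryRedisQueue:
    """Test double mimicking RedisQueue's rpush/blpop/lpop behaviour in one process."""

    def __init__(self, name, namespace="queue"):
        self.key = "%s:%s" % (namespace, name)
        self._items = deque()
        self._cond = threading.Condition()

    def qsize(self):
        with self._cond:
            return len(self._items)

    def put(self, item):
        with self._cond:
            self._items.append(item)  # like RPUSH: append at the right end
            self._cond.notify()       # wake one blocked get_wait()

    def get_wait(self, timeout=None):
        """Like BLPOP: return (key, item), or None once the timeout expires.

        timeout=None or 0 blocks forever, as BLPOP does with timeout 0.
        """
        deadline = None if not timeout else time.monotonic() + timeout
        with self._cond:
            while not self._items:
                if deadline is None:
                    self._cond.wait()
                else:
                    remaining = deadline - time.monotonic()
                    if remaining <= 0:
                        return None
                    self._cond.wait(remaining)
            return (self.key, self._items.popleft())

    def get_nowait(self):
        """Like LPOP: return the leftmost item, or None if the queue is empty."""
        with self._cond:
            return self._items.popleft() if self._items else None
```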

Redis BloomFilter

A Bloom filter can be used for deduplication.

import mmh3
import redis


class BloomFilter(object):
    def __init__(self, bf_key, bit_size=2000000, hash_count=4, start_seed=41, **redis_kwargs):
        self.bit_size = bit_size
        self.hash_count = hash_count
        self.start_seed = start_seed
        self.client = redis.Redis(**redis_kwargs)
        self.bf_key = bf_key

    def add(self, data):
        bit_points = self._get_hash_points(data)
        for index in bit_points:
            self.client.setbit(self.bf_key, index, 1)

    def madd(self, m_data):
        if isinstance(m_data, list):
            for data in m_data:
                self.add(data)
        else:
            self.add(m_data)

    def exists(self, data):
        bit_points = self._get_hash_points(data)
        result = [
            self.client.getbit(self.bf_key, index) for index in bit_points
        ]
        return all(result)

    def mexists(self, m_data):
        result = {}
        if isinstance(m_data, list):
            for data in m_data:
                result[data] = self.exists(data)
        else:
            result[m_data] = self.exists(m_data)  # call exists(); the original indexed it with [] by mistake
        return result

    def _get_hash_points(self, data):
        return [
            mmh3.hash(data, index) % self.bit_size
            for index in range(self.start_seed, self.start_seed +
                               self.hash_count)
        ]

SQLAlchemy, Python's ORM framework

With SQLAlchemy, you first define the ORM classes:

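from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4+; older: sqlalchemy.ext.declarative

from setting import APISettings  # the settings class defined in setting.py

Base = declarative_base()


class TransferTask(Base):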
    __tablename__ = 'transfer_task'

    taskId = Column(String(255), primary_key=True, index=True)
    serverPath = Column(String(255), nullable=False)
    storagePath = Column(String(255), nullable=False)
    host = Column(String(255), nullable=False)
    port = Column(Integer, nullable=False)
    user = Column(String(255), nullable=False)
    password = Column(String(255), nullable=False)
    time = Column(DateTime, nullable=False, default=datetime.now)

class TransferFailedFile(Base):
    __tablename__ = 'transfer_failed_file'
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    taskId = Column(String(255), index=True)
    filePath = Column(String(1024), nullable=False)
    time = Column(DateTime, nullable=False, default=datetime.now)

class TransferProgress(Base):
    __tablename__ = 'transfer_task_progress'

    taskId = Column(String(255), primary_key=True, index=True)
    total = Column(Integer, nullable=False)
    status = Column(Integer, nullable=False)
    finished = Column(Integer, nullable=False)
    failed = Column(Integer, nullable=False)
    time = Column(DateTime, nullable=False, default=datetime.now)

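if __name__ == '__main__':
    settings = APISettings()
    # Database is the project's own wrapper around a SQLAlchemy engine (not shown in this post)
    db = Database(settings.mysql_host, settings.mysql_port, settings.mysql_user, settings.mysql_password,
                  settings.mysql_db)
    Base.metadata.create_all(db.engine)  # create any tables that do not exist yet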
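To see how the progress table can back `GET /task/{taskId}/progress`, here is a sketch that uses the standard library's sqlite3 in place of MySQL and SQLAlchemy, with the same columns as `transfer_task_progress`. The post does not say what the `status` values mean, so the sketch treats it as an opaque integer; the percentage formula (counting both finished and failed files as processed) and all function names are my assumptions.

```python
import sqlite3
from datetime import datetime

SCHEMA = """
CREATE TABLE IF NOT EXISTS transfer_task_progress (
    taskId   VARCHAR(255) PRIMARY KEY,
    total    INTEGER NOT NULL,
    status   INTEGER NOT NULL,
    finished INTEGER NOT NULL,
    failed   INTEGER NOT NULL,
    time     TIMESTAMP NOT NULL
)
"""


def init_db(conn):
    conn.execute(SCHEMA)


def create_progress(conn, task_id, total, status):
    """Insert the initial progress row, e.g. once the FTP file list has been scanned."""
    conn.execute(
        "INSERT INTO transfer_task_progress (taskId, total, status, finished, failed, time) "
        "VALUES (?, ?, ?, 0, 0, ?)",
        (task_id, total, status, datetime.now().isoformat(sep=" ")),
    )


def record_result(conn, task_id, ok):
    """Increment finished or failed for one file, as a worker would after each download."""
    column = "finished" if ok else "failed"  # chosen from a fixed set, so safe to interpolate
    conn.execute(
        f"UPDATE transfer_task_progress SET {column} = {column} + 1 WHERE taskId = ?",
        (task_id,),
    )


def progress(conn, task_id):
    row = conn.execute(
        "SELECT total, status, finished, failed FROM transfer_task_progress WHERE taskId = ?",
        (task_id,),
    ).fetchone()
    if row is None:
        return None
    total, status, finished, failed = row
    percent = 100.0 * (finished + failed) / total if total else 0.0
    return {"total": total, "status": status, "finished": finished,
            "failed": failed, "percent": round(percent, 1)}
```

Incrementing with `col = col + 1` inside a single UPDATE, rather than reading the count into Python and writing it back, keeps concurrent transfer_client instances from overwriting each other's counts.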