创建容器
建立网桥
docker network create -d bridge --subnet=18.0.0.0/24 dev_bridage
启动redis
docker run -itd --name=redis-dev -p=6379:6379 --network=dev_bridage --ip=18.0.0.2 redis:6.0
启动mongo
docker run -itd --name=mongo-dev -p=27017:27017 --network=dev_bridage --ip=18.0.0.3 mongo:4.2.6-bionic
启动zookeeper
docker run -itd --name=zookeeper-dev -p=2181:2181 --network=dev_bridage --ip=18.0.0.4 zookeeper:3.6.3
启动kafka
docker run -itd --name=kafka-dev -p=9092:9092 --network=dev_bridage --ip=18.0.0.5 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=18.0.0.4:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:2.13-2.8.1
启动ES
docker run -itd --name=elasticsearch-dev -p 9200:9200 -p 9300:9300 --network=dev_bridage --ip=18.0.0.10 -e discovery.type=single-node -e bootstrap.memory_lock=false -e node.ml=false -e ES_JAVA_OPTS="-Xms256m -Xmx256m" elasticsearch:7.11.1
启动kibana
docker run -itd --name kibana -p 5601:5601 --network=dev_bridage --ip=18.0.0.11 -e ELASTICSEARCH_URL=http://18.0.0.10:9200 docker.elastic.co/kibana/kibana:7.11.1
注意启动kibana,需要修改配置
进入shell
编辑kibana.yal文件
修改为
初始化
kafka初始化,创建topic
from kafka.admin import KafkaAdminClient, NewTopic import logging import config from utils.log import log_config logger = logging.getLogger(__name__) def migrate(name: str) -> None: topics = { "bank-card": {"num_partitions": 12, "replication_factor": 1}, } bootstrap_servers = config.KAFKA['PRODUCERS'][name]['bootstrap_servers'] admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers, client_id=name) exists_topics = admin.list_topics() for name, conf in topics.items(): if name in exists_topics: logger.info("kafka index %s was already exists!" % name) continue admin.create_topics( new_topics=[NewTopic(name=name, **conf)], validate_only=False ) logger.info("kafka index %s was created!" % name) if __name__ == '__main__': log_config() migrate(name='anti_launder'
创建mongo集合
import logging import pymongo from utils.mongo import MongoManager logger = logging.getLogger(__name__) async def migrate(): bankcard_website = await MongoManager.create_collection( collection="collection-name", name="db-name" ) indexes = await bankcard_website.index_information() logger.info(f"indexes is: {indexes}") await bankcard_website.create_index([("update_time", pymongo.ASCENDING)]) await bankcard_website.create_index([("create_time", pymongo.ASCENDING)]) await bankcard_website.create_index( [("payment_domain", pymongo.ASCENDING), ("package_name", pymongo.ASCENDING)], unique=True ) await bankcard_website.create_index([("package_name", pymongo.ASCENDING)]) if __name__ == '__main__': migrate()
创建es index
import logging import asyncio from utils.es import ElasticSearchClient logger = logging.getLogger(__name__) async def migrate(name: str = "default") -> None: ilms = { 'index_policy': { 'body': { "policy": { "phases": { "hot": { "min_age": "0ms", "actions": { "rollover": { "max_age": "30d" } } }, "delete": { "min_age": "3650d", "actions": { "delete": {} } } } } } } } templates = { 'index_raw': { 'body': { "index_patterns": ["index-raw-*"], "mappings": { "_source": { "enabled": True, }, "dynamic": True, "numeric_detection": False, "date_detection": True, "dynamic_date_formats": [ "strict_date_optional_time", "yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z" ] }, "settings": { "index.lifecycle.name": "index_policy", "index.lifecycle.rollover_alias": "index_raw" }, "aliases": { "index_raw_all": {} } }, 'create': True, 'order': 101 }, } indices = { 'index-raw-000001': { 'body': { 'aliases': { 'index_raw': {}, 'index_raw_all': {} } } }, } client = ElasticSearchClient.get(name=name) for policy, conf in ilms.items(): await ElasticSearchClient.safe_create_ilm(client=client, name=policy, conf=conf) for name, conf in templates.items(): await ElasticSearchClient.safe_create_template(client=client, name=name, conf=conf) for index, conf in indices.items(): await ElasticSearchClient.safe_create_index(client=client, name=index, conf=conf) await ElasticSearchClient.close_all() if __name__ == '__main__': from utils.log import log_config log_config() asyncio.run(migrate())