netcore消息发送到kafka通过logstash保存在es当中

简介

  根据项目需求,需要把系统当中的操作日志保存起来,在技术方面采用了Elasticsearch存储日志。

  项目当中把消息发送kafka当中,logstash关联kafka,然后把接收到的消息保存到elsticsearch当中。

  • 采用docker部署
  • kafka
  • zookeeper(用来保存kafka相关配置信息)
  • logstash

安装kafka

  kafka采用zk来管理broker注册,在zk上有一个专门用来broker服务器列表记录节点【/brokers/ids】。每个broker在启动时,都会在zk上进行注册。

  以及消费者和生产者负载均衡和记录消息的偏移量。

  安装kafka一共是安装了三个组件。kafka、kafka-manager(可视化)、zookeeper.

  执行docker-compose 文件。生成对应的容器。

docker-compose -f docker-kafka.yml up -d

netcore消息发送到kafka通过logstash保存在es当中

docker exec -it kafka /bin/bash

cd opt/kafka_2.13.2.6.0/bin

netcore消息发送到kafka通过logstash保存在es当中

kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 2 --partitions 2 --topic useroperationlog    

  安装完成以后进入到容器里面创建相应的topic即可。

安装logstash和elasticsearch

  es和logstash也是用docker-compose执行进行创建。在创建的时候logstash需要挂载对应的配置文件,config和pipeline

netcore消息发送到kafka通过logstash保存在es当中

  安装完成之后需要修改汉化kibana,在kibana.yml文件当中添加i18n.locale: "zh-CN"即可

netcore消息发送到kafka通过logstash保存在es当中

  修改logstash配置,在logstash.conf文件当中进行添加kafka配置topic、分组、kafka地址等信息

netcore消息发送到kafka通过logstash保存在es当中

  修改完配置之后记得重启。然后就可以往kafka的topic当中发送消息。大功告成

netcore消息发送到kafka通过logstash保存在es当中

附docker-compose文件

  elasticsearch的配置

version: '3.3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.9.3
    container_name: elasticsearch
    ports:
      - 9200:9200
    environment:
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ulimits:
      memlock:
        soft: -1
        hard: -1
    deploy:
      replicas: 1
      update_config:
         parallelism: 2
         delay: 10s
      restart_policy:
          condition: on-failure1
    volumes:
      - esdata:/usr/share/elasticsearch/data
      - esconfig:/usr/share/elasticsearch/config
      - /etc/localtime:/etc/localtime
    networks:
      - esnet
  kibana:
    image: docker.elastic.co/kibana/kibana:7.9.3
    container_name: kibana
    volumes:
      - kibanaConfig:/usr/share/kibana/config
      - /etc/localtime:/etc/localtime
    ports:
      - 5601:5601
    environment:
      - ELASTICSEARCH_host=http://elasticsearch:9200
    deploy:
      replicas: 1
      update_config:
         parallelism: 2
         delay: 10s
      restart_policy:
          condition: on-failure1
    networks:
      - esnet
  logstash:
    image: logstash:7.9.3
    container_name: logstash
    volumes:
      - logstashConfig:/usr/share/logstash/config
      - pipelinesConfig:/usr/share/logstash/pipeline
    ports:
      - 5044:5044
    deploy:
      replicas: 1
      update_config:
         parallelism: 2
  delay: 10s
      restart_policy:
          condition: on-failure1
    networks:
      - esnet
volumes:
  esdata: {}
  esconfig: {}
  kibanaConfig: {}
  logstashConfig: {}
  pipelinesConfig: {}
networks:
  esnet:
      external: true



  kafka的docker-compose文件。


version: "3"
services:
   zookeeper:
     image: zookeeper
     container_name: zookeeper

     ports:
      - 2181:2181
     networks:
      - kafkanetwork
     volumes:
      - zookeeper_data:/data
      - zookeeper_log:/logs
      - zookeeper_datalog:/datalog
      - /etc/localtime:/etc/localtime
     deploy:
         restart_policy:
             condition: on-failure
   kafka:
     image: wurstmeister/kafka
     ports:
      - 9092:9092
     environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://123.56.23.68:9092
      KAFKA_BROKER_ID: 22
     volumes:
       - /etc/localtime:/etc/localtime
       - kafka_config:/opt/kafka/config/
       - kafka_libs:/opt/kafka/libs/
       - kafka_logs:/kafka
     networks:
       - kafkanetwork
     container_name: kafka
     deploy:
      restart_policy:
          condition: on-failure
   kafka-manager:
    image: sheepkiller/kafka-manager:latest
    ports:
     - 9000:9000
    environment:
     ZK_HOSTS: zookeeper:2181
     APPLICATION_SECRET: LETMEIN
     KM_ARGS: -Djava.net.preferIPv4Stack=true
    networks:
     - kafkanetwork
    deploy:
      restart_policy:
          condition: on-failure
networks:
   kafkanetwork:
     external:
        name: kafkanetwork

volumes:
   zookeeper_data: {}
   zookeeper_log: {}
   zookeeper_datalog: {}
   kafka_config: {}
   kafka_libs: {}
   kafka_logs: {}

上一篇:.NetCore 配合 Gitlab CI&CD 实践


下一篇:Asp.NetCore之AutoMapper基础篇