The LogAgent workflow

The LogAgent workflow has two main steps: read log lines from a file, then write them to Kafka.

1. Read the log -- the tail third-party library

Create tail_test/main.go:

package main

import (
    "fmt"
    "github.com/hpcloud/tail"
    "time"
)

func main() {
    fileName := "./my.log"
    //fileName := "/tmp/kafka-logs/web_log-0/00000000000000000000.log"
    config := tail.Config{
        ReOpen:    true,                                 // reopen the file if it is recreated (e.g. after log rotation)
        Follow:    true,                                 // keep following new lines as the file grows (like tail -f)
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2}, // where to start reading; Whence: 2 = seek from the end of the file (io.SeekEnd)
        MustExist: false,                                // do not return an error if the file does not exist yet
        Poll:      true,                                 // poll for file changes instead of using inotify
    }
    tails, err := tail.TailFile(fileName, config)
    if err != nil {
        fmt.Println("tail file failed, err:", err)
        return
    }
    var (
        line *tail.Line
        ok   bool
    )
    for {
        line, ok = <-tails.Lines
        if !ok {
            fmt.Printf("tail file closed, reopening, filename:%s\n", fileName)
            time.Sleep(time.Second)
            continue
        }
        fmt.Println("line:", line.Text)
    }
}

Run the following commands:

export GO111MODULE=on
export GOPROXY=https://goproxy.cn
go mod init tail_test
go mod tidy
go build
./tail_test
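
To watch the tailer pick up new lines, append to ./my.log from another terminal while ./tail_test is running. Below is a minimal, hypothetical generator (not part of the original steps, shown only for testing); save it as, for example, tail_test/gen/main.go and start it with go run ./gen from the tail_test directory so that ./my.log resolves to the same file.

package main

import (
    "fmt"
    "os"
    "time"
)

func main() {
    // open (or create) the same file that the tailer is watching
    f, err := os.OpenFile("./my.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    if err != nil {
        fmt.Println("open my.log failed, err:", err)
        return
    }
    defer f.Close()
    // append one timestamped test line per second
    for i := 0; ; i++ {
        fmt.Fprintf(f, "test log line %d at %s\n", i, time.Now().Format(time.RFC3339))
        time.Sleep(time.Second)
    }
}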

2. Write the log to Kafka -- the sarama library

For installation on Windows, see:

https://docs.qq.com/doc/DTmdldEJJVGtTRkFi

Ubuntu 18.04: install JDK 1.8

Environment: OS: Ubuntu 18.04, JDK: 8

Step 1: Install the JDK

https://blog.csdn.net/hunt_er/article/details/82850509

1. Run the command: sudo apt install openjdk-8-jre-headless

2. java -version   // check the installed version

Step 2: Install ZooKeeper

1. Download kafka_2.12-2.4.0.tgz

Download it from the following page:

http://kafka.apache.org/downloads

wget http://mirror.bit.edu.cn/apache/kafka/2.4.0/kafka_2.12-2.4.0.tgz
tar -zxvf kafka_2.12-2.4.0.tgz
Configuration:
  1. Open the config\zookeeper.properties file
  2. Set dataDir=E:\zookeeper3.5.5\data (Windows-only setting)
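On Ubuntu you can keep the default dataDir=/tmp/zookeeper that ships in config/zookeeper.properties; if you want the data to survive reboots, point dataDir at a persistent directory of your choosing instead. The E:\ path above only applies to Windows.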
Start:

cd /usr/local/kafka_2.12-2.4.0/bin

./zookeeper-server-start.sh ../config/zookeeper.properties

Step 3: Install Kafka

Configuration:
  1. Open the server.properties file in the config directory
  2. Set log.dirs=E:\kafkalogs  (Windows-only setting)
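On Ubuntu the shipped default log.dirs=/tmp/kafka-logs can be left unchanged; that is the directory we will inspect at the end of this article (/tmp/kafka-logs/web_log-0).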
Start:
cd /usr/local/kafka_2.12-2.4.0/bin
./kafka-server-start.sh ../config/server.properties

3. Write the code

Create a client.go file:

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

// A Kafka producer client built on the sarama third-party library.

func main() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll          // wait for the leader and all in-sync replicas to acknowledge
    config.Producer.Partitioner = sarama.NewRandomPartitioner // pick a random partition for each message
    config.Producer.Return.Successes = true                   // successfully delivered messages are reported on the Successes channel

    // build a message
    msg := &sarama.ProducerMessage{}
    msg.Topic = "web_log"
    msg.Value = sarama.StringEncoder("this is a test log11")
    // connect to kafka
    client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println("producer closed, err:", err)
        return
    }
    fmt.Println("connected to kafka successfully.")
    defer client.Close()
    // send the message
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

Run the following commands:

export GO111MODULE=on
export GOPROXY=https://goproxy.cn
go mod init client
go mod tidy
go build client.go
./client
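
As an optional check that the message really reached Kafka, here is a minimal consumer sketch built on the same sarama library; it assumes partition 0 of the web_log topic and the broker address used above.

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println("new consumer failed, err:", err)
        return
    }
    defer consumer.Close()
    // read partition 0 of the web_log topic starting from the oldest offset
    pc, err := consumer.ConsumePartition("web_log", 0, sarama.OffsetOldest)
    if err != nil {
        fmt.Println("consume partition failed, err:", err)
        return
    }
    defer pc.Close()
    for msg := range pc.Messages() {
        fmt.Printf("partition:%d offset:%d value:%s\n", msg.Partition, msg.Offset, string(msg.Value))
    }
}

It should print the this is a test log11 message; stop it with Ctrl+C.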

Look inside the kafka-logs/web_log-0 log directory; it will contain the following four files:

haima@haima-PC:/tmp/kafka-logs/web_log-0$ ls
00000000000000000000.index  00000000000000000000.timeindex
00000000000000000000.log    leader-epoch-checkpoint
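
Putting the two halves together, here is a minimal LogAgent sketch (assuming the same ./my.log file, web_log topic, and 127.0.0.1:9092 broker used in the examples above): it tails the log file and forwards every new line to Kafka. Error handling and reconnection are kept deliberately simple.

package main

import (
    "fmt"

    "github.com/Shopify/sarama"
    "github.com/hpcloud/tail"
)

func main() {
    // 1. kafka producer, same settings as client.go above
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    config.Producer.Return.Successes = true
    producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    if err != nil {
        fmt.Println("producer init failed, err:", err)
        return
    }
    defer producer.Close()

    // 2. tail the log file, same settings as tail_test/main.go above
    tails, err := tail.TailFile("./my.log", tail.Config{
        ReOpen:    true,
        Follow:    true,
        Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
        MustExist: false,
        Poll:      true,
    })
    if err != nil {
        fmt.Println("tail file failed, err:", err)
        return
    }

    // 3. forward every new line to the web_log topic
    for line := range tails.Lines {
        msg := &sarama.ProducerMessage{
            Topic: "web_log",
            Value: sarama.StringEncoder(line.Text),
        }
        pid, offset, err := producer.SendMessage(msg)
        if err != nil {
            fmt.Println("send msg failed, err:", err)
            continue
        }
        fmt.Printf("sent to partition:%v offset:%v\n", pid, offset)
    }
}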