源码实践_实现一个mqtt(1)

源码实践_实现一个mqtt(1)

https://gitee.com/maomaomaoge/opmq

写这个原因

因为用开源的emqx,背后的语言是erlang,不是计算的语言,但是天生的分布式,单机测试吞吐非常容易卡死,部署分布式,说实话,我不会,小公司也没有那个人力去部署,一般用一个单机搞,能多大,就多大,所以,我想实现一个单机最大效率的mq(当然这和机器的网卡和cpu有关),只是支持简单的发布订阅即可,不正那么多弯弯绕,小公司用不到的东西

设计topic

因为我没台关注topic的解析,但是可以为以后考虑到是topic主导,还是tcp连接主导。也就是用连接去找topic,还是topic去找tcp,最终敲定topic下面带着tcp切片。数据结构用谷歌的Btree,因为这个数据结构在内存查询很快。

下面是初步实现的topic和tcp的关系,结合了btree

package opmq

import (
	"github.com/google/btree"
	"net"
)

type Topic struct {
	Name string
	Conn []net.Conn
	Hash int64
}

func NewTopic(name string) *Topic {
	t :=  &Topic{
		Name: name,
		Conn: make([]net.Conn, 0),
	}
	t.CalcHash()

	return t
}

func (t *Topic) Less(b btree.Item) bool {
	return t.Hash < b.(*Topic).Hash
}

func (t *Topic) CalcHash()  {
	b := []byte(t.Name)
	for _, v := range b {
		t.Hash += int64(v)
	}
}

测试代码

package opmq

import (
	"flag"
	"fmt"
	"github.com/google/btree"
	"strconv"
	"testing"
)

// all extracts all items from a tree in order as a slice.
func all(t *btree.BTree) (out []btree.Item) {
	t.Ascend(func(a btree.Item) bool {
		out = append(out, a)
		return true
	})
	return
}

func hashHere(s string) int64 {
	h := int64(0)
	b := []byte(s)
	for _, v := range b {
		h += int64(v)
	}

	return h
}

func TestTopic_CalcHash(t *testing.T) {
	var btreeDegree = flag.Int("degree", 32, "B-Tree degree")
	tree := btree.New(*btreeDegree)

	for i := 0; i < 10; i++ {
		to := &Topic{
			Name: strconv.Itoa(i),
		}
		to.CalcHash()

		tree.ReplaceOrInsert(to)
	}

	for _, v := range all(tree) {
		fmt.Println(v)
	}

	fmt.Println("查找")
	to2 := &Topic{
		Name: "1",
	}
	to2.CalcHash()
	fmt.Println(tree.Get(to2))
}

# 输出代码
=== RUN   TestTopic_CalcHash
&{0 [] 48}
&{1 [] 49}
&{2 [] 50}
&{3 [] 51}
&{4 [] 52}
&{5 [] 53}
&{6 [] 54}
&{7 [] 55}
&{8 [] 56}
&{9 [] 57}
查找
&{1 [] 49}
--- PASS: TestTopic_CalcHash (0.00s)
PASS

Process finished with the exit code 0

报文

直接用开源包的报文解析

测试报文

server.go

package opmq

import (
	"fmt"
	"gitee.com/maomaomaoge/opmq/packets"
	"log"
	"net"
)

func Serve() {
	ln, err := net.Listen("tcp", ":7000")
	if err != nil {
		return
	}
	for {
		conn, err := ln.Accept()
		if err != nil {
			log.Fatalln(err)
		}

		go read(conn)
	}
}

func read(conn net.Conn)  {
	for {
		packet, err := packets.ReadPacket(conn)
		if err != nil {
			return
		}

		fmt.Println("收到的报文: ", packet.String())

		ack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
		ack.Write(conn)
	}
}

开源包的最简单代码

/*
 * Copyright (c) 2021 IBM Corp and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    https://www.eclipse.org/legal/epl-2.0/
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Seth Hoenig
 *    Allan Stockdill-Mander
 *    Mike Robertson
 */

package main

import (
	"fmt"
	"log"
	"os"
	"time"

	"github.com/eclipse/paho.mqtt.golang"
)

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("TOPIC: %s\n", msg.Topic())
	fmt.Printf("MSG: %s\n", msg.Payload())
}

func main() {
	mqtt.DEBUG = log.New(os.Stdout, "", 0)
	mqtt.ERROR = log.New(os.Stdout, "", 0)
	opts := mqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:7000")

	c := mqtt.NewClient(opts)
	token := c.Connect()
	if token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	for i := 0; i < 5; i++ {
		text := fmt.Sprintf("this is msg #%d!", i)
		token := c.Publish("go-mqtt/sample", 0, false, text)
		token.Wait()
	}

	time.Sleep(6 * time.Second)

	if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	c.Disconnect(250)

	time.Sleep(1 * time.Second)
}

serve收到的报文

=== RUN   TestServe
收到的报文:  CONNECT: dup: false qos: 0 retain: false rLength: 12 protocolversion: 4 protocolname: MQTT cleansession: true willflag: false WillQos: 0 WillRetain: false Usernameflag: false Passwordflag: false keepalive: 30 clientId:  willtopic:  willmessage:  Username:  Password: 

nice, 初步完成

源码实践_实现一个mqtt(1)

上一篇:记录一次doris BE重启故障()


下一篇:CorelDRAW(CDR)利用“交互式网格填充”工具填充对象实例教程