源码实践_实现一个mqtt(1)
写这个原因
因为用开源的emqx,背后的语言是erlang,不是计算的语言,但是天生的分布式,单机测试吞吐非常容易卡死,部署分布式,说实话,我不会,小公司也没有那个人力去部署,一般用一个单机搞,能多大,就多大,所以,我想实现一个单机最大效率的mq(当然这和机器的网卡和cpu有关),只是支持简单的发布订阅即可,不正那么多弯弯绕,小公司用不到的东西
设计topic
因为我没台关注topic的解析,但是可以为以后考虑到是topic主导,还是tcp连接主导。也就是用连接去找topic,还是topic去找tcp,最终敲定topic下面带着tcp切片。数据结构用谷歌的Btree,因为这个数据结构在内存查询很快。
下面是初步实现的topic和tcp的关系,结合了btree
package opmq
import (
"github.com/google/btree"
"net"
)
type Topic struct {
Name string
Conn []net.Conn
Hash int64
}
func NewTopic(name string) *Topic {
t := &Topic{
Name: name,
Conn: make([]net.Conn, 0),
}
t.CalcHash()
return t
}
func (t *Topic) Less(b btree.Item) bool {
return t.Hash < b.(*Topic).Hash
}
func (t *Topic) CalcHash() {
b := []byte(t.Name)
for _, v := range b {
t.Hash += int64(v)
}
}
测试代码
package opmq
import (
"flag"
"fmt"
"github.com/google/btree"
"strconv"
"testing"
)
// all extracts all items from a tree in order as a slice.
func all(t *btree.BTree) (out []btree.Item) {
t.Ascend(func(a btree.Item) bool {
out = append(out, a)
return true
})
return
}
func hashHere(s string) int64 {
h := int64(0)
b := []byte(s)
for _, v := range b {
h += int64(v)
}
return h
}
func TestTopic_CalcHash(t *testing.T) {
var btreeDegree = flag.Int("degree", 32, "B-Tree degree")
tree := btree.New(*btreeDegree)
for i := 0; i < 10; i++ {
to := &Topic{
Name: strconv.Itoa(i),
}
to.CalcHash()
tree.ReplaceOrInsert(to)
}
for _, v := range all(tree) {
fmt.Println(v)
}
fmt.Println("查找")
to2 := &Topic{
Name: "1",
}
to2.CalcHash()
fmt.Println(tree.Get(to2))
}
# 输出代码
=== RUN TestTopic_CalcHash
&{0 [] 48}
&{1 [] 49}
&{2 [] 50}
&{3 [] 51}
&{4 [] 52}
&{5 [] 53}
&{6 [] 54}
&{7 [] 55}
&{8 [] 56}
&{9 [] 57}
查找
&{1 [] 49}
--- PASS: TestTopic_CalcHash (0.00s)
PASS
Process finished with the exit code 0
报文
直接用开源包的报文解析
测试报文
server.go
package opmq
import (
"fmt"
"gitee.com/maomaomaoge/opmq/packets"
"log"
"net"
)
func Serve() {
ln, err := net.Listen("tcp", ":7000")
if err != nil {
return
}
for {
conn, err := ln.Accept()
if err != nil {
log.Fatalln(err)
}
go read(conn)
}
}
func read(conn net.Conn) {
for {
packet, err := packets.ReadPacket(conn)
if err != nil {
return
}
fmt.Println("收到的报文: ", packet.String())
ack := packets.NewControlPacket(packets.Connack).(*packets.ConnackPacket)
ack.Write(conn)
}
}
开源包的最简单代码
/*
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
*/
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/eclipse/paho.mqtt.golang"
)
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
func main() {
mqtt.DEBUG = log.New(os.Stdout, "", 0)
mqtt.ERROR = log.New(os.Stdout, "", 0)
opts := mqtt.NewClientOptions().AddBroker("tcp://127.0.0.1:7000")
c := mqtt.NewClient(opts)
token := c.Connect()
if token.Wait() && token.Error() != nil {
panic(token.Error())
}
if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("go-mqtt/sample", 0, false, text)
token.Wait()
}
time.Sleep(6 * time.Second)
if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
c.Disconnect(250)
time.Sleep(1 * time.Second)
}
serve收到的报文
=== RUN TestServe
收到的报文: CONNECT: dup: false qos: 0 retain: false rLength: 12 protocolversion: 4 protocolname: MQTT cleansession: true willflag: false WillQos: 0 WillRetain: false Usernameflag: false Passwordflag: false keepalive: 30 clientId: willtopic: willmessage: Username: Password:
nice, 初步完成