package hangqing
import (
"bufio"
"bytes"
"compress/flate"
"encoding/json"
"github.com/gorilla/websocket"
"io/ioutil"
"log"
"net/http"
"net/url"
"strings"
"sync"
"time"
)
type ServerAddrRsp struct {
Code string `json:"code"`
Server string `json:"server"`
}
type Hq struct {
token string //jvQuant Token
server string //websocket服务器地址
conn *websocket.Conn //websocket连接
cmdChan chan string
exitChan chan int
lv1Deal func(string string) //level1行情处理方法
lv2Deal func(string string) //level2行情处理方法
wg *sync.WaitGroup
}
//实例初始化
func (hq *Hq) Construct(token, serAddr string, lv1Handle, lv2Handle func(string string)) {
hq.token = token
if serAddr == "" {
hq.server = hq.initServer()
}
hq.lv1Deal = lv1Handle
hq.lv2Deal = lv2Handle
hq.conn = hq.connect()
hq.wg = &sync.WaitGroup{}
hq.cmdChan = make(chan string, 128)
hq.exitChan = make(chan int)
//接收协程
hq.wg.Add(2)
go func() {
hq.receive()
hq.wg.Done()
}()
//发送协程
go func() {
hq.cmd()
hq.wg.Done()
}()
}
//获取行情服务器地址
func (hq *Hq) initServer() (server string) {
params := url.Values{
"market": []string{"ab"},
"type": []string{"websocket"},
"token": []string{hq.token},
}
req := "http://jvquant.com/query/server?" + params.Encode()
rb, err := HttpOnce(req, nil, nil, 3000)
if err != nil {
log.Fatalln("获取行情服务器地址失败:", req, err)
}
rspMap := ServerAddrRsp{}
err = json.Unmarshal(rb, &rspMap)
if err != nil {
log.Fatalln("解析行情服务器地址失败:", string(rb), err)
}
server = rspMap.Server
if rspMap.Code != "0" || server == "" {
log.Fatalln("解析行情服务器地址失败:", string(rb))
}
log.Println("获取行情服务器地址成功:", server)
return
}
//连接行情服务器
func (hq Hq) connect() (conn *websocket.Conn) {
wsUrl := hq.server + "?token=" + hq.token
conn, _, err := websocket.DefaultDialer.Dial(wsUrl, nil)
if err != nil {
log.Fatalln("行情服务器连接错误:", err)
}
return
}
//增加level1行情订阅
func (hq Hq) AddLv1(codeArr []string) {
cmd := "add="
cmdArr := []string{}
for _, code := range codeArr {
cmdArr = append(cmdArr, "lv1_"+code)
}
cmd = cmd + strings.Join(cmdArr, ",")
hq.SendRawCmd(cmd)
}
//增加level2行情订阅
func (hq Hq) AddLv2(codeArr []string) {
cmd := "add="
cmdArr := []string{}
for _, code := range codeArr {
cmdArr = append(cmdArr, "lv2_"+code)
}
cmd = cmd + strings.Join(cmdArr, ",")
hq.SendRawCmd(cmd)
}
//指令入队列
func (hq Hq) SendRawCmd(cmd string) {
hq.cmdChan <- cmd
}
//关闭行情连接
func (hq Hq) Close() {
close(hq.cmdChan)
hq.exitChan <- 1
hq.conn.Close()
}
//线程阻塞等待
func (hq Hq) Wait() {
hq.wg.Wait()
}
//websocket指令发送
func (hq Hq) cmd() {
for cmd := range hq.cmdChan {
log.Println("发送指令:" + cmd)
err := hq.conn.WriteMessage(websocket.TextMessage, []byte(cmd))
if err != nil {
log.Println("指令发送错误:", err)
}
}
}
//websocket行情接收处理
func (hq Hq) receive() {
for {
select {
case <-hq.exitChan:
log.Print("接收协程退出")
return
default:
//阻塞接收
messageType, rb, err := hq.conn.ReadMessage()
if err != nil {
log.Print("接收错误:", err)
}
//文本消息
if messageType == websocket.TextMessage {
log.Println("Text响应:", string(rb))
}
//二进制消息
if messageType == websocket.BinaryMessage {
unZipByte := DeCompress(rb)
text := string(unZipByte)
ex1 := strings.Split(text, "\n")
for _, ex1r := range ex1 {
ex2 := strings.Split(ex1r, "=")
if len(ex2) == 2 {
code := ex2[0]
hqs := ex2[1]
if strings.HasPrefix(code, "lv1_") {
hq.lv1Deal(hqs)
}
if strings.HasPrefix(code, "lv2_") {
hq.lv2Deal(hqs)
}
}
}
}
}
}
}
//二进制数据解压方法
func DeCompress(b []byte) []byte {
var buffer bytes.Buffer
buffer.Write([]byte(b))
reader := flate.NewReader(&buffer)
var result bytes.Buffer
result.ReadFrom(reader)
reader.Close()
return result.Bytes()
}
//http请求封装
func HttpOnce(Url string, headers, postData map[string]string, msTimeOut int) (r []byte, err error) {
client := &http.Client{
Timeout: time.Duration(time.Duration(msTimeOut) * time.Millisecond),
}
method := http.MethodGet
r = []byte{}
err = nil
if len(headers) == 0 {
headers = map[string]string{}
}
if len(postData) != 0 {
method = http.MethodPost
headers["Content-Type"] = "application/x-www-form-urlencoded"
}
postParam := url.Values{}
for k, v := range postData {
postParam.Set(k, v)
}
postParamBuff := bytes.NewBufferString(postParam.Encode())
req, err := http.NewRequest(method, Url, postParamBuff)
if err != nil {
return r, err
}
for k, v := range headers {
req.Header.Add(k, v)
}
resp, er := client.Do(req)
if er != nil {
err = er
return
}
defer resp.Body.Close()
if err != nil {
return r, err
}
br := bufio.NewReader(resp.Body)
r, err = ioutil.ReadAll(br)
return r, err
}