学习 MQTT 协议和 emqttd 开源项目：http://emqtt.com/
emqttd 源码版本号是 v1.1.3，下载地址：http://emqtt.com/downloads/1113
1、-module(emqttd_client).
[html] view plain copy
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
received(Bytes, State = #client_state{parser_fun = ParserFun,
packet_opts = PacketOpts,
proto_state = ProtoState}) ->
2、-module(emqttd_parser).
[html] view plain copy
parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->
3、-module(emqttd_protocol).
解析CONNECT消息
received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) -> process(Packet = ?CONNECT_PACKET(Var), State0) ->
4、-module(emqttd_sm).
start_session(CleanSess, ClientId) -> handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State) ->
%% Local node resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) when node(SessPid) =:= node() -> %% Remote node resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) ->
5、-module(emqttd_session).
%% Drain the session's message queue towards the client.
%%
%% A session whose client_pid is undefined is returned untouched: nothing is
%% dequeued while the client is disconnected. Otherwise a message is only
%% taken off the queue when check_inflight/1 returns true.
%% NOTE(review): check_inflight/1 is not shown here; presumably it reports
%% whether the inflight window still has room -- confirm.
dequeue(Session = #session{client_pid = undefined}) ->
    %% do nothing if client is disconnected
    Session;
dequeue(Session) ->
    case check_inflight(Session) of
        true  -> dequeue2(Session);
        false -> Session
    end.

%% Take one message out of the session's message queue and deliver it, then
%% hand the updated session back to dequeue/1 so the inflight check is
%% repeated before the next message. Returns the session unchanged when the
%% queue is empty.
dequeue2(Session = #session{message_queue = Q}) ->
    case emqttd_mqueue:out(Q) of
        {empty, _Q} ->
            Session;
        {{value, Msg}, Q1} ->
            %% dequeue more
            dequeue(deliver(Msg, Session#session{message_queue = Q1}))
    end.

%% Send a message to the client process as {deliver, Msg} and return the
%% resulting session.
%%
%% QoS 0: the message is sent as-is and the session is returned unchanged.
%% QoS 1/2: the message is stamped with the session's current packet id and
%% dup = false, sent, recorded at the head of inflight_queue as
%% {PktId, Msg1}, and the session is advanced with next_packet_id/1 before
%% being passed to await/2.
%% NOTE(review): await/2 is not shown here; presumably it arms the wait for
%% the client's acknowledgement -- confirm.
deliver(Msg = #mqtt_message{qos = ?QOS_0},
        Session = #session{client_pid = ClientPid}) ->
    ClientPid ! {deliver, Msg},
    Session;
deliver(Msg = #mqtt_message{qos = QoS},
        Session = #session{packet_id = PktId,
                           client_pid = ClientPid,
                           inflight_queue = InflightQ})
  when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
    Msg1 = Msg#mqtt_message{pktid = PktId, dup = false},
    ClientPid ! {deliver, Msg1},
    await(Msg1, next_packet_id(Session#session{inflight_queue = [{PktId, Msg1}|InflightQ]})).
6、-module(emqttd_mqueue).
%% Remove the element at the front of the message queue.
%%
%% Returns {Result, NewMQ}, where Result is whatever the underlying queue
%% module's out/1 yields ({value, Item} or empty).
%%
%% simple queue, len = 0: returns {empty, MQ} without touching the queue.
%% simple queue, otherwise: pops via queue:out/1 and decrements len; when
%%   max_len is anything other than infinity the updated queue is also
%%   passed through maybe_clear_alarm/1.
%% priority queue: pops via priority_queue:out/1; len is not adjusted.
out(#mqueue{type = simple, len = 0} = Queue) ->
    {empty, Queue};
out(#mqueue{type = simple, q = Items, len = Count, max_len = MaxLen} = Queue) ->
    {Result, Remaining} = queue:out(Items),
    Shrunk = Queue#mqueue{q = Remaining, len = Count - 1},
    case MaxLen of
        infinity -> {Result, Shrunk};
        _Bounded -> {Result, maybe_clear_alarm(Shrunk)}
    end;
out(#mqueue{type = priority, q = Items} = Queue) ->
    {Result, Remaining} = priority_queue:out(Items),
    {Result, Queue#mqueue{q = Remaining}}.
7、-module(emqttd_client).
handle_info({deliver, Message}, State) -> with_proto_state(fun(ProtoState) -> emqttd_protocol:send(Message, ProtoState) end, State);
8、-module(emqttd_protocol).