我的mqtt协议和emqttd开源项目个人理解(4) - 客户端CleanSession=0时,上线接收离线消息,源码分析

学习mqtt协议和emqttd开源项目http://emqtt.com/

emqttd源码版本号是v1.1.3。http://emqtt.com/downloads/1113


1、-module(emqttd_client).

[html] view plain copy

handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->  

received(Bytes, State = #client_state{parser_fun  = ParserFun,  

                                     packet_opts = PacketOpts,  

                                     proto_state = ProtoState}) ->  


2、-module(emqttd_parser).

[html] view plain copy

parse(<<Type:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, {none, Limit}) ->  


3、-module(emqttd_protocol).

解析CONNECT消息

received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
process(Packet = ?CONNECT_PACKET(Var), State0) ->

4、-module(emqttd_sm).

start_session(CleanSess, ClientId) ->
handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State) ->

%% Local node
resume_session(Session = #mqtt_session{client_id = ClientId,
                                       sess_pid  = SessPid}, ClientPid)
    when node(SessPid) =:= node() ->
%% Remote node
resume_session(Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid}, ClientPid) ->

5、-module(emqttd_session).

dequeue(Session = #session{client_pid = undefined}) ->
    %% do nothing if client is disconnected
    Session;
 
dequeue(Session) ->
    case check_inflight(Session) of
        true  -> dequeue2(Session);
        false -> Session
    end.
 
dequeue2(Session = #session{message_queue = Q}) ->
    case emqttd_mqueue:out(Q) of
        {empty, _Q} ->
            Session;
        {{value, Msg}, Q1} ->
            %% dequeue more
            dequeue(deliver(Msg, Session#session{message_queue = Q1}))
    end.
 
deliver(Msg = #mqtt_message{qos = ?QOS_0}, Session = #session{client_pid = ClientPid}) ->
    ClientPid ! {deliver, Msg}, Session; 
 
deliver(Msg = #mqtt_message{qos = QoS}, Session = #session{packet_id = PktId,
                                                           client_pid = ClientPid,
                                                           inflight_queue = InflightQ})
    when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
    Msg1 = Msg#mqtt_message{pktid = PktId, dup = false},
    ClientPid ! {deliver, Msg1},
    await(Msg1, next_packet_id(Session#session{inflight_queue = [{PktId, Msg1}|InflightQ]})).

6、-module(emqttd_mqueue).

out(MQ = #mqueue{type = simple, len = 0}) ->
    {empty, MQ};
out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = infinity}) ->
    {R, Q2} = queue:out(Q),
    {R, MQ#mqueue{q = Q2, len = Len - 1}};
out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
    {R, Q2} = queue:out(Q),
    {R, maybe_clear_alarm(MQ#mqueue{q = Q2, len = Len - 1})};
out(MQ = #mqueue{type = priority, q = Q}) ->
    {R, Q2} = priority_queue:out(Q),
    {R, MQ#mqueue{q = Q2}}.

7、-module(emqttd_client).

handle_info({deliver, Message}, State) ->  
    with_proto_state(fun(ProtoState) ->  
                       emqttd_protocol:send(Message, ProtoState)  
                     end, State);  

8、-module(emqttd_protocol).

send(Packet, State = #proto_state{sendfun = SendFun})  
    when is_record(Packet, mqtt_packet) ->  

上一篇:开发微信小程序中SSL协议的申请、证书绑定、TLS 版本处理等


下一篇:CentOS下杀毒工具ClamAV安装