服务端,采用 Mosquitto 来转发分发消息。
客户端自己写。
服务端 启动 mosquitto (底下的命令是我自己放到环境变量里面的,通过alias 运行mosquitto)
IshallbeThatIshallbe:~ iamthat$ mosquitto_start
1427629906: mosquitto version 1.3.5 (build date 2014-10-27 15:12:32+0000) starting
1427629906: Config loaded from /usr/local/Cellar/mosquitto/1.3.5/etc/mosquitto/mosquitto.conf.
IshallbeThatIshallbe:~ iamthat$ 1427629906: Opening ipv4 listen socket on port 1883.
1427629906: Opening ipv6 listen socket on port 1883.
客户端代码如下: 记住导入三个ibm 的jar 包。
package mqtthelloworld; import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttNotConnectedException;
import com.ibm.mqtt.MqttPersistenceException; public class Client1 { private final static String CONNECTION_STRING = "tcp://127.0.0.1:1883";
private final static boolean CLEAN_START = true;
private final static short KEEP_ALIVE = 30;//低耗网络,但是又需要及时获取数据,心跳30s
private final static String CLIENT_ID = "client1";
private final static String[] SUBSCRIBE_TOPICS = {
"gabriel"
};
private final static int[] QOS_VALUES = {01};
private static final String[] PUBLISH_TOPICS = {"gabriel"};
private static final String publish_topic="gabriel";
//////////////////
private MqttClient mqttClient; public Client1(){
try {
mqttClient = new MqttClient(CONNECTION_STRING);
SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
mqttClient.registerSimpleHandler(simpleCallbackHandler);//注册接收消息方法
mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
mqttClient.subscribe(SUBSCRIBE_TOPICS, QOS_VALUES);//订阅接主题
/**
* 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息
*/
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} private void publishMessage(String message) throws MqttNotConnectedException, MqttPersistenceException, IllegalArgumentException, MqttException{
mqttClient.publish(publish_topic, message.getBytes(), QOS_VALUES[0], true);
} public static void main(String[] args) throws MqttNotConnectedException, MqttPersistenceException, IllegalArgumentException, MqttException {
Client1 client1=new Client1();
for(int i=0;i<10;i++){
client1.publishMessage("testing Client1 and this is "+i+"th message published");
}
}
}
package mqtthelloworld; import com.ibm.mqtt.MqttSimpleCallback; public class SimpleCallbackHandler implements MqttSimpleCallback { /**
* 当客户机和broker意外断开时触发
* 可以再此处理重新订阅
*/
@Override
public void connectionLost() throws Exception {
// TODO Auto-generated method stub
System.out.println("客户机和broker已经断开");
} /**
* 客户端订阅消息后,该方法负责回调接收处理消息
*/
@Override
public void publishArrived(String topicName, byte[] payload, int Qos, boolean retained) throws Exception {
// TODO Auto-generated method stub
System.out.println("订阅主题: " + topicName);
System.out.println("消息数据: " + new String(payload));
System.out.println("消息级别(0,1,2): " + Qos);
System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): " + retained);
} }
再终端开一个客户端:
IshallbeThatIshallbe:bin iamthat$ mosquitto_sub -v -t 'gabriel'
运行结果:
运行Client1 结果如下:
终端显示
IshallbeThatIshallbe:bin iamthat$ mosquitto_sub -v -t 'gabriel'
1427630077: New connection from ::1 on port 1883.
1427630077: New client connected from ::1 as mosqsub/418-IshallbeTha (c1, k60).
1427630094: New connection from 127.0.0.1 on port 1883.
1427630094: New client connected from 127.0.0.1 as client1 (c1, k30).
gabriel testing Client1 and this is 0th message published
gabriel testing Client1 and this is 1th message published
gabriel testing Client1 and this is 2th message published
gabriel testing Client1 and this is 3th message published
gabriel testing Client1 and this is 4th message published
gabriel testing Client1 and this is 5th message published
gabriel testing Client1 and this is 6th message published
gabriel testing Client1 and this is 7th message published
gabriel testing Client1 and this is 8th message published
gabriel testing Client1 and this is 9th message published
控制台显示:
订阅主题: gabriel
消息数据: testing Client1 and this is 0th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 1th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 2th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
再开一个终端,看看订阅者如何与发布者沟通:
IshallbeThatIshallbe:bin iamthat$ mosquitto_pub -t 'gabriel' -m 'connecting to publisher'
IshallbeThatIshallbe:bin iamthat$
之前的订阅者终端:
IshallbeThatIshallbe:bin iamthat$ mosquitto_sub -v -t 'gabriel'
1427630077: New connection from ::1 on port 1883.
1427630077: New client connected from ::1 as mosqsub/418-IshallbeTha (c1, k60).
1427630094: New connection from 127.0.0.1 on port 1883.
1427630094: New client connected from 127.0.0.1 as client1 (c1, k30).
gabriel testing Client1 and this is 0th message published
gabriel testing Client1 and this is 1th message published
gabriel testing Client1 and this is 2th message published
gabriel testing Client1 and this is 3th message published
gabriel testing Client1 and this is 4th message published
gabriel testing Client1 and this is 5th message published
gabriel testing Client1 and this is 6th message published
gabriel testing Client1 and this is 7th message published
gabriel testing Client1 and this is 8th message published
gabriel testing Client1 and this is 9th message published
1427631029: New connection from ::1 on port 1883.
1427631029: New client connected from ::1 as mosqpub/468-IshallbeTha (c1, k60).
gabriel connecting to publisher
控制台显示:
订阅主题: gabriel
消息数据: testing Client1 and this is 0th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 1th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 2th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 3th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 4th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 5th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 6th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 7th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 8th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: testing Client1 and this is 9th message published
消息级别(0,1,2): 1
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
订阅主题: gabriel
消息数据: connecting to publisher
消息级别(0,1,2): 0
是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): false
可以看出订阅者成功与发布者联系交互。
个人对MQTT的理解是:
就三种角色: 订阅者,服务代理,发布者。
客户端订阅者订阅主题---》 服务代理broker-----> 反馈发布者消息给订阅者(消息的传递通过主题)
发布者创建主题发布消息---》服务端broker ---》反馈给订阅者(消息传递通过主题)
订阅者发送消息---》服务端broker---> 发布者(消息传递通过主题)