文章轉(zhuǎn)載自公眾號:程序員內(nèi)點事
前幾天粉絲群里有個小伙伴問過:web 頁面的未讀消息(小紅點)怎么實現(xiàn)比較簡單,剛好本周手頭有類似的開發(fā)任務(wù),索性就整理出來供小伙伴們參考,沒準(zhǔn)哪天就能用得上呢。
web
端實時消息推送,常用的實現(xiàn)方式比較多,但萬變不離其宗,底層基本上還是依賴于 websocket
,MQTT
協(xié)議也不例外。
RabbitMQ 搭建
RabbitMQ
的基礎(chǔ)搭建就不詳細(xì)說了,自行百度一步一步搞問題不大,這里主要說一下兩個比較重要的配置。
1、開啟 mqtt 協(xié)議
默認(rèn)情況下RabbitMQ
是不開啟MQTT
協(xié)議的,所以需要我們手動的開啟相關(guān)的插件,而RabbitMQ
的MQTT
協(xié)議分為兩種。
第一種 rabbitmq_mqtt
提供與后端服務(wù)交互使用,對應(yīng)端口1883
。
rabbitmq-plugins enable rabbitmq_mqtt
第二種 rabbitmq_web_mqtt
提供與前端交互使用,對應(yīng)端口15675
。
rabbitmq-plugins enable rabbitmq_web_mqtt
在 RabbitMQ
管理后臺看到如下的顯示,就表示MQTT
協(xié)議開啟成功,到這中間件環(huán)境就搭建完畢了。
使用MQTT
協(xié)議默認(rèn)的交換機(jī) Exchange
為 amp.topic
,而我們訂閱的主題會在 Queues
注冊一個客戶端隊列,路由 Routing key
就是我們設(shè)置的主題。
服務(wù)端消息發(fā)送
web
端實時消息推送一般都是單向的推送,前端接收服務(wù)端推送的消息顯示即可,所以就只實現(xiàn)消息發(fā)送即可。
1、mqtt 客戶端依賴包
引入 spring-integration-mqtt
、org.eclipse.paho.client.mqttv3
兩個工具包實現(xiàn)
<!--mqtt依賴包-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
2、消息發(fā)送者
消息的發(fā)送比較簡單,主要是應(yīng)用到 @ServiceActivator
注解,需要注意messageHandler.setAsync
屬性,如果設(shè)置成 false
,關(guān)閉異步模式發(fā)送消息時可能會阻塞。
@Configuration
public class IotMqttProducerConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(mqttConfig.getServers());
return factory;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "iotMqttInputChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory());
messageHandler.setAsync(false);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
}
MQTT
對外提供發(fā)送消息的 API
時,需要使用 @MessagingGateway
注解,去提供一個消息網(wǎng)關(guān)代理,參數(shù) defaultRequestChannel
指定發(fā)送消息綁定的channel
。
可以實現(xiàn)三種API
接口,payload
為發(fā)送的消息,topic
發(fā)送消息的主題,qos
消息質(zhì)量。
@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel")
public interface IotMqttGateway {
// 向默認(rèn)的 topic 發(fā)送消息
void sendMessage2Mqtt(String payload);
// 向指定的 topic 發(fā)送消息
void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
// 向指定的 topic 發(fā)送消息,并指定服務(wù)質(zhì)量參數(shù)
void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
前端消息訂閱
前端使用與服務(wù)端對應(yīng)的工具 paho-mqtt
mqttws31.js
實現(xiàn),實現(xiàn)方式與傳統(tǒng)的 websocket
方式差不多,核心方法 client = new Paho.MQTT.Client
和 各種監(jiān)聽事件,代碼比較簡潔。
注意:要保證前后端 clientId
的全局唯一性,我這里就簡單用隨機(jī)數(shù)解決了
<script type="text/javascript">
// mqtt協(xié)議rabbitmq服務(wù)
var brokerIp = location.hostname;
// mqtt協(xié)議端口號
var port = 15675;
// 接受推送消息的主題
var topic = "push_message_topic";
// mqtt連接
client = new Paho.MQTT.Client(brokerIp, port, "/ws", "clientId_" + parseInt(Math.random() * 100, 10));
var options = {
timeout: 3, //超時時間
keepAliveInterval: 30,//心跳時間
onSuccess: function () {
console.log(("連接成功~"));
client.subscribe(topic, {qos: 1});
},
onFailure: function (message) {
console.log(("連接失敗~" + message.errorMessage));
}
};
// 考慮到https的情況
if (location.protocol == "https:") {
options.useSSL = true;
}
client.connect(options);
console.log(("已經(jīng)連接到" + brokerIp + ":" + port));
// 連接斷開事件
client.onConnectionLost = function (responseObject) {
console.log("失去連接 - " + responseObject.errorMessage);
};
// 接收消息事件
client.onMessageArrived = function (message) {
console.log("接受主題: " + message.destinationName + "的消息: " + message.payloadString);
$("#arrivedDiv").append("<br/>"+message.payloadString);
var count = $("#count").text();
count = Number(count) + 1;
$("#count").text(count);
};
// 推送給指定主題
function sendMessage() {
var a = $("#message").val();
if (client.isConnected()) {
var message = new Paho.MQTT.Message(a);
message.destinationName = topic;
client.send(message);
}
}
</script>
測試
前后端的代碼并不多,接下來我們測試一下,弄了個頁面看看效果。
首先用 postman
模擬后端發(fā)送消息
http://127.0.0.1:8080/fun/sendMessage?message=我是程序員內(nèi)點事&topic=push_message_topic
再看一下前端訂閱消息的效果,看到消息被實時推送到了前端,這里只做了未讀消息數(shù)量統(tǒng)計,一般還會做未讀消息詳情列表。
總結(jié)
未讀消息是一個十分常見的功能,不管是 web
端還是移動端系統(tǒng)都是必備的模塊,MQTT
協(xié)議只是其中的一種實現(xiàn)方式,還是有必要掌握一種方法。具體用什么工具實現(xiàn)還是要看具體的業(yè)務(wù)場景和學(xué)習(xí)成本,像我用RabbitMQ
做還考慮到一些運維成本在里邊。
以上就是W3Cschool編程獅
關(guān)于未讀消息(小紅點),前端 與 RabbitMQ 實時消息推送實踐,賊簡單~的相關(guān)介紹了,希望對大家有所幫助。