App下載

Springboot中使用MQTT的詳細教程

耳機依賴患者 2021-08-21 16:41:08 瀏覽數(shù) (11570)
反饋

MQTT的定義相信很多人都能講的頭頭是道,本文章也不討論什么高大上的東西,旨在用最簡單直觀的方式讓每一位剛接觸的同行們可以最快的應用起來

先從使用MQTT需要什么開始分析:

  • 消息服務器
  • 不同應用/設備之間的頻繁交互
  • 可能涉及一對多的消息傳遞

根據(jù)上面列舉的這三點,我們大概可以了解到, MQTT最適合的場景是消息做為系統(tǒng)的重要組成部分,且參與著系統(tǒng)關鍵業(yè)務邏輯的情形

MQTT, 啟動!

既然決定使用它,我們首先要研究的是如何讓MQTT正常工作,畢竟它不是簡單的在maven里加入個依賴就完事的

我們總共需要干如下兩件事:

  • 下載EMQX消息服務器, 作為broker
  • 在maven中引入依賴
<dependency>  
    <groupId>org.springframework.integration</groupId>  
    <artifactId>spring-integration-mqtt</artifactId>  
    <version>5.3.2.RELEASE</version>  
</dependency>

完成上面兩步后, 啟動EMQX服務器, 正式進入我們的MQTT旅途

使用方式

在Spring Boot中使用MQTT的代碼, 筆者總結了如下兩種方式:

  • 使用spring-integration的消息通道概念
  • 使用傳統(tǒng)的Client客戶端概念

第一種會產生一定程度的心智負擔,但在筆者成功搭配(抄襲+造輪子)自動注冊后, 比后者要方便許多

在介紹具體代碼之前, 我們先簡單整理下使用中最常見的概念:

  • 主題: MQTT消息的主要傳播途徑, 我們向主題發(fā)布消息, 訂閱主題, 從主題中讀取消息并進行業(yè)務邏輯處理, 主題是消息的通道
  • 生產者: MQTT消息的發(fā)送者, 他們向主題發(fā)送消息
  • 消費者: MQTT消息的接收者, 他們訂閱自己需要的主題, 并從中獲取消息
  • broker: 消息轉發(fā)器, 消息是通過它來承載的, EMQX就是我們的broker, 在使用中我們不用關心它的具體實現(xiàn)

其實, MQTT的使用流程就是: 生產者給主題發(fā)消息->broker進行消息的傳遞->訂閱該主題的消費者拿到消息并進行相應的業(yè)務邏輯

Client模式

本模式和傳統(tǒng)的數(shù)據(jù)庫鏈接,Redis鏈接基本一致,有開發(fā)經(jīng)驗的小伙伴們可以很輕松的駕馭,我們需要考慮的就是如果創(chuàng)建對應的工廠,是單例模式,還是原型,亦或是造個池子呢?

我們使用單例模式來進行本次的介紹

創(chuàng)建工廠類

首先, 我們創(chuàng)造一個工廠(就不承認設計模式中毒)

public class MqttFactory {  

    private static MqttProperties configuration;  
    
    private static MqttClient client;  
 
    /**
    *   獲取客戶端實例
    *   單例模式, 存在則返回, 不存在則初始化
    */
    public static MqttClient getInstance() {    
        if (client == null) {      
            init();    
        }    
        return client;  
    }  
    
    /**
    *   初始化客戶端
    */
    public static void init() {    
        try {      
            client = new MqttClient(configuration.getAddress(), "client-" + System.currentTimeMillis());      
            // MQTT配置對象
            MqttConnectOptions options = new MqttConnectOptions();      
            // 設置自動重連, 其它具體參數(shù)可以查看MqttConnectOptions
            options.setAutomaticReconnect(true);      
            if (!client.isConnected()) {        
            client.connect(options);      
            }    
        } catch (MqttException e) {      
            LOGGER.error(String.format("MQTT: 連接消息服務器[%s]失敗", configuration.getAddress()));    
        }  
    }
    
}

關于MQTT的具體配置可以查看MqttConnectOptions, 在這里就不做說明了

多嘴一句, 文檔永遠比某些博客給力!!!

創(chuàng)建工具類

接下來, 我們創(chuàng)建MqttUtil, 用于消息的發(fā)送以及主題的訂閱

public class MqttUtil {  

    /**
    *   發(fā)送消息
    *   @param topic 主題
    *   @param data 消息內容
    */
    public static void send(String topic, Object data) {    
        // 獲取客戶端實例
        MqttClient client = MqttFactory.getInstance();    
        ObjectMapper mapper = new ObjectMapper();    
        try {
            // 轉換消息為json字符串
            String json = mapper.writeValueAsString(data);      
            client.publish(topic, new MqttMessage(json.getBytes(StandardCharsets.UTF_8)));    
        } catch (JsonProcessingException e) {      
            LOGGER.error(String.format("MQTT: 主題[%s]發(fā)送消息轉換json失敗", topic));    
        } catch (MqttException e) {      
            LOGGER.error(String.format("MQTT: 主題[%s]發(fā)送消息失敗", topic));    
        }  
    }
    
    /** 
    * 訂閱主題 
    * @param topic 主題 
    * @param listener 消息監(jiān)聽處理器 
    */
    public static void subscribe(String topic, IMqttMessageListener listener) {  
        MqttClient client = MqttFactory.getInstance();  
        try {    
            client.subscribe(topic, listener);  
        } catch (MqttException e) {    
            LOGGER.error(String.format("MQTT: 訂閱主題[%s]失敗", topic));  
        }
    }
    
}

相信小伙伴們注意到了IMqttMessageListener這個東西, 我們只需要創(chuàng)建一個監(jiān)聽類, 實現(xiàn)IMqttMessageListener接口, 就可以處理消息啦, 代碼如下:

public class MessageListener implements IMqttMessageListener {  

    /** 
    * 處理消息
    * @param topic 主題 
    * @param mqttMessage 消息 
    */
    @Override  
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {   
        LOGGER.info(String.format("MQTT: 訂閱主題[%s]發(fā)來消息[%s]", topic, new String(mqttMessage.getPayload())));  
    }
    
    public static void main(String[] args) {  
        //訂閱主題test01, 使用MessageListener來處理它的消息
        MqttUtil.subscribe("test01", new MessageListener());
    }

}

無論是發(fā)送還是訂閱,是不是都很好理解?

舒服的事情結束后, 帶來的是無盡的折磨和空虛, 來吧, 讓我們挑戰(zhàn)下心智負擔大的第二種模式!

Spring Integration

什么是Spring Integration?對不起,我不知道,我也不想知道

為什么使用Spring Integration?因為它真的很好維護

網(wǎng)上大部分教程都是針對Spring Integration的, 可能是我第一次接觸, 千篇一律看的我莫名其妙, 所以我選擇放棄了他們, 選擇了大神的自動配置方式,并在其基礎上,針對心智負擔進行了相應的調整

還記得我們之前討論過的概念嗎?主題/生產者/消費者

在Spring Integration中,我們新加入一些概念, 并把之前的進行微調:

  • 通道: 消息傳輸和接受的管道, 每一條消息都是通過它鉆進鉆出
  • 客戶端工廠: 用于創(chuàng)建MQTT客戶端, 和模式一中的類似
  • 消息適配器: 用于接收MQTT消息, 進行轉換, 但不參與業(yè)務邏輯
  • 入站通道: 搭配消息適配器, 消息進入站臺的通道
  • 出站通道: 搭配客戶端工廠, 消息發(fā)出站臺的通道
  • 主題: 還是主題, 它不變
  • 生產者: 擁有出站通道的家伙
  • 消費者: 擁有入站通道的家伙

如果能漸漸理解上面定義的話, 這種模式的流程其實可以變成這樣:

  • 生產者: 創(chuàng)建指定客戶端工廠的出站通道->發(fā)送消息
  • 消費者: 創(chuàng)建指定消息適配器的入站通道->接收消息->進入消息攔截器->業(yè)務邏輯

其實在筆者看來, 這符合Spring Boot的理念, 約定優(yōu)于配置

代碼已挪入公司私服, 待后續(xù)個人私服配置好后再補充筆記

總結

MQTT作為消息服務, 能夠滿足我們大部分的開發(fā)需求, 但還有一些遺留問題筆者還沒進行過深入思考和實踐:

  • 如何利用qos機制保證數(shù)據(jù)不會丟失
  • 消息的隊列和排序
  • 集群模式下的應用

以上就是如何在Spring Boot中使用MQTT的詳細內容,更多關于在Spring Boot中使用MQTT的資料,請關注W3Cschool其它相關文章!

1 人點贊