Storm Bolts

2018-09-28 16:07 更新

Bolts

正如你已經(jīng)看到的,bolts 是一個(gè) Storm 集群中的關(guān)鍵組件。你將在這一章學(xué)到 bolt 生命周期,一些 bolt 設(shè)計(jì)策略,以及幾個(gè)有關(guān)這些內(nèi)容的例子。

Bolt 生命周期

Bolt 是這樣一種組件,它把元組作為輸入,然后產(chǎn)生新的元組作為輸出。實(shí)現(xiàn)一個(gè) bolt 時(shí),通常需要實(shí)現(xiàn) IRichBolt 接口。Bolts 對(duì)象由客戶端機(jī)器創(chuàng)建,序列化為拓?fù)?,并提交給集群中的主機(jī)。然后集群?jiǎn)?dòng)工人進(jìn)程反序列化 bolt,調(diào)用 prepare****,最后開始處理元組。

NOTE:要?jiǎng)?chuàng)建一個(gè) bolt 對(duì)象,它通過構(gòu)造器參數(shù)初始化成員屬性,bolt 被提交到集群時(shí),這些屬性值會(huì)隨著一起序列化。

Bolt 結(jié)構(gòu)

Bolts擁有如下方法:

declareOutputFields(OutputFieldsDeclarer declarer)
    為bolt聲明輸出模式
prepare(java.util.Map stormConf, TopologyContext context, OutputCollector collector)
    僅在bolt開始處理元組之前調(diào)用
execute(Tuple input)
    處理輸入的單個(gè)元組
cleanup()
    在bolt即將關(guān)閉時(shí)調(diào)用  

下面看一個(gè)例子,在這個(gè)例子中 bolt 把一句話分割成單詞列表:

class SplitSentence implements IRichBolt {
    private OutputCollector collector;
    publlic void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }

    public void cleanup(){}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}  

正如你所看到的,這是一個(gè)很簡(jiǎn)單的 bolt。值得一提的是在這個(gè)例子里,沒有消息擔(dān)保。這就意味著,如果 bolt 因?yàn)槟承┰騺G棄了一些消息——不論是因?yàn)?bolt 掛了,還是因?yàn)槌绦蚬室鈦G棄的——生成這條消息的 spout 不會(huì)收到任何通知,任何其它的 spouts 和 bolts 也不會(huì)收到。

然而在許多情況下,你想確保消息在整個(gè)拓?fù)浞秶鷥?nèi)都被處理過了。

可靠的 bolts 和不可靠的 bolts

正如前面所說的,Storm 保證通過 spout 發(fā)送的每條消息會(huì)得到所有 bolt 的全面處理?;谠O(shè)計(jì)上的考慮,這意味著你要自己決定你的 bolts 是否保證這一點(diǎn)。

拓?fù)涫且粋€(gè)樹型結(jié)構(gòu),消息(元組)穿過其中一條或多條分支。樹上的每個(gè)節(jié)點(diǎn)都會(huì)調(diào)用 ack(tuple)fail(tuple),Storm 因此知道一條消息是否失敗了,并通知那個(gè)/那些制造了這些消息的 spout(s)。既然一個(gè) Storm 拓?fù)溥\(yùn)行在高度并行化的環(huán)境里,跟蹤始發(fā) spout 實(shí)例的最好方法就是在消息元組內(nèi)包含一個(gè)始發(fā) spout 引用。這一技巧稱做錨定(譯者注:原文為Anchoring)。修改一下剛剛講過的 SplitSentence,使它能夠確保消息都被處理了。

class SplitSentence implenents IRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(tuple, new Values(word));
        }
        collector.ack(tuple);
    }

    public void cleanup(){}

    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declar.declare(new Fields("word"));
    }
}  

錨定發(fā)生在調(diào)用 collector.emit() 時(shí)。正如前面提到的,Storm 可以沿著元組追蹤到始發(fā)spout。collector.ack(tuple)collector.fail(tuple)會(huì)告知 spout 每條消息都發(fā)生了什么。當(dāng)樹上的每條消息都已被處理了,Storm 就認(rèn)為來自 spout 的元組被全面的處理了。如果一個(gè)元組沒有在設(shè)置的超時(shí)時(shí)間內(nèi)完成對(duì)消息樹的處理,就認(rèn)為這個(gè)元組處理失敗。默認(rèn)超時(shí)時(shí)間為30秒。

NOTE:你可以通過修改Config.TOPOLOGY_MESSAGE_TIMEOUT修改拓?fù)涞某瑫r(shí)時(shí)間。

當(dāng)然了spout需要考慮消息的失敗情況,并相應(yīng)的重試或丟棄消息。

NOTE:你處理的每條消息要么是確認(rèn)的(譯者注:collector.ack())要么是失敗的(譯者注:collector.fail())。Storm 使用內(nèi)存跟蹤每個(gè)元組,所以如果你不調(diào)用這兩個(gè)方法,該任務(wù)最終將耗盡內(nèi)存。

多數(shù)據(jù)流

一個(gè) bolt 可以使用 emit(streamId, tuple) 把元組分發(fā)到多個(gè)流,其中參數(shù) streamId 是一個(gè)用來標(biāo)識(shí)流的字符串。然后,你可以在 TopologyBuilder 決定由哪個(gè)流訂閱它。

多錨定

為了用 bolt 連接或聚合數(shù)據(jù)流,你需要借助內(nèi)存緩沖元組。為了在這一場(chǎng)景下確保消息完成,你不得不把流錨定到多個(gè)元組上??梢韵?emit 方法傳入一個(gè)元組列表來達(dá)成目的。

...
List anchors = new ArrayList();
anchors.add(tuple1);
anchors.add(tuple2);
collector.emit(anchors, values);
...  

通過這種方式,bolt 在任意時(shí)刻調(diào)用 ackfail 方法,都會(huì)通知消息樹,而且由于流錨定了多個(gè)元組,所有相關(guān)的 spout 都會(huì)收到通知。

使用 IBasicBolt 自動(dòng)確認(rèn)

你可能已經(jīng)注意到了,在許多情況下都需要消息確認(rèn)。簡(jiǎn)單起見,Storm 提供了另一個(gè)用來實(shí)現(xiàn)bolt 的接口,IBasicBolt。對(duì)于該接口的實(shí)現(xiàn)類的對(duì)象,會(huì)在執(zhí)行 execute 方法之后自動(dòng)調(diào)用 ack 方法。

class SplitSentence extends BaseBasicBolt {
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String sentence = tuple.getString(0);
        for(String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}  

NOTE:分發(fā)消息的 BasicOutputCollector 自動(dòng)錨定到作為參數(shù)傳入的元組。

以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)