Apache Storm Trident

2018-01-04 18:55 更新

TridentStorm的延伸。像Storm,Trident也是由Twitter開發(fā)的。開發(fā)Trident的主要原因是在Storm上提供高級(jí)抽象,以及狀態(tài)流處理和低延遲分布式查詢。

Trident使用spoutbolt,但是這些低級(jí)組件在執(zhí)行之前由Trident自動(dòng)生成。 Trident具有函數(shù),過濾器,聯(lián)接,分組和聚合。

Trident將流處理為一系列批次,稱為事務(wù)。通常,這些小批量的大小將是大約數(shù)千或數(shù)百萬個(gè)元組,這取決于輸入流。這樣,Trident不同于Storm,它執(zhí)行元組一元組處理。

批處理概念非常類似于數(shù)據(jù)庫事務(wù)。每個(gè)事務(wù)都分配了一個(gè)事務(wù)ID。該事務(wù)被認(rèn)為是成功的,一旦其所有的處理完成。然而,處理事務(wù)的元組中的一個(gè)的失敗將導(dǎo)致整個(gè)事務(wù)被重傳。對(duì)于每個(gè)批次,Trident將在事務(wù)開始時(shí)調(diào)用beginCommit,并在結(jié)束時(shí)提交。

Trident拓?fù)?/h2>

Trident API公開了一個(gè)簡(jiǎn)單的選項(xiàng),使用“TridentTopology”類創(chuàng)建Trident拓?fù)洹?span>基本上,Trident拓?fù)鋸牧鞒鼋邮蛰斎肓?,并?duì)流上執(zhí)行有序的操作序列(濾波,聚合,分組等)。Storm元組被替換為Trident元組,bolt被操作替換。一個(gè)簡(jiǎn)單的Trident拓?fù)淇梢詣?chuàng)建如下 -

TridentTopology topology = new TridentTopology();

Trident Tuples

Trident Tuples是一個(gè)命名的值列表。TridentTuple接口是Trident拓?fù)涞臄?shù)據(jù)模型。TridentTuple接口是可由Trident拓?fù)涮幚淼臄?shù)據(jù)的基本單位。

Trident Spout

Trident spout與類似于Storm spout,附加選項(xiàng)使用Trident的功能。實(shí)際上,我們?nèi)匀豢梢允褂肐RichSpout,我們?cè)赟torm拓?fù)渲惺褂盟?,但它本質(zhì)上是非事務(wù)性的,我們將無法使用Trident提供的優(yōu)點(diǎn)。

具有使用Trident的特征的所有功能的基本spout是“ITridentSpout”。它支持事務(wù)和不透明的事務(wù)語義。其他的spouts是IBatchSpout,IPartitionedTridentSpout和IOpaquePartitionedTridentSpout。

除了這些通用spouts,Trident有許多樣品實(shí)施trident spout其中之一是FeederBatchSpout輸出,我們可以使用它發(fā)送trident tuples的命名列表,而不必?fù)?dān)心批處理,并行性等。

FeederBatchSpout創(chuàng)建和數(shù)據(jù)饋送可以如下所示完成 -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident操作

Trident依靠“Trident操作”來處理trident tuples的輸入流。Trident API具有多個(gè)內(nèi)置操作來處理簡(jiǎn)單到復(fù)雜的流處理。這些操作的范圍從簡(jiǎn)單驗(yàn)證到復(fù)雜的trident tuples分組和聚合。讓我們經(jīng)歷最重要和經(jīng)常使用的操作。

過濾

過濾器是用于執(zhí)行輸入驗(yàn)證任務(wù)的對(duì)象。Trident過濾器獲取trident tuples字段的子集作為輸入,并根據(jù)是否滿足某些條件返回真或假。如果返回true,則該元組保存在輸出流中;否則,從流中移除元組。過濾器將基本上繼承自BaseFilter類并實(shí)現(xiàn)isKeep方法。這里是一個(gè)濾波器操作的示例實(shí)現(xiàn) -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

可以使用“each”方法在拓?fù)渲姓{(diào)用過濾器功能?!癋ields”類可以用于指定輸入(trident tuple的子集)。示例代碼如下 -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

函數(shù)

函數(shù)是用于對(duì)單個(gè)trident tuple執(zhí)行簡(jiǎn)單操作的對(duì)象。它需要一個(gè)trident tuple字段的子集,并發(fā)出零個(gè)或多個(gè)新的trident tuple字段。

函數(shù)基本上從BaseFunction類繼承并實(shí)現(xiàn)execute方法。下面給出了一個(gè)示例實(shí)現(xiàn):

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

與過濾操作類似,可以使用每個(gè)方法在拓?fù)渲姓{(diào)用函數(shù)操作。示例代碼如下 -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

聚合

聚合是用于對(duì)輸入批處理或分區(qū)或流執(zhí)行聚合操作的對(duì)象。Trident有三種類型的聚合。他們?nèi)缦?-

  • aggregate -單獨(dú)聚合每批trident tuple。在聚合過程期間,首先使用全局分組將元組重新分區(qū),以將同一批次的所有分區(qū)組合到單個(gè)分區(qū)中。

  • partitionAggregate -聚合每個(gè)分區(qū),而不是整個(gè)trident tuple。分區(qū)集合的輸出完全替換輸入元組。分區(qū)集合的輸出包含單個(gè)字段元組。

  • persistentaggregate -聚合所有批次中的所有trident tuple,并將結(jié)果存儲(chǔ)在內(nèi)存或數(shù)據(jù)庫中。

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

可以使用CombinerAggregator,ReducerAggregator或通用Aggregator接口創(chuàng)建聚合操作。上面例子中使用的“計(jì)數(shù)”聚合器是內(nèi)置聚合器之一,它使用“CombinerAggregator”實(shí)現(xiàn),實(shí)現(xiàn)如下 -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

分組

分組操作是一個(gè)內(nèi)置操作,可以由groupBy方法調(diào)用。groupBy方法通過在指定字段上執(zhí)行partitionBy來重新分區(qū)流,然后在每個(gè)分區(qū)中,它將組字段相等的元組組合在一起。通常,我們使用“groupBy”以及“persistentAggregate”來獲得分組聚合。示例代碼如下 -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

合并和連接

合并和連接可以分別通過使用“合并”和“連接”方法來完成。合并組合一個(gè)或多個(gè)流。加入類似于合并,除了加入使用來自兩邊的trident tuple字段來檢查和連接兩個(gè)流的事實(shí)。此外,加入將只在批量級(jí)別工作。示例代碼如下 -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

狀態(tài)維護(hù)

Trident提供了狀態(tài)維護(hù)的機(jī)制。狀態(tài)信息可以存儲(chǔ)在拓?fù)浔旧碇?,否則也可以將其存儲(chǔ)在單獨(dú)的數(shù)據(jù)庫中。原因是維護(hù)一個(gè)狀態(tài),如果任何元組在處理過程中失敗,則重試失敗的元組。這會(huì)在更新狀態(tài)時(shí)產(chǎn)生問題,因?yàn)槟淮_定此元組的狀態(tài)是否已在之前更新過。如果在更新狀態(tài)之前元組已經(jīng)失敗,則重試該元組將使?fàn)顟B(tài)穩(wěn)定。然而,如果元組在更新狀態(tài)后失敗,則重試相同的元組將再次增加數(shù)據(jù)庫中的計(jì)數(shù)并使?fàn)顟B(tài)不穩(wěn)定。需要執(zhí)行以下步驟以確保消息僅處理一次 -

  • 小批量處理元組。

  • 為每個(gè)批次分配唯一的ID。如果重試批次,則給予相同的唯一ID。

  • 狀態(tài)更新在批次之間排序。例如,第二批次的狀態(tài)更新將不可能,直到第一批次的狀態(tài)更新完成為止。

分布式RPC

分布式RPC用于查詢和檢索Trident拓?fù)浣Y(jié)果。 Storm有一個(gè)內(nèi)置的分布式RPC服務(wù)器。分布式RPC服務(wù)器從客戶端接收RPC請(qǐng)求并將其傳遞到拓?fù)洹?span>拓?fù)涮幚碚?qǐng)求并將結(jié)果發(fā)送到分布式RPC服務(wù)器,分布式RPC服務(wù)器將其重定向到客戶端。Trident的分布式RPC查詢像正常的RPC查詢一樣執(zhí)行,除了這些查詢并行運(yùn)行的事實(shí)。

什么時(shí)候使用Trident?

在許多使用情況下,如果要求是只處理一次查詢,我們可以通過在Trident中編寫拓?fù)鋪韺?shí)現(xiàn)。另一方面,在Storm的情況下將難以實(shí)現(xiàn)精確的一次處理。因此,Trident將對(duì)那些需要一次處理的用例有用。Trident不適用于所有用例,特別是高性能用例,因?yàn)樗黾恿薙torm的復(fù)雜性并管理狀態(tài)。

Trident的工作實(shí)例

我們將把上一節(jié)中制定的呼叫日志分析器應(yīng)用程序轉(zhuǎn)換為Trident框架。由于其高級(jí)API,Trident應(yīng)用程序?qū)⒈绕胀L(fēng)暴更容易。Storm基本上需要執(zhí)行Trident中的Function,F(xiàn)ilter,Aggregate,GroupBy,Join和Merge操作中的任何一個(gè)。最后,我們將使用LocalDRPC類啟動(dòng)DRPC服務(wù)器,并使用LocalDRPC類的execute方法搜索一些關(guān)鍵字。

格式化呼叫信息

FormatCall類的目的是格式化包括“呼叫者號(hào)碼”和“接收者號(hào)碼”的呼叫信息。完整的程序代碼如下 -

編碼:FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

CSVSplit類的目的是基于“comma(,)”拆分輸入字符串,并發(fā)出字符串中的每個(gè)字。此函數(shù)用于解析分布式查詢的輸入?yún)?shù)。完整的代碼如下 -

編碼:CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

日志分析器

這是主要的應(yīng)用程序。最初,應(yīng)用程序?qū)⑹褂?strong>FeederBatchSpout初始化TridentTopology并提供調(diào)用者信息。Trident拓?fù)淞骺梢允褂肨ridentTopology類的newStream方法創(chuàng)建。類似地,Trident拓?fù)銬RPC流可以使用TridentTopology類的newDRCPStream方法創(chuàng)建。可以使用LocalDRPC類創(chuàng)建一個(gè)簡(jiǎn)單的DRCP服務(wù)器。LocalDRPC有execute方法來搜索一些關(guān)鍵字。完整的代碼如下。

編碼:LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

構(gòu)建和運(yùn)行應(yīng)用程序

完整的應(yīng)用程序有三個(gè)Java代碼。他們?nèi)缦?-

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyerTrident.java

可以使用以下命令構(gòu)建應(yīng)用程序 -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

可以使用以下命令運(yùn)行應(yīng)用程序 -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

輸出

一旦應(yīng)用程序啟動(dòng),應(yīng)用程序?qū)⑤敵鲇嘘P(guān)集群?jiǎn)?dòng)過程,操作處理,DRPC服務(wù)器和客戶端信息的完整詳細(xì)信息,以及最后的集群關(guān)閉過程。此輸出將顯示在控制臺(tái)上,如下所示。

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends
以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)