和RDD類似,transformation允許從輸入DStream來(lái)的數(shù)據(jù)被修改。DStreams支持很多在RDD中可用的transformation算子。一些常用的算子如下所示:
Transformation | Meaning |
---|---|
map(func) | 利用函數(shù)func 處理原DStream的每個(gè)元素,返回一個(gè)新的DStream |
flatMap(func) | 與map相似,但是每個(gè)輸入項(xiàng)可用被映射為0個(gè)或者多個(gè)輸出項(xiàng) |
filter(func) | 返回一個(gè)新的DStream,它僅僅包含源DStream中滿足函數(shù)func的項(xiàng) |
repartition(numPartitions) | 通過(guò)創(chuàng)建更多或者更少的partition改變這個(gè)DStream的并行級(jí)別(level of parallelism) |
union(otherStream) | 返回一個(gè)新的DStream,它包含源DStream和otherStream的聯(lián)合元素 |
count() | 通過(guò)計(jì)算源DStream中每個(gè)RDD的元素?cái)?shù)量,返回一個(gè)包含單元素(single-element)RDDs的新DStream |
reduce(func) | 利用函數(shù)func聚集源DStream中每個(gè)RDD的元素,返回一個(gè)包含單元素(single-element)RDDs的新DStream。函數(shù)應(yīng)該是相關(guān)聯(lián)的,以使計(jì)算可以并行化 |
countByValue() | 這個(gè)算子應(yīng)用于元素類型為K的DStream上,返回一個(gè)(K,long)對(duì)的新DStream,每個(gè)鍵的值是在原DStream的每個(gè)RDD中的頻率。 |
reduceByKey(func, [numTasks]) | 當(dāng)在一個(gè)由(K,V)對(duì)組成的DStream上調(diào)用這個(gè)算子,返回一個(gè)新的由(K,V)對(duì)組成的DStream,每一個(gè)key的值均由給定的reduce函數(shù)聚集起來(lái)。注意:在默認(rèn)情況下,這個(gè)算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組。你可以用numTasks 參數(shù)設(shè)置不同的任務(wù)數(shù) |
join(otherStream, [numTasks]) | 當(dāng)應(yīng)用于兩個(gè)DStream(一個(gè)包含(K,V)對(duì),一個(gè)包含(K,W)對(duì)),返回一個(gè)包含(K, (V, W))對(duì)的新DStream |
cogroup(otherStream, [numTasks]) | 當(dāng)應(yīng)用于兩個(gè)DStream(一個(gè)包含(K,V)對(duì),一個(gè)包含(K,W)對(duì)),返回一個(gè)包含(K, Seq[V], Seq[W])的元組 |
transform(func) | 通過(guò)對(duì)源DStream的每個(gè)RDD應(yīng)用RDD-to-RDD函數(shù),創(chuàng)建一個(gè)新的DStream。這個(gè)可以在DStream中的任何RDD操作中使用 |
updateStateByKey(func) | 利用給定的函數(shù)更新DStream的狀態(tài),返回一個(gè)新"state"的DStream。 |
最后兩個(gè)transformation算子需要重點(diǎn)介紹一下:
updateStateByKey操作允許不斷用新信息更新它的同時(shí)保持任意狀態(tài)。你需要通過(guò)兩步來(lái)使用它
讓我們舉個(gè)例子說(shuō)明。在例子中,你想保持一個(gè)文本數(shù)據(jù)流中每個(gè)單詞的運(yùn)行次數(shù),運(yùn)行次數(shù)用一個(gè)state表示,它的類型是整數(shù)
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
val newCount = ... // add the new values with the previous running count to get the new count
Some(newCount)
}
這個(gè)函數(shù)被用到了DStream包含的單詞上
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
更新函數(shù)將會(huì)被每個(gè)單詞調(diào)用,newValues
擁有一系列的1(從 (詞, 1)對(duì)而來(lái)),runningCount擁有之前的次數(shù)。要看完整的代碼,見例子
transform
操作(以及它的變化形式如transformWith
)允許在DStream運(yùn)行任何RDD-to-RDD函數(shù)。它能夠被用來(lái)應(yīng)用任何沒在DStream API中提供的RDD操作(It can be used to apply any RDD operation that is not exposed in the DStream API)。例如,連接數(shù)據(jù)流中的每個(gè)批(batch)和另外一個(gè)數(shù)據(jù)集的功能并沒有在DStream API中提供,然而你可以簡(jiǎn)單的利用transform
方法做到。如果你想通過(guò)連接帶有預(yù)先計(jì)算的垃圾郵件信息的輸入數(shù)據(jù)流來(lái)清理實(shí)時(shí)數(shù)據(jù),然后過(guò)了它們,你可以按如下方法來(lái)做:
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd => {
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
})
事實(shí)上,你也可以在transform
方法中用機(jī)器學(xué)習(xí)和圖計(jì)算算法
Spark Streaming也支持窗口計(jì)算,它允許你在一個(gè)滑動(dòng)窗口數(shù)據(jù)上應(yīng)用transformation算子。下圖闡明了這個(gè)滑動(dòng)窗口。
如上圖顯示,窗口在源DStream上滑動(dòng),合并和操作落入窗內(nèi)的源RDDs,產(chǎn)生窗口化的DStream的RDDs。在這個(gè)具體的例子中,程序在三個(gè)時(shí)間單元的數(shù)據(jù)上進(jìn)行窗口操作,并且每?jī)蓚€(gè)時(shí)間單元滑動(dòng)一次。這說(shuō)明,任何一個(gè)窗口操作都需要指定兩個(gè)參數(shù):
這兩個(gè)參數(shù)必須是源DStream的批時(shí)間間隔的倍數(shù)。
下面舉例說(shuō)明窗口操作。例如,你想擴(kuò)展前面的例子用來(lái)計(jì)算過(guò)去30秒的詞頻,間隔時(shí)間是10秒。為了達(dá)到這個(gè)目的,我們必須在過(guò)去30秒的pairs
DStream上應(yīng)用reduceByKey
操作。用方法reduceByKeyAndWindow
實(shí)現(xiàn)。
// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些常用的窗口操作如下所示,這些操作都需要用到上文提到的兩個(gè)參數(shù):窗口長(zhǎng)度和滑動(dòng)的時(shí)間間隔
Transformation | Meaning |
---|---|
window(windowLength, slideInterval) | 基于源DStream產(chǎn)生的窗口化的批數(shù)據(jù)計(jì)算一個(gè)新的DStream |
countByWindow(windowLength, slideInterval) | 返回流中元素的一個(gè)滑動(dòng)窗口數(shù) |
reduceByWindow(func, windowLength, slideInterval) | 返回一個(gè)單元素流。利用函數(shù)func聚集滑動(dòng)時(shí)間間隔的流的元素創(chuàng)建這個(gè)單元素流。函數(shù)必須是相關(guān)聯(lián)的以使計(jì)算能夠正確的并行計(jì)算。 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 應(yīng)用到一個(gè)(K,V)對(duì)組成的DStream上,返回一個(gè)由(K,V)對(duì)組成的新的DStream。每一個(gè)key的值均由給定的reduce函數(shù)聚集起來(lái)。注意:在默認(rèn)情況下,這個(gè)算子利用了Spark默認(rèn)的并發(fā)任務(wù)數(shù)去分組。你可以用numTasks 參數(shù)設(shè)置不同的任務(wù)數(shù) |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and "inverse reducing" the old data that leave the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable to only "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 應(yīng)用到一個(gè)(K,V)對(duì)組成的DStream上,返回一個(gè)由(K,V)對(duì)組成的新的DStream。每個(gè)key的值都是它們?cè)诨瑒?dòng)窗口中出現(xiàn)的頻率。 |
更多建議: