Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of live data streams. It supports many data sources, such as Kafka, Flume, Twitter, ZeroMQ, Kinesis, and TCP sockets.
Data can be processed with high-level functions such as map, reduce, join, and window, which also makes it possible to run complex algorithms (for example, machine learning and graph computation) on the stream. Finally, processed results can be pushed out to file systems, databases, and live dashboards.
Spark Streaming receives live input data streams and divides the data into batches at a fixed time interval. The Spark engine then processes each batch, producing the final results as a stream of batches.
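The batching step can be illustrated with a plain-Java sketch (no Spark involved; the class and method names are invented for illustration): each timestamped event is assigned to the batch whose interval contains its timestamp.

```java
import java.util.*;

public class MicroBatchSketch {
    // Assign each (timestamp, payload) event to a batch:
    // batchIndex = timestamp / batchInterval.
    static Map<Long, List<String>> toBatches(List<Map.Entry<Long, String>> events,
                                             long batchIntervalMs) {
        Map<Long, List<String>> batches = new TreeMap<>();
        for (Map.Entry<Long, String> e : events) {
            long batchIndex = e.getKey() / batchIntervalMs;
            batches.computeIfAbsent(batchIndex, k -> new ArrayList<>()).add(e.getValue());
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> events = List.of(
            Map.entry(100L, "a"), Map.entry(900L, "b"),
            Map.entry(1200L, "c"), Map.entry(2500L, "d"));
        // With a 1-second batch interval, the events land in batches 0, 0, 1, 2.
        System.out.println(toBatches(events, 1000L)); // prints {0=[a, b], 1=[c], 2=[d]}
    }
}
```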
Spark Streaming provides an abstraction called a DStream (discretized stream). A DStream is a sequence of RDDs representing a continuous stream of data divided into batches. DStreams can be created from input sources (such as Kafka, Flume, or Kinesis), or derived from other DStreams by applying high-level operators.
Every operation on a DStream is in fact applied to each of the RDDs that make it up. For example, in the word-count example, the flatMap transformation is applied to each RDD of lines to generate the RDDs of words.
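As a plain-Java sketch of this idea (no Spark involved; names invented for illustration), a DStream transformation is simply the same per-RDD transformation applied to every batch, here the word-count flatMap:

```java
import java.util.*;
import java.util.stream.*;

public class PerBatchFlatMap {
    // Apply the word-count flatMap(line -> words) to each batch of lines,
    // producing a corresponding batch of words, exactly one per input batch.
    static List<List<String>> flatMapPerBatch(List<List<String>> lineBatches) {
        List<List<String>> wordBatches = new ArrayList<>();
        for (List<String> linesRdd : lineBatches) {      // one "RDD" per batch
            wordBatches.add(linesRdd.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList()));
        }
        return wordBatches;
    }

    public static void main(String[] args) {
        // Two batches of lines produce two batches of words.
        System.out.println(flatMapPerBatch(
            List.of(List.of("to be or"), List.of("not to be"))));
        // prints [[to, be, or], [not, to, be]]
    }
}
```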
Receiver: a built-in or custom data stream receiver that continuously pulls data from the source.
CurrentBuffer: buffers the data received by the input stream receiver.
BlockIntervalTimer: a timer that periodically packages the data buffered in CurrentBuffer into a Block and puts it into the blocksForPushing queue.
BlocksForPushing: the queue of Blocks waiting to be pushed.
BlockPushingThread: every 100 milliseconds, this thread takes a Block from the BlocksForPushing queue, writes it to the storage system, and caches it in the ReceivedBlockQueue.
Block Batch: a batch of Blocks, pulled from the ReceivedBlockQueue at each batch interval.
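The buffer-and-timer flow described above can be approximated with a single-threaded, plain-Java sketch (no Spark; simplified, since the real receiver uses separate timer and pusher threads, and all names here are invented):

```java
import java.util.*;

public class ReceiverBlocksSketch {
    // Events accumulate in currentBuffer; each time the block-interval timer
    // would fire, the buffer is sealed into a Block and appended to
    // blocksForPushing, mimicking BlockIntervalTimer's job.
    static List<List<String>> cutIntoBlocks(List<Map.Entry<Long, String>> events,
                                            long blockIntervalMs) {
        List<List<String>> blocksForPushing = new ArrayList<>();
        List<String> currentBuffer = new ArrayList<>();
        long nextCut = blockIntervalMs;
        for (Map.Entry<Long, String> e : events) {
            while (e.getKey() >= nextCut) {      // timer fires: seal the buffer
                blocksForPushing.add(currentBuffer);
                currentBuffer = new ArrayList<>();
                nextCut += blockIntervalMs;
            }
            currentBuffer.add(e.getValue());
        }
        blocksForPushing.add(currentBuffer);     // seal the final partial block
        return blocksForPushing;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> events = List.of(
            Map.entry(10L, "a"), Map.entry(150L, "b"), Map.entry(250L, "c"));
        System.out.println(cutIntoBlocks(events, 200L)); // prints [[a, b], [c]]
    }
}
```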
DStream transformations fall into two categories: stateless and stateful.
In a stateless transformation, the processing of each batch does not depend on the data of previous batches.
A stateless transformation simply applies an ordinary RDD transformation to every batch, that is, to each RDD in the DStream.
Common stateless transformations

Function | Purpose | Scala example |
---|---|---|
map() | Applies the given function to each element of the DStream and returns a DStream of the results | ds.map(x => x+1) |
flatMap() | Applies the given function to each element and returns a DStream of the elements in the returned iterators | ds.flatMap(x => x.split(" ")) |
filter() | Returns a DStream containing only the elements that pass the predicate | ds.filter(x => x != 1) |
repartition() | Changes the number of partitions of the DStream | ds.repartition(10) |
reduceByKey() | Combines records with the same key within each batch | ds.reduceByKey((x,y) => x+y) |
groupByKey() | Groups records by key within each batch | ds.groupByKey() |
Use map() and reduceByKey() to count log entries by IP address in each time interval.
// Assume ApacheAccessLog is a utility class for parsing entries from Apache logs
val accessLogsDStream = logData.map(line => ApacheAccessLog.parseFromLogLine(line))
val ipDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), 1))
val ipCountsDStream = ipDStream.reduceByKey((x, y) => x + y)
// Assume ApacheAccessLog is a utility class for parsing entries from Apache logs
static final class IpTuple implements PairFunction<ApacheAccessLog, String, Long> {
public Tuple2<String, Long> call(ApacheAccessLog log) {
return new Tuple2<>(log.getIpAddress(), 1L);
}
}
JavaDStream<ApacheAccessLog> accessLogsDStream = logData.map(new ParseFromLogLine());
JavaPairDStream<String, Long> ipDStream = accessLogsDStream.mapToPair(new IpTuple());
JavaPairDStream<String, Long> ipCountsDStream = ipDStream.reduceByKey(new LongSumReducer());
Join the per-IP request counts with the per-IP transferred-byte totals, keyed by IP address.
val ipBytesDStream = accessLogsDStream.map(entry => (entry.getIpAddress(), entry.getContentSize()))
val ipBytesSumDStream = ipBytesDStream.reduceByKey((x,y) => x+y)
val ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream)
JavaPairDStream<String, Long> ipBytesDStream = accessLogsDStream.mapToPair(new IpContentTuple());
JavaPairDStream<String, Long> ipBytesSumDStream = ipBytesDStream.reduceByKey(new LongSumReducer());
JavaPairDStream<String, Tuple2<Long,Long>> ipBytesRequestCountDStream = ipCountsDStream.join(ipBytesSumDStream);
Use the transform() operation to implement a custom transformation, here extracting outliers from the log records.
val outlierDStream = accessLogsDStream.transform{
rdd => extractOutliers(rdd)
}
JavaDStream<ApacheAccessLog> outlierDStream = accessLogsDStream.transform(
  new Function<JavaRDD<ApacheAccessLog>, JavaRDD<ApacheAccessLog>>() {
    public JavaRDD<ApacheAccessLog> call(JavaRDD<ApacheAccessLog> rdd) {
      return extractOutliers(rdd);
    }
  }
);
Stateful DStream transformations track data across time intervals: data from previous batches is used to compute results in new batches.
There are two main types of stateful transformations: sliding-window operations and updateStateByKey(). The former operate over a sliding window of time; the latter tracks state changes for each key.
Stateful transformations require checkpointing to be enabled on the StreamingContext for fault tolerance.
ssc.checkpoint("hdfs://...")
A window-based operation computes a result over a longer time range than the StreamingContext's batch interval, by combining results from multiple batches.
Window-based transformations take two parameters: the window duration and the slide duration. Both must be integer multiples of the batch interval.
Window duration: each computation covers the most recent windowDuration / batchInterval batches.
Slide duration: controls how often the new DStream computes a result; by default it equals the batch interval.
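The window/slide arithmetic can be sketched in plain Java (no Spark; names invented for illustration): with batches indexed in arrival order, each window spans windowDuration / batchInterval consecutive batches and advances by slideDuration / batchInterval batches at a time.

```java
import java.util.*;

public class WindowBatchesSketch {
    // Enumerate the batch groups each windowed computation would cover:
    // every window is `windowBatches` consecutive batches, stepping forward
    // by `slideBatches` batches between computations.
    static List<List<Integer>> windows(List<Integer> batches,
                                       int windowBatches, int slideBatches) {
        List<List<Integer>> result = new ArrayList<>();
        for (int end = windowBatches; end <= batches.size(); end += slideBatches) {
            result.add(new ArrayList<>(batches.subList(end - windowBatches, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        // Window 30s, slide 10s, batch interval 10s -> 3 batches per window,
        // advancing one batch at a time.
        System.out.println(windows(List.of(1, 2, 3, 4), 3, 1));
        // prints [[1, 2, 3], [2, 3, 4]]
    }
}
```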
Use window() to count over a window.
val accessLogsWindow = accessLogsDStream.window(Seconds(30), Seconds(10))
val windowCounts = accessLogsWindow.count()
JavaDStream<ApacheAccessLog> accessLogsWindow = accessLogsDStream.window(Durations.seconds(30), Durations.seconds(10));
JavaDStream<Long> windowCounts = accessLogsWindow.count();
Use reduceByKeyAndWindow() to count visits per IP address.
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
{(x, y) => x + y}, // add elements from the batches entering the window
{(x, y) => x - y}, // remove elements from the old batches leaving the window
Seconds(30),       // window duration
Seconds(10)        // slide duration
)
class ExtractIp implements PairFunction<ApacheAccessLog, String, Long> {
public Tuple2<String, Long> call(ApacheAccessLog entry) {
return new Tuple2<>(entry.getIpAddress(), 1L);
}
}
class AddLongs implements Function2<Long, Long, Long> {
public Long call(Long v1, Long v2) {
return v1 + v2;
}
}
class SubtractLongs implements Function2<Long, Long, Long> {
public Long call(Long v1, Long v2) {
return v1 - v2;
}
}
JavaPairDStream<String, Long> ipAddressPairDStream = accessLogsDStream.mapToPair(new ExtractIp());
JavaPairDStream<String, Long> ipCountDStream = ipAddressPairDStream.reduceByKeyAndWindow(
new AddLongs(),        // add elements from the batches entering the window
new SubtractLongs(),   // remove elements from the old batches leaving the window
Durations.seconds(30), // window duration
Durations.seconds(10)  // slide duration
);
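The point of supplying the subtract function can be sketched without Spark: it lets each new window total be derived from the previous one incrementally, instead of re-reducing the whole window. A plain-Java illustration (names invented):

```java
import java.util.*;

public class IncrementalWindowSketch {
    // Totals for each window of `windowBatches` batches, maintained
    // incrementally: new total = old total + entering batch - leaving batch.
    static List<Long> windowTotals(List<Long> batchSums, int windowBatches) {
        List<Long> totals = new ArrayList<>();
        long total = 0L;
        for (int i = 0; i < batchSums.size(); i++) {
            total += batchSums.get(i);                     // batch entering the window
            if (i >= windowBatches) {
                total -= batchSums.get(i - windowBatches); // batch leaving the window
            }
            if (i >= windowBatches - 1) {
                totals.add(total);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Per-batch counts 1,2,3,4 with a 3-batch window -> totals 6 then 9.
        System.out.println(windowTotals(List.of(1L, 2L, 3L, 4L), 3)); // prints [6, 9]
    }
}
```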
Use countByWindow() and countByValueAndWindow() to count over windows.
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
JavaDStream<String> ip = accessLogsDStream.map(new Function<ApacheAccessLog, String>() {
public String call(ApacheAccessLog entry) {
return entry.getIpAddress();
}
});
JavaDStream<Long> requestCount = accessLogsDStream.countByWindow(Durations.seconds(30), Durations.seconds(10));
JavaPairDStream<String, Long> ipAddressRequestCount = ip.countByValueAndWindow(Durations.seconds(30), Durations.seconds(10));
updateStateByKey() maintains state across batches for DStreams of key-value pairs.
It takes an update(events, oldState) function that receives the events that arrived for a key in the current batch along with that key's previous state, and returns the key's new state.
Use updateStateByKey() to keep a running count of each HTTP response code seen in the log messages.
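The per-key mechanics can be simulated without Spark. A plain-Java sketch (class and method names invented) that carries a running count per key across batches:

```java
import java.util.*;

public class UpdateStateSketch {
    // One round of update(events, oldState) per key: the new state is the old
    // state plus the number of events seen for that key in the current batch.
    static Map<String, Long> updateStateByKey(Map<String, Long> state,
                                              List<String> batchKeys) {
        Map<String, Long> newState = new HashMap<>(state);
        for (String key : batchKeys) {
            newState.merge(key, 1L, Long::sum); // bump this key's running count
        }
        return newState;
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        state = updateStateByKey(state, List.of("200", "404", "200")); // batch 1
        state = updateStateByKey(state, List.of("200"));               // batch 2
        System.out.println(state.get("200") + " " + state.get("404")); // prints 3 1
    }
}
```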
def updateRunningSum(values: Seq[Long], state: Option[Long]) = {
Some(state.getOrElse(0L) + values.size)
}
val responseCodeDStream = accessLogsDStream.map(log => (log.getResponseCode(), 1L))
val responseCodeCountDStream = responseCodeDStream.updateStateByKey(updateRunningSum _)
class UpdateRunningSum implements Function2<List<Long>, Optional<Long>, Optional<Long>> {
public Optional<Long> call(List<Long> nums, Optional<Long> current) {
long sum = current.or(0L);
return Optional.of(sum + nums.size());
}
}
JavaPairDStream<Integer, Long> responseCodeCountDStream = accessLogsDStream.mapToPair(
new PairFunction<ApacheAccessLog, Integer, Long>() {
public Tuple2<Integer, Long> call(ApacheAccessLog log) {
return new Tuple2<>(log.getResponseCode(), 1L);
}
}
).updateStateByKey(new UpdateRunningSum());
Output operations on a DStream are analogous to actions on an RDD. For example, save a DStream as SequenceFiles:
val writableIpAddressRequestCount = ipAddressRequestCount.map {
  case (ip, count) => (new Text(ip), new LongWritable(count))
}
writableIpAddressRequestCount.saveAsHadoopFiles[SequenceFileOutputFormat[Text, LongWritable]]("outputDir", "txt")
JavaPairDStream<Text, LongWritable> writableDStream = ipDStream.mapToPair(
new PairFunction<Tuple2<String, Long>, Text, LongWritable>() {
public Tuple2<Text, LongWritable> call(Tuple2<String, Long> e) {
return new Tuple2<>(new Text(e._1()), new LongWritable(e._2()));
}
}
);
writableDStream.saveAsHadoopFiles("outputDir", "txt", Text.class, LongWritable.class, SequenceFileOutputFormat.class);
Title: 13. Spark Streaming Quick Start