成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

周期性清除SparkStreaming流狀態(tài)的方法是什么

本篇文章為大家展示了周期性清除Spark Streaming流狀態(tài)的方法是什么,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。

創(chuàng)新互聯(lián)擁有網(wǎng)站維護(hù)技術(shù)和項(xiàng)目管理團(tuán)隊(duì),建立的售前、實(shí)施和售后服務(wù)體系,為客戶提供定制化的網(wǎng)站制作、成都網(wǎng)站建設(shè)、網(wǎng)站維護(hù)、資陽托管服務(wù)器解決方案。為客戶網(wǎng)站安全和日常運(yùn)維提供整體管家式外包優(yōu)質(zhì)服務(wù)。我們的網(wǎng)站維護(hù)服務(wù)覆蓋集團(tuán)企業(yè)、上市公司、外企網(wǎng)站、商城網(wǎng)站開發(fā)、政府網(wǎng)站等各類型客戶群體,為全球上1000家企業(yè)提供全方位網(wǎng)站維護(hù)、服務(wù)器維護(hù)解決方案。

在Spark Streaming程序中,我們經(jīng)常需要使用有狀態(tài)的流來統(tǒng)計(jì)一些累積性的指標(biāo),比如各個(gè)商品的PV。簡單的代碼描述如下,使用mapWithState()算子:

 val productPvStream = stream.mapPartitions(records => {
   var result = new ListBuffer[(String, Int)]
     for (record <- records) {
       result += Tuple2(record.key(), 1)
     }
   result.iterator
 }).reduceByKey(_ + _).mapWithState(
   StateSpec.function((productId: String, pv: Option[Int], state: State[Int]) => {
     val sum = pv.getOrElse(0) + state.getOption().getOrElse(0)
     state.update(sum)
     (productId, sum)
 })).stateSnapshots()

現(xiàn)在的問題是,PV并不是一直累加的,而是每天歸零,重新統(tǒng)計(jì)數(shù)據(jù)。要達(dá)到在凌晨0點(diǎn)清除狀態(tài)的目的,有以下兩種方法。

編寫腳本重啟Streaming程序

用crontab、Azkaban等在凌晨0點(diǎn)調(diào)度執(zhí)行下面的Shell腳本:

stream_app_name='com.xyz.streaming.MallForwardStreaming'
cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`

if [ ${cnt} -eq 1 ]; then
 pid=`ps aux | grep SparkSubmit | grep ${stream_app_name} | awk '{print $2}'`
 kill -9 ${pid}
 sleep 20
 cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`
 if [ ${cnt} -eq 0 ]; then
   nohup sh /path/to/streaming/bin/mall_forward.sh > /path/to/streaming/logs/mall_forward.log 2>&1
 fi
fi

這種方式最簡單,也不需要對程序本身做任何改動。但隨著同時(shí)運(yùn)行的Streaming任務(wù)越來越多,就會顯得越來越累贅了。

給StreamingContext設(shè)置超時(shí)

在程序啟動之前,先計(jì)算出當(dāng)前時(shí)間點(diǎn)距離第二天凌晨0點(diǎn)的毫秒數(shù):

def msTillTomorrow = {
 val now = new Date()
 val tomorrow = new Date(now.getYear, now.getMonth, now.getDate + 1)
 tomorrow.getTime - now.getTime
}

然后將Streaming程序的主要邏輯寫在while(true)循環(huán)中,并且不像平常一樣調(diào)用StreamingContext.awaitTermination()方法,而改用awaitTerminationOrTimeout()方法,即:

while (true) {
   val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL))
   ssc.checkpoint(CHECKPOINT_DIR)

   // ...處理邏輯...

   ssc.start()
   ssc.awaitTerminationOrTimeout(msTillTomorrow)
   ssc.stop(false, true)
   Thread.sleep(BATCH_INTERVAL * 1000)
 }

在經(jīng)過msTillTomorrow毫秒之后,StreamingContext就會超時(shí),再調(diào)用其stop()方法(注意兩個(gè)參數(shù),stopSparkContext表示是否停止關(guān)聯(lián)的SparkContext,stopGracefully表示是否優(yōu)雅停止),就可以停止并重啟StreamingContext。

兩種方法都是仍然采用Spark Streaming的機(jī)制進(jìn)行狀態(tài)計(jì)算的。如果其他條件允許的話,我們還可以拋棄mapWithState(),直接借助外部存儲自己維護(hù)狀態(tài)。比如將redis的Key設(shè)計(jì)為product_pv:[product_id]:[date],然后在Spark Streaming的每個(gè)批次中使用incrby指令,就能方便地統(tǒng)計(jì)PV了,不必考慮定時(shí)的問題。

上述內(nèi)容就是周期性清除Spark Streaming流狀態(tài)的方法是什么,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

新聞標(biāo)題:周期性清除SparkStreaming流狀態(tài)的方法是什么
路徑分享:http://jinyejixie.com/article34/gpsise.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈、云服務(wù)器Google、響應(yīng)式網(wǎng)站自適應(yīng)網(wǎng)站、網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司
永春县| 濮阳市| 柘城县| 宣化县| 伊宁县| 南江县| 个旧市| 嘉峪关市| 五寨县| 闽侯县| 兴安盟| 正安县| 平江县| 阿巴嘎旗| 淮滨县| 彭泽县| 白城市| 通渭县| 毕节市| 常熟市| 珲春市| 麻阳| 商洛市| 北宁市| 永登县| 金山区| 盐边县| 柞水县| 平利县| 手游| 班玛县| 江城| 秦皇岛市| 渭源县| 泊头市| 凌海市| 桐城市| 嘉荫县| 沙洋县| 灵璧县| 阿勒泰市|