成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

Sparkdriver端得到executor返回值的方法

這篇文章將為大家詳細(xì)講解有關(guān)Spark driver端得到executor返回值的方法,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

創(chuàng)新互聯(lián)從2013年開(kāi)始,是專(zhuān)業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目成都做網(wǎng)站、成都網(wǎng)站設(shè)計(jì)、成都外貿(mào)網(wǎng)站建設(shè)網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢(mèng)想脫穎而出為使命,1280元靜樂(lè)做網(wǎng)站,已為上家服務(wù),為靜樂(lè)各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:18982081108

有人說(shuō)spark的代碼不優(yōu)雅,這個(gè)浪尖就忍不了了。實(shí)際上,說(shuō)spark代碼不優(yōu)雅的主要是對(duì)scala不熟悉,spark代碼我覺(jué)得還是很贊的,最值得閱讀的大數(shù)據(jù)框架之一。

今天這篇文章不是為了爭(zhēng)辯Spark 代碼優(yōu)雅與否,主要是講一下理解了spark源碼之后我們能使用的一些小技巧吧。

spark 使用的時(shí)候,總有些需求比較另類(lèi)吧,比如有球友問(wèn)過(guò)這樣一個(gè)需求:

浪尖,我想要在driver端獲取executor執(zhí)行task返回的結(jié)果,比如task是個(gè)規(guī)則引擎,我想知道每條規(guī)則命中了幾條數(shù)據(jù),請(qǐng)問(wèn)這個(gè)怎么做呢?

這個(gè)是不是很騷氣,也很常見(jiàn),按理說(shuō)你輸出之后,在MySQL里跑條sql就行了,但是這個(gè)往往顯的比較麻煩。而且有時(shí)候,在 driver可能還要用到這些數(shù)據(jù)呢?具體該怎么做呢?

大部分的想法估計(jì)是collect方法,那么用collect如何實(shí)現(xiàn)呢?大家自己可以考慮一下,我只能告訴你不簡(jiǎn)單,不如輸出到數(shù)據(jù)庫(kù)里,然后driver端寫(xiě)sql分析一下。

還有一種考慮就是使用自定義累加器。這樣就可以在executor端將結(jié)果累加然后在driver端使用,不過(guò)具體實(shí)現(xiàn)也是很麻煩。大家也可以自己琢磨一下下~

那么,浪尖就給大家介紹一個(gè)比較常用也比較騷的操作吧。

其實(shí),這種操作我們最先想到的應(yīng)該是count函數(shù),因?yàn)樗褪菍ask的返回值返回到driver端,然后進(jìn)行聚合的。我們可以從idea count函數(shù)點(diǎn)擊進(jìn)去,可以看到

  def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

也即是sparkcontext的runJob方法。

Utils.getIteratorSize _這個(gè)方法主要是計(jì)算每個(gè)iterator的元素個(gè)數(shù),也即是每個(gè)分區(qū)的元素個(gè)數(shù),返回值就是元素個(gè)數(shù):

/**   * Counts the number of elements of an iterator using a while loop rather than calling   * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower   * in the current version of Scala.   */  def getIteratorSize[T](iterator: Iterator[T]): Long = {    var count = 0L    while (iterator.hasNext) {      count += 1L      iterator.next()    }    count  }

然后就是runJob返回的是一個(gè)數(shù)組,每個(gè)數(shù)組的元素就是我們task執(zhí)行函數(shù)的返回值,然后調(diào)用sum就得到我們的統(tǒng)計(jì)值了。

那么我們完全可以借助這個(gè)思路實(shí)現(xiàn)我們開(kāi)頭的目標(biāo)。浪尖在這里直接上案例了:

import org.apache.spark.{SparkConf, SparkContext, TaskContext}import org.elasticsearch.hadoop.cfg.ConfigurationOptions
object es2sparkRunJob {
 def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)
   conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")    conf.set(ConfigurationOptions.ES_PORT, "9200")    conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")    conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")    conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")    conf.set("es.write.rest.error.handlers", "ignoreConflict")    conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")
   val sc = new SparkContext(conf)    import org.elasticsearch.spark._
   val rdd = sc.esJsonRDD("posts").repartition(10)
   rdd.count()    val func = (itr : Iterator[(String,String)]) => {      var count = 0      itr.foreach(each=>{        count += 1      })      (TaskContext.getPartitionId(),count)    }
   val res = sc.runJob(rdd,func)
   res.foreach(println)
   sc.stop()  }}

例子中driver端獲取的就是每個(gè)task處理的數(shù)據(jù)量。

關(guān)于Spark driver端得到executor返回值的方法就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

分享題目:Sparkdriver端得到executor返回值的方法
網(wǎng)站網(wǎng)址:http://jinyejixie.com/article36/gpgepg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計(jì)公司、網(wǎng)站建設(shè)面包屑導(dǎo)航、、網(wǎng)站設(shè)計(jì)、全網(wǎng)營(yíng)銷(xiāo)推廣

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

搜索引擎優(yōu)化
隆尧县| 巨鹿县| 云梦县| 门头沟区| 清远市| 和田市| 黄浦区| 务川| 保定市| 凤阳县| 繁昌县| 城步| 静乐县| 万荣县| 来宾市| 敦化市| 翼城县| 观塘区| 大兴区| 东安县| 昭苏县| 青岛市| 临武县| 得荣县| 锡林浩特市| 郴州市| 乐至县| 湛江市| 盐源县| 阳春市| 泸水县| 年辖:市辖区| 宁安市| 图片| 昭觉县| 丹阳市| 瑞金市| 富阳市| 甘南县| 邢台县| 固原市|