成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

Kafka+SparkStream+Hive的項(xiàng)目實(shí)現(xiàn)方法是什么

本篇內(nèi)容主要講解“Kafka+SparkStream+Hive的項(xiàng)目實(shí)現(xiàn)方法是什么”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實(shí)用性強(qiáng)。下面就讓小編來帶大家學(xué)習(xí)“Kafka+SparkStream+Hive的項(xiàng)目實(shí)現(xiàn)方法是什么”吧!

十多年的石棉網(wǎng)站建設(shè)經(jīng)驗(yàn),針對(duì)設(shè)計(jì)、前端、開發(fā)、售后、文案、推廣等六對(duì)一服務(wù),響應(yīng)快,48小時(shí)及時(shí)工作處理。全網(wǎng)整合營銷推廣的優(yōu)勢(shì)是能夠根據(jù)用戶設(shè)備顯示端的尺寸不同,自動(dòng)調(diào)整石棉建站的顯示方式,使網(wǎng)站能夠適用不同顯示終端,在瀏覽器中調(diào)整網(wǎng)站的寬度,無論在任何一種瀏覽器上瀏覽網(wǎng)站,都能展現(xiàn)優(yōu)雅布局與設(shè)計(jì),從而大程度地提升瀏覽體驗(yàn)。成都創(chuàng)新互聯(lián)從事“石棉網(wǎng)站設(shè)計(jì)”,“石棉網(wǎng)站推廣”以來,每個(gè)客戶項(xiàng)目都認(rèn)真落實(shí)執(zhí)行。

目前的項(xiàng)目中需要將kafka隊(duì)列的數(shù)據(jù)實(shí)時(shí)存到hive表中。

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  def main(args: Array[String]): Unit = {
      //    val conf = new SparkConf()
      //    conf.setMaster("local")
      //    conf.setAppName("SparkStreamingOnKafkaDirect")
      val spark = SparkSession.builder().appName("test").master("local").enableHiveSupport().getOrCreate()
      val ssc = new StreamingContext(spark.sparkContext, Durations.seconds(3))
      //設(shè)置日志級(jí)別
      ssc.sparkContext.setLogLevel("Error")

      val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "MyGroupId", //

        /**
         * 當(dāng)沒有初始的offset,或者當(dāng)前的offset不存在,如何處理數(shù)據(jù)
         * earliest :自動(dòng)重置偏移量為最小偏移量
         * latest:自動(dòng)重置偏移量為最大偏移量【默認(rèn)】
         * none:沒有找到以前的offset,拋出異常
         */
        "auto.offset.reset" -> "earliest",

        /**
         * 當(dāng)設(shè)置 enable.auto.commit為false時(shí),不會(huì)自動(dòng)向kafka中保存消費(fèi)者offset.需要異步的處理完數(shù)據(jù)之后手動(dòng)提交
         */
        "enable.auto.commit" -> (false: java.lang.Boolean) //默認(rèn)是true
      )

      //設(shè)置Kafka的topic
      val topics = Array("test")
      //創(chuàng)建與Kafka的連接,接收數(shù)據(jù)
      /*這里接收到數(shù)據(jù)的樣子
      2019-09-26  1569487411604   1235    497 Kafka   Register
      2019-09-26  1569487411604   1235    497 Kafka   Register
      2019-09-26  1569487414838   390    778  Flink   View
      */
      val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
        ssc,
        PreferConsistent, //
        Subscribe[String, String](topics, kafkaParams)
      )

      //對(duì)接收到的數(shù)據(jù)進(jìn)行處理,打印出來接收到的key跟value,最后放回的是value
      val transStrem: DStream[String] = stream.map(record => {
        val key_value = (record.key, record.value)
        println("receive message key = " + key_value._1)
        println("receive message value = " + key_value._2)
        key_value._2
      })


      //這里用了一下動(dòng)態(tài)創(chuàng)建的Schema
      val structType: StructType = StructType(List[StructField](
        StructField("Date_", StringType, nullable = true),
        StructField("Timestamp_", StringType, nullable = true),
        StructField("UserID", StringType, nullable = true),
        StructField("PageID", StringType, nullable = true),
        StructField("Channel", StringType, nullable = true),
        StructField("Action", StringType, nullable = true)
      ))

      //因?yàn)閒oreachRDD可以拿到封裝到DStream中的rdd,可以對(duì)里面的rdd進(jìn)行,
      /*代碼解釋:
          先從foreach中拿到一條數(shù)據(jù),,在函數(shù)map中對(duì)接收來的數(shù)據(jù)用 “\n” 進(jìn)行切分,放到Row中,用的是動(dòng)態(tài)創(chuàng)建Schema,因?yàn)槲覀冃枰賹?shù)據(jù)存儲(chǔ)到hive中,所以需要Schema。
          因?yàn)閙ap是transformance算子,所以用rdd.count()觸發(fā)一下
           spark.createDataFrame:創(chuàng)建一個(gè)DataFrame,因?yàn)橐?cè)一個(gè)臨時(shí)表,必須用到DataFrame
           frame.createOrReplaceTempView("t1"):注冊(cè)臨時(shí)表
             spark.sql("use spark"):使用 hive 的 spark 庫
           result.write.mode(SaveMode.Append).saveAsTable("test_kafka"):將數(shù)據(jù)放到 test_kafka 中
      */
      transStrem.foreachRDD(one => {
        val rdd: RDD[Row] = one.map({
          a =>
            val arr = a.toString.split("\t")
            Row(arr(0).toString, arr(1).toString, arr(2).toString, arr(3).toString, arr(4).toString, arr(5).toString)
        })
        rdd.count()
        val frame: DataFrame = spark.createDataFrame(rdd, structType)
        //      println(" Scheme: "+frame.printSchema())

        frame.createOrReplaceTempView("t1")
        //      spark.sql("select * from t1").show()
        spark.sql("use spark")
        spark.sql("select * from t1").
          write.mode(SaveMode.Append).saveAsTable("test_kafka")
      }
      )

      /**
       * 以上業(yè)務(wù)處理完成之后,異步的提交消費(fèi)者offset,這里將 enable.auto.commit 設(shè)置成false,就是使用kafka 自己來管理消費(fèi)者offset
       * 注意這里,獲取 offsetRanges: Array[OffsetRange] 每一批次topic 中的offset時(shí),必須從 源頭讀取過來的 stream中獲取,不能從經(jīng)過stream轉(zhuǎn)換之后的DStream中獲取。
       */
      stream.foreachRDD { rdd =>
        val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // some time later, after outputs have completed
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
      ssc.start()
      ssc.awaitTermination()
      ssc.stop()
  }

到此,相信大家對(duì)“Kafka+SparkStream+Hive的項(xiàng)目實(shí)現(xiàn)方法是什么”有了更深的了解,不妨來實(shí)際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

網(wǎng)站標(biāo)題:Kafka+SparkStream+Hive的項(xiàng)目實(shí)現(xiàn)方法是什么
本文來源:http://jinyejixie.com/article34/gpggpe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站建設(shè)自適應(yīng)網(wǎng)站、用戶體驗(yàn)、動(dòng)態(tài)網(wǎng)站建站公司、標(biāo)簽優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

微信小程序開發(fā)
阳春市| 甘南县| 九龙城区| 南丹县| 山丹县| 长治市| 昌宁县| 石棉县| 腾冲县| 嘉鱼县| 遂平县| 辽中县| 林周县| 枣庄市| 抚顺县| 秦安县| 囊谦县| 长乐市| 收藏| 开阳县| 四会市| 根河市| 来宾市| 牙克石市| 凌海市| 田东县| 报价| 如皋市| 板桥市| 遂昌县| 肃南| 南涧| 大洼县| 邮箱| 乌苏市| 大化| 和顺县| 侯马市| 环江| 奉节县| 大庆市|