
Integrating Spark Streaming with Kafka

Project architecture
Log data ----> Flume ----> Kafka ----> Spark Streaming ----> MySQL/Redis/HBase



Prerequisites

  • Install ZooKeeper
  • Install Flume
  • Install Kafka
  • Hadoop configured for high availability

(1) Collect data into Kafka with Flume

Start Kafka:
nohup kafka-server-start.sh \
/application/kafka_2.11-1.1.0/config/server.properties \
1>/home/hadoop/logs/kafka_std.log \
2>/home/hadoop/logs/kafka_err.log &
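A quick way to confirm the broker is up, as a sketch (it assumes the JDK's jps is on the PATH and reuses the /kafka ZooKeeper chroot from the topic commands below):
jps | grep Kafka     # the broker process shows up as "Kafka"
kafka-topics.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka \
--list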
Create a new Kafka topic (one that does not already exist):
kafka-topics.sh \
--create \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka \
--replication-factor 3 \
--partitions 3 \
--topic zy-flume-kafka
Check that the topic was created successfully:
kafka-topics.sh \
--zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka \
--describe \
--topic zy-flume-kafka
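If the topic exists, --describe prints one summary line plus one line per partition, roughly like the output below (the leader/replica broker ids are only illustrative and depend on your cluster):
Topic:zy-flume-kafka   PartitionCount:3   ReplicationFactor:3   Configs:
    Topic: zy-flume-kafka  Partition: 0  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
    Topic: zy-flume-kafka  Partition: 1  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: zy-flume-kafka  Partition: 2  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1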



Configure the Flume collection plan
First tier: exec-avro.conf

agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
#define sources
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /application/flume-1.8.0-bin/data/sample.log
#define channels
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
#define sink
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop02
agent1.sinks.k1.port = 3212
#bind sources and sink to channel
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1

Second tier: avro-kafka.conf

agent2.sources = r2
agent2.channels = c2
agent2.sinks = k2
#define sources
agent2.sources.r2.type = avro
agent2.sources.r2.bind = hadoop02
agent2.sources.r2.port = 3212
#define channels
agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100
#define sink
agent2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent2.sinks.k2.brokerList = hadoop01:9092,hadoop02:9092,hadoop03:9092
agent2.sinks.k2.topic = zy-flume-kafka
agent2.sinks.k2.batchSize = 4
agent2.sinks.k2.requiredAcks = 1
#bind sources and sink to channel
agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2

Start Flume
hadoop02 (second tier, agent2):

flume-ng agent \
--conf /application/flume-1.8.0-bin/conf/ \
--name agent2 \
--conf-file /application/flume-1.8.0-bin/flume_sh/avro-kafka.conf \
-Dflume.root.logger=DEBUG,console

hadoop01 (first tier, agent1):

flume-ng agent \
--conf /application/flume-1.8.0-bin/conf/ \
--name agent1 \
--conf-file /application/flume-1.8.0-bin/flume_sh/exec-avro.conf \
-Dflume.root.logger=DEBUG,console

Note: always start the second tier before starting the first tier.
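Optionally, before starting agent1 on hadoop01, confirm that agent2's avro source is listening on hadoop02 (a sketch; ss -tnlp works the same way if netstat is not installed):
netstat -tnlp | grep 3212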


Test
Start a Kafka consumer:

kafka-console-consumer.sh \
--bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 \
--from-beginning \
--topic zy-flume-kafka

Append data to the monitored file: tail -10 sample.temp >> sample.log
Watch the Kafka consumer: it receives the data!

(2) Read and process the Kafka data with Spark Streaming

 There are two ways to integrate Spark Streaming with Kafka:
   - receiver + checkpoint approach
   - direct + ZooKeeper approach

1) receiver + checkpoint approach

Code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Read data from Kafka using the Receiver-based approach
  */
object _01SparkKafkaReceiverOps {
    def main(args: Array[String]): Unit = {
        //check that the expected number of arguments was passed in
        //e.g. 2 hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka first zy-flume-kafka
        if (args == null || args.length < 4) {
            println(
                """
                  |Parameter Errors! Usage: <batchInterval> <zkQuorum> <groupId> <topics>
                  |batchInterval        : batch interval in seconds
                  |zkQuorum             : zookeeper quorum url
                  |groupId              : consumer group id
                  |topic                : topic(s) to read
                """.stripMargin)
            System.exit(-1)
        }
        //extract the arguments
        val Array(batchInterval, zkQuorum, groupId, topic) = args
        //1. build the entry point
        val conf: SparkConf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("_01SparkKafkaReceiverOps")
        val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))
        /** 2. read the data with the Receiver-based API
          * @param ssc
          * @param zkQuorum
          * @param groupId
          * @param topics
          * @param storageLevel  default: StorageLevel.MEMORY_AND_DISK_SER_2
          * @return DStream of (Kafka message key, Kafka message value)
          */
        //each topic is read by 3 consumer threads inside the single receiver
        val topics = topic.split("\\s+").map((_, 3)).toMap
        //2. read the data
        val message: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
        //3. print the data
        message.print()
        //4. start the job
        ssc.start()
        ssc.awaitTermination()
    }
}

Notes (receiver + checkpoint):
 - The Kafka topic partitions have no relationship to the RDD partitions that Spark Streaming produces. Increasing the per-topic thread count in KafkaUtils.createStream only adds threads inside a single receiver; it does not increase Spark's parallelism.
 - You can create multiple Kafka input DStreams with different groups and topics, so that several receivers ingest data in parallel.
 - If a fault-tolerant store such as HDFS is available and the write-ahead log is enabled, the received data is also replicated to that log.
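For reference, a minimal launch sketch for this example (assumptions: the jar path is a placeholder, the class lives in the default package as written above, and the spark-streaming-kafka-0-8_2.11:2.3.0 coordinates must match your Spark/Scala versions). The four arguments follow the usage string in the code; since the code hard-codes setMaster("local[2]"), no --master flag is needed for a local test:
spark-submit \
--class _01SparkKafkaReceiverOps \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0 \
/path/to/your-streaming-app.jar \
2 hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka first zy-flume-kafka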

2) direct + ZooKeeper approach

Code implementation

package com.zy.streaming

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Use ZooKeeper to manage the consumed offsets, so that when the Spark Streaming job dies
  * and is restarted it resumes from the correct offsets instead of consuming from the beginning.
  */
object SparkStreamingDriverHAOps {
    //base znode path under which the offsets are stored
    val zkTopicOffsetPath="/offset"
    //ZooKeeper (Curator) client used to read and write the offsets
    val client:CuratorFramework={
        val client=CuratorFrameworkFactory.builder()
                .connectString("hadoop01:2181,hadoop02:2181,hadoop03:2181/kafka")
                .namespace("2019_1_7")
            .retryPolicy(new ExponentialBackoffRetry(1000,3))
            .build()
        client.start()
        client
    }

    def main(args: Array[String]): Unit = {
        //suppress noisy logging
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        //2 direct zy-flume-kafka
        if(args==null||args.length<3){
            println(
                """
                  |Parameter Errors! Usage: <batchInterval> <groupId> <topics>
                  |batchInterval        : batch interval in seconds
                  |groupId              : consumer group id
                  |topic                : topic(s) to read
                """.stripMargin)
            System.exit(-1)
        }
        //extract the arguments
        val Array(batchInterval,groupId,topic)=args
        //1. build the entry point
        val conf: SparkConf = new SparkConf()
            .setMaster("local[2]")
            .setAppName("SparkStreamingDriverHAOps")
        val ssc =new StreamingContext(conf,Seconds(batchInterval.toLong))
        //parameters for connecting to kafka
        val kafkaParams=Map(
            "bootstrap.servers"->"hadoop01:9092,hadoop02:9092,hadoop03:9092", //cluster entry point
            "auto.offset.reset"->"smallest" //where to start when no offset is stored
        )
        //2. create the kafka message stream
        val message:DStream[(String,String)]=createMessage(topic,groupId,ssc,kafkaParams)
        //3. business logic; this post is about integrating kafka with sparkStreaming, so no real processing is done here
        message.foreachRDD(rdd=>{
            if(!rdd.isEmpty()){
                println(
                    """
                      |####################>_<####################
                    """.stripMargin+rdd.count())
            }
            //persist the updated offsets to ZooKeeper
            storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges,groupId)
        })
        //4. start the job
        ssc.start()
        ssc.awaitTermination()
    }

    /**
      * Create the kafka message stream.
      * Two cases:
      *  1. on the first consumption no offsets can be read from zk
      *  2. on subsequent consumptions the offsets are read from zk
      */
    def createMessage(topic: String, groupId: String, ssc: StreamingContext, kafkaParams: Map[String, String]): InputDStream[(String, String)] = {
        //fetch the offsets and determine whether this is the first consumption
        val (fromOffsets,flag)=getFromOffsets(topic, groupId)
        var message:InputDStream[(String, String)] = null
        //build the kafka message stream
        if(flag){ //flag is true when per-partition offsets were found in zk, so start from them
            /**
              * recordClass: Class[R],
              * kafkaParams: JMap[String, String],
              * fromOffsets: JMap[TopicAndPartition, JLong],
              * messageHandler: JFunction[MessageAndMetadata[K, V], R]
              */
            val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
            message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String,String)](ssc,kafkaParams,fromOffsets,messageHandler)
        }else{  //flag is false on the first read
            /**
              * createDirectStream[
              * String, key type
              * String, value type
              * StringDecoder, key decoder type
              * StringDecoder] value decoder type
              *
              */
            message=KafkaUtils.createDirectStream[String,
                String,
                StringDecoder
                , StringDecoder](ssc,kafkaParams,topic.split("\\s+").toSet)
        }
        message
    }

    //fetch the stored offset of every partition of the given topic
    def getFromOffsets(topic: String, groupId: String):(Map[TopicAndPartition, Long], Boolean)= {
        //build the znode path where the offsets are stored
        val zkPath=s"${zkTopicOffsetPath}/${topic}/${groupId}"
        //make sure the path exists, creating it if necessary
        ensureZKPathExists(zkPath)

        //read the offset stored for every partition
        import scala.collection.JavaConversions._
        val offsets=for{p<-client.getChildren.forPath(zkPath)}yield{
            val offset=new String(client.getData.forPath(s"${zkPath}/${p}")).toLong
            (new TopicAndPartition(topic,p.toInt),offset)
        }
        //an empty result means this is the first read and no offsets have been stored yet
        if(offsets.isEmpty){
            (offsets.toMap,false)
        }else{
            (offsets.toMap,true)
        }
    }

    //persist the end offset of every processed OffsetRange back to ZooKeeper
    def storeOffsets(offsetRanges: Array[OffsetRange], groupId: String): Unit = {
        for(offsetRange<-offsetRanges){
            val partition=offsetRange.partition
            val topic=offsetRange.topic
            //offset up to which this batch has consumed
            val offset=offsetRange.untilOffset
            //build the znode that stores this partition's offset
            val path=s"${zkTopicOffsetPath}/${topic}/${groupId}/${partition}"
            //make sure the znode exists, creating it if necessary
            ensureZKPathExists(path)
            client.setData().forPath(path,(""+offset).getBytes())
        }
    }
    def ensureZKPathExists(zkPath: String) = {
        //create the znode if it does not exist yet
        if(client.checkExists().forPath(zkPath)==null){
            //create missing parent znodes as well
            client.create().creatingParentsIfNeeded().forPath(zkPath)
        }
    }
}

Notes (direct + zookeeper):
 - There is no need to create multiple Kafka input streams and union them. With the direct stream, Spark Streaming creates as many RDD partitions as there are Kafka partitions, and all of them read from Kafka in parallel, so there is a one-to-one mapping between Kafka partitions and RDD partitions.
 - The direct approach has no receiver and therefore needs no write-ahead log; it is enough that Kafka's data retention is long enough.
 - It can provide exactly-once consumption semantics, because the offsets are stored in ZooKeeper under the application's control and are only committed after the batch has been processed.
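As with the receiver example, a minimal launch sketch (assumptions: placeholder jar path; the --packages coordinates for the Kafka 0.8 integration and Curator are Spark 2.3.0 / Scala 2.11 artifacts to be adjusted to your environment). The three arguments match the usage string in the code:
spark-submit \
--class com.zy.streaming.SparkStreamingDriverHAOps \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0,org.apache.curator:curator-framework:2.12.0 \
/path/to/your-streaming-app.jar \
2 direct zy-flume-kafka

If the job is later restarted with the same groupId ("direct" here), it reads the saved offsets back from ZooKeeper and resumes where it left off instead of starting from the beginning.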
