
Spark Streaming Consuming Kafka Data

Overview: This example shows Spark Streaming consuming Kafka messages. It extracts, filters, and transforms the data in real time, then stores the result in HDFS.
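
Each message on the "access" topic is expected to be a JSON object whose fields match the LogBean case class defined at the end of the listing; a hypothetical record might look like {"time":"2019-07-01 12:00:00","longitude":116.39,"latitude":39.91,"openid":"o1a2b3","page":"/index","evnet_type":1}. Because offsets are committed manually only after each batch has been written out, the pipeline provides at-least-once delivery.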

Example code

package com.fwmagic.test

import com.alibaba.fastjson.{JSON, JSONException}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory

/**
  * created by fwmagic
  */
object RealtimeEtl {

  private val logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "hadoop")

    val conf = new SparkConf().setAppName("RealtimeEtl").setMaster("local[*]")

    val spark = SparkSession.builder().config(conf).getOrCreate()

    val streamContext = new StreamingContext(spark.sparkContext, Seconds(5))

    //The direct approach connects to the Kafka topic's partitions directly, without a receiver
    //auto.offset.reset applies only when the group has no committed offset:
    //  earliest starts from the beginning of the topic, latest from the newest records
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hd1:9092,hd2:9092,hd3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "fwmagic",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("access")

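    //PreferConsistent spreads the topic's partitions evenly across the available
    //executors; Subscribe hands partition assignment for this consumer group to Kafka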
    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      streamContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    //With the direct Spark Streaming/Kafka integration, work on the kafkaDStream inside
    //foreachRDD: the offset ranges can only be read from the RDDs of the original stream
    kafkaDStream.foreachRDD(kafkaRDD => {
      if (!kafkaRDD.isEmpty()) {
        //Get the offset ranges of the current batch's RDD
        val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges

        //Pull the message values out of Kafka
        val lines = kafkaRDD.map(_.value())
        //Parse each JSON line into a LogBean
        val logBeanRDD = lines.map(line => {
          var logBean: LogBean = null
          try {
            logBean = JSON.parseObject(line, classOf[LogBean])
          } catch {
            case e: JSONException => {
              //Log the bad record and keep going; logBean stays null and is filtered out below
              logger.error("JSON parse error! line: " + line, e)
            }
          }
          logBean
        })

        //Filter out records that failed to parse
        val filteredRDD = logBeanRDD.filter(_ != null)

        //Convert the RDD to a DataFrame; this works because the RDD holds case class instances
        import spark.implicits._

        val df = filteredRDD.toDF()

        df.show() //debug output; comment this out for production runs
        //Write the data to HDFS; the output directory is passed in as args(0), e.g. hdfs://hd1:9000/360
        df.repartition(1).write.mode(SaveMode.Append).parquet(args(0))

        //Commit this batch's offsets; they are written back to Kafka once the output has succeeded
        kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
      }
    })

    //Start the streaming job and block until it is terminated
    streamContext.start()
    streamContext.awaitTermination()
    streamContext.stop()

  }

}

case class LogBean(time:String,
                   longitude:Double,
                   latitude:Double,
                   openid:String,
                   page:String,
                   evnet_type:Int)
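
The fastjson mapping can be sanity-checked outside the streaming job. The sketch below is illustrative only: the sample record is made up, and it assumes, as the job above does, that fastjson 1.2.x can populate the Scala case class from matching JSON field names (note the evnet_type spelling, kept as in the case class).

import com.alibaba.fastjson.{JSON, JSONException}

object LogBeanParseDemo {
  def main(args: Array[String]): Unit = {
    //Hypothetical record whose fields mirror the LogBean case class
    val line = """{"time":"2019-07-01 12:00:00","longitude":116.39,"latitude":39.91,"openid":"o1a2b3","page":"/index","evnet_type":1}"""
    println(JSON.parseObject(line, classOf[LogBean]))

    //A malformed line raises JSONException; the streaming job catches it,
    //maps the record to null, and filters the null out
    try JSON.parseObject("not json", classOf[LogBean])
    catch { case e: JSONException => println("rejected: " + e.getMessage) }
  }
}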

Dependencies (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.fwmagic.360</groupId>
    <artifactId>fwmagic-360</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <scala.version>2.11.7</scala.version>
        <spark.version>2.2.2</spark.version>
        <hadoop.version>2.7.7</hadoop.version>
        <encoding>UTF-8</encoding>
    </properties>

    <dependencies>
        <!-- Scala dependency -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <!-- Spark core dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark SQL dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Spark Streaming dependency -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Pin the hadoop-client API version -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.39</version>
        </dependency>

    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <!-- Plugin for compiling Scala -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <!-- Plugin for compiling Java -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- Plugin for building the shaded jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>
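
With this pom, mvn package produces a shaded jar (fwmagic-360-1.0.jar, per the artifactId and version above). The job takes the HDFS output directory as its first argument, so a launch could look like the following hypothetical command (host names and paths are the placeholders used in the code):

spark-submit --class com.fwmagic.test.RealtimeEtl fwmagic-360-1.0.jar hdfs://hd1:9000/360

Note that the master is hardcoded to local[*] via setMaster; remove that call and pass --master instead to run on a cluster.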
