package com.fwmagic.test
import com.alibaba.fastjson.{JSON, JSONException}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.slf4j.LoggerFactory
/**
* created by fwmagic
*/
object RealtimeEtl {
private val logger = LoggerFactory.getLogger(RealtimeEtl.getClass)
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "hadoop")
val conf = new SparkConf().setAppName("RealtimeEtl").setMaster("local[*]")
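//setMaster("local[*]") is for local testing only; remove it for cluster deployment,
//because a master set in code takes precedence over the spark-submit --master flag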
val spark = SparkSession.builder().config(conf).getOrCreate()
val streamContext = new StreamingContext(spark.sparkContext, Seconds(5))
//Direct stream: the consumer connects straight to the Kafka topic partitions, no receiver involved
//"auto.offset.reset" decides where to start when the group has no committed offset:
//earliest (consume from the beginning) or latest (only new records);
//enable.auto.commit is false, so offsets are committed manually after each batch is written out
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "hd1:9092,hd2:9092,hd3:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "fwmagic",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("access")
val kafkaDStream = KafkaUtils.createDirectStream[String, String](
streamContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
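//PreferConsistent spreads the Kafka partitions evenly across the available executors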
//With the direct approach, process each batch via foreachRDD: the offset ranges are
//only accessible on the untransformed KafkaRDD
kafkaDStream.foreachRDD(kafkaRDD => {
if (!kafkaRDD.isEmpty()) {
//Grab the offset ranges for this batch's RDD
val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
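//Each OffsetRange carries topic, partition, fromOffset and untilOffset; to trace what
//this batch covers, one could log them, e.g.:
//offsetRanges.foreach(o => logger.info(s"${o.topic}-${o.partition}: ${o.fromOffset} -> ${o.untilOffset}"))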
//Extract the record values from Kafka
val lines = kafkaRDD.map(_.value())
//Parse each line of JSON into a LogBean
val logBeanRDD = lines.map(line => {
var logBean: LogBean = null
try {
logBean = JSON.parseObject(line, classOf[LogBean])
} catch {
case e: JSONException => {
//log the bad record and move on; the resulting null is filtered out below
logger.error("JSON parse error! line: " + line, e)
}
}
logBean
})
//Filter out records that failed to parse
val filteredRDD = logBeanRDD.filter(_ != null)
//Convert the RDD to a DataFrame; this works because it holds case class instances
import spark.implicits._
val df = filteredRDD.toDF()
df.show()
//Append this batch to HDFS (e.g. hdfs://hd1:9000/360); repartition(1) produces a single parquet file per batch
df.repartition(1).write.mode(SaveMode.Append).parquet(args(0))
//Commit this batch's offsets once the write has succeeded; they are stored back in Kafka,
//giving at-least-once semantics (a failure between write and commit replays the batch)
kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
})
//Start the streaming job and block until it is terminated
streamContext.start()
streamContext.awaitTermination()
streamContext.stop()
}
}
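//Field names must match the JSON keys one-to-one; "evnet_type" is kept as-is on the
//assumption that it mirrors the spelling of the upstream key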
case class LogBean(time:String,
longitude:Double,
latitude:Double,
openid:String,
page:String,
evnet_type:Int)
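For reference, here is a minimal standalone sketch of the JSON-to-LogBean mapping used above. The sample record and its values are hypothetical; what matters is that the keys line up with the case class fields:

import com.alibaba.fastjson.JSON

object LogBeanParseDemo {
  def main(args: Array[String]): Unit = {
    //hypothetical input line; keys mirror the LogBean fields, including "evnet_type"
    val line =
      """{"time":"2019-01-01 10:00:00","longitude":104.06,"latitude":30.67,"openid":"o-abc123","page":"/index","evnet_type":1}"""
    val bean = JSON.parseObject(line, classOf[LogBean])
    println(bean) //expected: LogBean(2019-01-01 10:00:00,104.06,30.67,o-abc123,/index,1)
  }
}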
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fwmagic.360</groupId>
<artifactId>fwmagic-360</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.11.7</scala.version>
<spark.version>2.2.2</spark.version>
<hadoop.version>2.7.7</hadoop.version>
<encoding>UTF-8</encoding>
</properties>
<dependencies>
<!-- Scala library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark Streaming and its Kafka 0.10 integration -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop client API, pinned to the cluster version -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.39</version>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<!-- Scala compiler plugin -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<!-- Java compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Shade plugin: builds the fat jar and strips signature files -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
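To build, run mvn clean package; the shade plugin produces a runnable fat jar (target/fwmagic-360-1.0.jar by default). A typical submission would then look something like:

spark-submit --class com.fwmagic.test.RealtimeEtl target/fwmagic-360-1.0.jar hdfs://hd1:9000/360

where the single program argument (args(0)) is the HDFS directory the parquet output is appended to.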