摘要
專注于為中小企業(yè)提供成都網(wǎng)站制作、網(wǎng)站建設服務,電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)贊皇免費做網(wǎng)站提供優(yōu)質(zhì)的服務。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千余家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。
如果要想真正的掌握sparkSQL編程,首先要對sparkSQL的整體框架以及sparkSQL到底能幫助我們解決什么問題有一個整體的認識,然后就是對各個層級關系有一個清晰的認識后,才能真正的掌握它,對于sparkSQL整體框架這一塊,在前一個博客已經(jīng)進行過了一些介紹,如果對這塊還有疑問可以看我前一個博客:http://9269309.blog.51cto.com/9259309/1845525。本篇博客主要是對sparkSQL實戰(zhàn)進行講解和總結,而不是對sparkSQL源碼的講解,如果想看源碼的請繞道。
再多說一點,對于初學者,本人堅持的觀點是不要一上來就看源碼,這樣的效果不是很大,還浪費時間,對這個東西還沒有大致掌握,還不知道它是干什么的,上來就看源碼,門檻太高,而且看源碼對個人的提升也不是很高。我們做軟件開發(fā)的,我們開發(fā)的順序也是,首先是需求,對需求有了詳細的認識,需要解決什么問題,然后才是軟件的設計,代碼的編寫。同樣,學習框架也是,我們只有對這個框架的需求,它需要解決什么問題,它需要干什么工作,都非常了解了,然后再看源碼,這樣效果才能得到很大的提升。對于閱讀源代碼這一塊,是本人的一點看法,說的對與錯,歡迎吐槽......!
1、sparkSQL層級
當我們想用sparkSQL來解決我們的需求時,其實說簡單也簡單,就經(jīng)歷了三步:讀入數(shù)據(jù) -> 對數(shù)據(jù)進行處理 -> 寫入最后結果,那么這三個步驟用的主要類其實就三個:讀入數(shù)據(jù)和寫入最后結果用到兩個類HiveContext和SQLContext,對數(shù)據(jù)進行處理用到的是DataFrame類,此類是你把數(shù)據(jù)從外部讀入到內(nèi)存后,數(shù)據(jù)在內(nèi)存中進行存儲的基本數(shù)據(jù)結構,在對數(shù)據(jù)進行處理時還會用到一些中間類,用到時在進行講解。如下圖所示:
2、HiveContext和SQLContext
把HiveContext和SQLContext放在一起講解是因為他們是差不多的,因為HiveContext繼承自SQLContext,為什么會有兩個這樣的類,其實與hive和sql有關系的,雖然hive擁有HQL語言,但是它是一個類sql語言,和sql語言還是有差別的,有些sql語法,HQL是不支持的。所以他們還是有差別的。選擇不同的類,最后執(zhí)行的查詢引擎的驅(qū)動是不一樣的。但是對于底層是怎么區(qū)別的這里不做詳細的介紹,你就知道一點,使用不同的讀數(shù)據(jù)的類,底層會進行標記,自動識別是使用哪個類進行數(shù)據(jù)操作,然后采用不同的執(zhí)行計劃執(zhí)行操作,這點在上一篇sparkSQL整體框架中進行了介紹,這里不做介紹。當從hive庫中讀數(shù)據(jù)的時候,必須使用HiveContext來進行讀取數(shù)據(jù),不然在進行查詢的時候會出一些奇怪的錯。其他的數(shù)據(jù)源兩者都可以選擇,但是最好使用SQLContext來完成。因為其支持的sql語法更多。由于HiveContext是繼承自SQLContext,這里只對SQLContext進行詳細的介紹,但是以下這些方法是完全可以用在HiveContext中的。其實HiveContext類就擴展了SQLContext的兩個我們可以使用的方法(在看源碼時以protected和private開頭的方法都是我們不能使用的,這個是scala的控制邏輯,相反,不是以這兩個關鍵字標記的方法是我們可以直接使用的方法):analyze(tableName:String)和refreshTable(tableName:String)。
方法 | 用途 |
analyze方法 | 這個我們一般使用不到,它是來對我們寫的sql查詢語句進行分析用的,一般用不到。 |
refreshTable方法 | 當我們在sparkSQL中處理的某個表的存儲位置發(fā)生了變換,但是我們在內(nèi)存的metaData中緩存(cache)了這張表,則需要調(diào)用這個方法來使這個緩存無效,需要重新加載。 |
2.1 讀數(shù)據(jù)
我們在解決我們的需求時,首先是讀入數(shù)據(jù),需要把數(shù)據(jù)讀入到內(nèi)存中去,讀數(shù)據(jù)SQLContext提供了兩個方法,我們提供兩個數(shù)據(jù)表,為了便于演示,我采用的是用JSON格式進行存儲的,寫成這樣的格式,但是可以保存為.txt格式的文件。
1、第一種數(shù)據(jù)讀入:這種是對數(shù)據(jù)源文件進行操作。
import org.apache.spark.sql.SQLContext val sql = new SQLContext(sc) //聲明一個SQLContext的對象,以便對數(shù)據(jù)進行操作 val peopleInfo = sql.read.json("文件路徑") //其中peopleInfo返回的結果是:org.apache.spark.sql.DataFrame = // [age: bigint, id: bigint, name: string],這樣就把數(shù)據(jù)讀入到內(nèi)存中了
寫了這幾行代碼后面總共發(fā)生了什么,首先sparkSQL先找到文件,以解析json的形式進行解析,同時通過json的key形成schema,scheam的字段的順序不是按照我們讀入數(shù)據(jù)時期默認的順序,如上,其字段的順序是通過字符串的順序進行重新組織的。默認情況下,會把整數(shù)解析成bigint類型的,把字符串解析成string類型的,通過這個方法讀入數(shù)據(jù)時,返回值得結果是一個DataFrame數(shù)據(jù)類型。
DataFrame是什么?其實它是sparkSQL處理大數(shù)據(jù)的基本并且是核心的數(shù)據(jù)結構,是來存儲sparkSQL把數(shù)據(jù)讀入到內(nèi)存中,數(shù)據(jù)在內(nèi)存中進行存儲的基本數(shù)據(jù)結構。它采用的存儲是類似于數(shù)據(jù)庫的表的形式進行存儲的。我們想一想,一個數(shù)據(jù)表有幾部分組成:1、數(shù)據(jù),這個數(shù)據(jù)是一行一行進行存儲的,一條記錄就是一行,2、數(shù)據(jù)表的數(shù)據(jù)字典,包括表的名稱,表的字段和字段的類型等元數(shù)據(jù)信息。那么DataFrame也是按照行進行存儲的,這個類是Row,一行一行的進行數(shù)據(jù)存儲。一般情況下處理粒度是行粒度的,不需要對其行內(nèi)數(shù)據(jù)進行操作,如果想單獨操作行內(nèi)數(shù)據(jù)也是可以的,只是在處理的時候要小心,因為處理行內(nèi)的數(shù)據(jù)容易出錯,比如選錯數(shù)據(jù),數(shù)組越界等。數(shù)據(jù)的存儲的形式有了,數(shù)據(jù)表的字段和字段的類型都存放在哪里呢,就是schema中。我們可以調(diào)用schema來看其存儲的是什么。
peopleInfo.schema //返回的結果是:org.apache.spark.sql.types.StructType = //StructType(StructField(age,LongType,true), StructField(id,LongType,true), // StructField(name,StringType,true))
可以看出peopleInfo存儲的是數(shù)據(jù),schema中存儲的是這些字段的信息。需要注意的是表的字段的類型與scala數(shù)據(jù)類型的對應關系:bigint->Long,int -> Int,Float -> Float,double -> Double,string -> String等。一個DataFrame是有兩部分組成的:以行進行存儲的數(shù)據(jù)和scheam,schema是StructType類型的。當我們有數(shù)據(jù)而沒有schema時,我們可以通過這個形式進行構造從而形成一個DataFrame。
read函數(shù)還提供了其他讀入數(shù)據(jù)的接口:
函數(shù) | 用途 |
json(path:String) | 讀取json文件用此方法 |
table(tableName:String) | 讀取數(shù)據(jù)庫中的表 |
jdbc(url: String,table: String,predicates:Array[String],connectionProperties:Properties) | 通過jdbc讀取數(shù)據(jù)庫中的表 |
orc(path:String) | 讀取以orc格式進行存儲的文件 |
parquet(path:String) | 讀取以parquet格式進行存儲的文件 |
schema(schema:StructType) | 這個是一個優(yōu)化,當我們讀入數(shù)據(jù)的時候指定了其schema,底層就不會再次解析schema從而進行了優(yōu)化,一般不需要這樣的優(yōu)化,不進行此優(yōu)化,時間效率還是可以接受 |
2、第二種讀入數(shù)據(jù):這個讀入數(shù)據(jù)的方法,主要是處理從一個數(shù)據(jù)表中選擇部分字段,而不是選擇表中的所有字段。那么這種需求,采用這個數(shù)據(jù)讀入方式比較有優(yōu)勢。這種方式是直接寫sql的查詢語句。把上述json格式的數(shù)據(jù)保存為數(shù)據(jù)庫中表的格式。需要注意的是這種只能處理數(shù)據(jù)庫表數(shù)據(jù)。
val peopleInfo = sql.sql(""" |select | id, | name, | age |from peopleInfo """.stripMargin)//其中stripMargin方法是來解析我們寫的sql語句的。 //返回的結果是和read讀取返回的結果是一樣的: //org.apache.spark.sql.DataFrame = // [age: bigint, id: bigint, name: string]
需要注意的是其返回的schmea中字段的順序和我們查詢的順序還是不一致的。
2.2 寫入數(shù)據(jù)
寫入數(shù)據(jù)就比較的簡單,因為其擁有一定的模式,按照這個模式進行數(shù)據(jù)的寫入。一般情況下,我們需要寫入的數(shù)據(jù)是一個DataFrame類型的,如果其不是DataFrame類型的我們需要把其轉(zhuǎn)換為
DataFrame類型,有些人可能會有疑問,數(shù)據(jù)讀入到內(nèi)存中,其類型是DataFrame類型,我們在處理數(shù)據(jù)時用到的是DataFrame類中的方法,但是DataFrame中的方法不一定返回值仍然是DataFrame類型的,同時有時我們需要構建自己的類型,所以我們需要為我們的數(shù)據(jù)構建成DataFrame的類型。把沒有schema的數(shù)據(jù),構建schema類型,我所知道的就有兩種方法。
1、通過類構建schema,還以上面的peopleInfo為例子。
val sql = new SQLContext(sc) //創(chuàng)建一個SQLContext對象 import sql.implicits._ //這個sql是上面我們定義的sql,而不是某一個jar包,網(wǎng)上有很多 //是import sqlContext.implicits._,那是他們定義的是 //sqlContext = SQLContext(sc),這個是scala的一個特性 val people = sc.textFile("people.txt")//我們采用spark的類型讀入數(shù)據(jù),因為如果用 //SQLContext進行讀入,他們自動有了schema case clase People(id:Int,name:String,age:Int)//定義一個類 val peopleInfo = people.map(lines => lines.split(",")) .map(p => People(p(0).toInt,p(1),p(2).toInt)).toDF //這樣的一個toDF就創(chuàng)建了一個DataFrame,如果不導入 //sql.implicits._,這個toDF方法是不可以用的。
上面的例子是利用了scala的反射技術,生成了一個DataFrame類型??梢钥闯鑫覀兪前裄DD給轉(zhuǎn)換為DataFrame的。
2、直接構造schema,以peopelInfo為例子。直接構造,我們需要把我們的數(shù)據(jù)類型進行轉(zhuǎn)化成Row類型,不然會報錯。
val sql = new SQLContext(sc) //創(chuàng)建一個SQLContext對象 val people = sc.textFile("people.txt").map(lines => lines.split(",")) val peopleRow = sc.map(p => Row(p(0),p(1),(2)))//把RDD轉(zhuǎn)化成RDD(Row)類型 val schema = StructType(StructFile("id",IntegerType,true):: StructFile("name",StringType,true):: StructFile("age",IntegerType,true)::Nil) val peopleInfo = sql.createDataFrame(peopleRow,schema)//peopleRow的每一行的數(shù)據(jù) //類型一定要與schema的一致 //否則會報錯,說類型無法匹配 //同時peopleRow每一行的長度 //也要和schema一致,否則 //也會報錯
構造schema用到了兩個類StructType和StructFile,其中StructFile類的三個參數(shù)分別是(字段名稱,類型,數(shù)據(jù)是否可以用null填充)
采用直接構造有很大的制約性,字段少了還可以,如果有幾十個甚至一百多個字段,這種方法就比較耗時,不僅要保證Row中數(shù)據(jù)的類型要和我們定義的schema類型一致,長度也要一樣,不然都會報錯,所以要想直接構造schema,一定要細心細心再細心,本人就被自己的不細心虐慘了,處理的字段將近一百,由于定義的schema和我的數(shù)據(jù)類型不一致,我就需要每一個字段每一個字段的去確認,字段一多在對的時候就容易疲勞,就這樣的一個錯誤,由于本人比較笨,就花費了一個下午的時間,所以字段多了,在直接構造schema的時候,一定要細心、細心、細心,重要的事情說三遍,不然會死的很慘。
好了,現(xiàn)在我們已經(jīng)把我們的數(shù)據(jù)轉(zhuǎn)化成DataFrame類型的,下面就要往數(shù)據(jù)庫中寫我們的數(shù)據(jù)了
寫數(shù)據(jù)操作:
val sql = new SQLContext(sc) val people = sc.textFile("people.txt").map(lines => lines.split(",")) val peopleRow = sc.map(p => Row(p(0),p(1),(2))) val schema = StructType(StructFile("id",IntegerType,true):: StructFile("name",StringType,true):: StructFile("age",IntegerType,true)::Nil) val peopleInfo = sql.createDataFrame(peopleRow,schema) peopleInfo.registerTempTable("tempTable")//只有有了這個注冊的表tempTable,我們 //才能通過sql.sql(“”“ ”“”)進行查詢 //這個是在內(nèi)存中注冊一個臨時表用戶查詢 sql.sql.sql(""" |insert overwrite table tagetTable |select | id, | name, | age |from tempTable """.stripMargin)//這樣就把數(shù)據(jù)寫入到了數(shù)據(jù)庫目標表tagetTable中
有上面可以看到,sparkSQL的sql()其實就是用來執(zhí)行我們寫的sql語句的。
好了,上面介紹了讀和寫的操作,現(xiàn)在需要對最重要的地方來進行操作了啊。
2.3 通過DataFrame中的方法對數(shù)據(jù)進行操作
在介紹DataFrame之前,我們還是要先明確一下,sparkSQL是用來干什么的,它主要為我們提供了怎樣的便捷,我們?yōu)槭裁匆盟?。它是為了讓我們能用寫代碼的形式來處理sql,這樣說可能有點不準確,如果就這么簡單,只是對sql進行簡單的替換,要是我,我也不學習它,因為我已經(jīng)會sql了,會通過sql進行處理數(shù)據(jù)倉庫的etl,我還學習sparkSQL干嘛,而且學習的成本又那么高。sparkSQL肯定有好處了,不然也不會有這篇博客啦。我們都知道通過寫sql來進行數(shù)據(jù)邏輯的處理時有限的,寫程序來進行數(shù)據(jù)邏輯的處理是非常靈活的,所以sparkSQL是用來處理那些不能夠用sql來進行處理的數(shù)據(jù)邏輯或者用sql處理起來比較復雜的數(shù)據(jù)邏輯。一般的原則是能用sql來處理的,盡量用sql來處理,畢竟開發(fā)起來簡單,sql處理不了的,再選擇用sparkSQL通過寫代碼的方式來處理。好了廢話不多說了,開始DataFrame之旅。
sparkSQL非常強大,它提供了我們sql中的正刪改查所有的功能,每一個功能都對應了一個實現(xiàn)此功能的方法。
對schema的操作
val sql = new SQLContext(sc) val people = sql.read.json("people.txt")//people是一個DataFrame類型的對象 //數(shù)據(jù)讀進來了,那我們查看一下其schema吧 people.schema //返回的類型 //org.apache.spark.sql.types.StructType = //StructType(StructField(age,LongType,true), // StructField(id,LongType,true), // StructField(name,StringType,true)) //以數(shù)組的形式分會schema people.dtypes //返回的結果: //Array[(String, String)] = // Array((age,LongType), (id,LongType), (name,StringType)) //返回schema中的字段 people.columns //返回的結果: //Array[String] = Array(age, id, name) //以tree的形式打印輸出schema people.printSchema //返回的結果: //root // |-- age: long (nullable = true) // |-- id: long (nullable = true) // |-- name: string (nullable = true)
對表的操作,對表的操作語句一般情況下是不常用的,因為雖然sparkSQL把sql查的每一個功能都封裝到了一個方法中,但是處理起來還是不怎么靈活一般情況下我們采用的是用sql()方法直接來寫sql,這樣比較實用,還更靈活,而且代碼的可讀性也是很高的。那下面就把能用到的方法做一個簡要的說明。
方法(sql使我們定義的sql = new SQLContext(sc)) df是一個DataFrame對象 | 實例說明 |
sql.read.table(tableName) | 讀取一張表的數(shù)據(jù) |
df.where(), df.filter() | 過濾條件,相當于sql的where部分; 用法:選擇出年齡字段中年齡大于20的字段。 返回值類型:DataFrame df.where("age >= 20"),df.filter("age >= 20") |
df.limit() | 限制輸出的行數(shù),對應于sql的limit 用法:限制輸出一百行 返回值類型:DataFrame df.limit(100) |
df.join() | 鏈接操作,相當于sql的join 對于join操作,下面會單獨進行介紹 |
df.groupBy() | 聚合操作,相當于sql的groupBy 用法:對于某幾行進行聚合 返回值類型:DataFrame df.groupBy("id") |
df.agg() | 求聚合用的相關函數(shù),下面會詳細介紹 |
df.intersect(other:DataFrame) | 求兩個DataFrame的交集 |
df.except(other:DataFrame) | 求在df中而不在other中的行 |
df.withColumn(colName:String,col:Column) | 增加一列 |
df.withColumnRenamed(exName,newName) | 對某一列的名字進行重新命名 |
df.map(), df.flatMap, df.mapPartitions(), df.foreach() df.foreachPartition() df.collect() df.collectAsList() df.repartition() df.distinct() df.count() | 這些方法都是spark的RDD的基本操作,其中在DataFrame類中也封裝了這些方法,需要注意的是這些方法的返回值是RDD類型的,不是DataFrame類型的,在這些方法的使用上,一定要記清楚返回值類型,不然就容易出現(xiàn)錯誤 |
df.select() | 選取某幾列元素,這個方法相當于sql的select的功能 用法:返回選擇的某幾列數(shù)據(jù) 返回值類型:DataFrame df.select("id","name") |
以上是兩個都是一寫基本的方法,下面就詳細介紹一下join和agg,na,udf操作
2.4 sparkSQL的join操作
spark的join操作就沒有直接寫sql的join操作來的靈活,在進行鏈接的時候,不能對兩個表中的字段進行重新命名,這樣就會出現(xiàn)同一張表中出現(xiàn)兩個相同的字段。下面就一點一點的進行展開用到的兩個表,一個是用戶信息表,一個是用戶的收入薪資表:
1、內(nèi)連接,等值鏈接,會把鏈接的列合并成一個列
val sql = new SQLContext(sc) val pInfo = sql.read.json("people.txt") val pSalar = sql.read.json("salary.txt") val info_salary = pInfo.join(pSalar,"id")//單個字段進行內(nèi)連接 val info_salary1 = pInfo.join(pSalar,Seq("id","name"))//多字段鏈接
返回的結果如下圖:
單個id進行鏈接 (一張表出現(xiàn)兩個name字段) 兩個字段進行鏈接
2、join還支持左聯(lián)接和右鏈接,但是其左聯(lián)接和右鏈接和我們sql的鏈接的意思是一樣的,同樣也是在鏈接的時候不能對字段進行重新命名,如果兩個表中有相同的字段,則就會出現(xiàn)在同一個join的表中,同事左右鏈接,不會合并用于鏈接的字段。鏈接用的關鍵詞:outer,inner,left_outer,right_outer
//單字段鏈接 val left = pInfo.join(pSalar,pInfo("id") === pSalar("id"),"left_outer") //多字段鏈接 val left2 = pInfo.join(pSalar,pInfo("id") === pSalar("id") and pInfo("name") === pSalar("name"),"left_outer")
返回的結果:
單字段鏈接 多字段鏈接
由上可以發(fā)現(xiàn),sparkSQL的join操作還是沒有sql的join靈活,容易出現(xiàn)重復的字段在同一張表中,一般我們進行鏈接操作時,我們都是先利用registerTempTable()函數(shù)把此DataFrame注冊成一個內(nèi)部表,然后通過sql.sql("")寫sql的方法進行鏈接,這樣可以更好的解決了重復字段的問題。
2.5 sparkSQL的agg操作
其中sparkSQL的agg是sparkSQL聚合操作的一種表達式,當我們調(diào)用agg時,其一般情況下都是和groupBy()的一起使用的,選擇操作的數(shù)據(jù)表為:
val pSalar = new SQLContext(sc).read.json("salary.txt") val group = pSalar.groupBy("name").agg("salary" -> "avg") val group2 = pSalar.groupBy("id","name").agg("salary" -> "avg") val group3 = pSalar.groupBy("name").agg(Map("id" -> "avg","salary"->"max"))
得到的結過如下:
group的結果 group2 group3
使用agg時需要注意的是,同一個字段不能進行兩次操作比如:agg(Map("salary" -> "avg","salary" -> "max"),他只會計算max的操作,原因很簡單,agg接入的參數(shù)是Map類型的key-value對,當key相同時,會覆蓋掉之前的value。同時還可以直接使用agg,這樣是對所有的行而言的。聚合所用的計算參數(shù)有:avg,max,min,sum,count,而不是只有例子中用到的avg
2.6 sparkSQL的na操作
sparkSQL的na方法,返回的是一個DataFrameFuctions對象,此類主要是對DataFrame中值為null的行的操作,只提供三個方法,drop()刪除行,fill()填充行,replace()代替行的操作。很簡單不做過多的介紹。
3、總結
我們使用sparkSQL的目的就是為了解決用寫sql不能解決的或者解決起來比較困難的問題,在平時的開發(fā)過程中,我們不能為了高逼格什么樣的sql問題都是用sparkSQL,這樣不是最高效的。使用sparkSQL,主要是利用了寫代碼處理數(shù)據(jù)邏輯的靈活性,但是我們也不能完全的只使用sparkSQL提供的sql方法,這樣同樣是走向了另外一個極端,有上面的討論可知,在使用join操作時,如果使用sparkSQL的join操作,有很多的弊端。為了能結合sql語句的優(yōu)越性,我們可以先把要進行鏈接的DataFrame對象,注冊成內(nèi)部的一個中間表,然后在通過寫sql語句,用SQLContext提供的sql()方法來執(zhí)行我們寫的sql,這樣處理起來更加的合理而且高效。在工作的開發(fā)過程中,我們要結合寫代碼和寫sql的各自的所長來處理我們的問題,這樣會更加的高效。
寫這篇博客,花費了我兩周的時間,由于工作比較忙,只有在業(yè)余時間進行思考和總結。也算對自己學習的一個交代。關于sparkSQL的兩個類HiveContext和SQLContext提供的udf方法,如果用好了udf方法,可以使我們代碼的開發(fā)更加的簡潔和高效,可讀性也是很強的。由于在代碼中注冊udf方法,還有很多很細的知識點需要注意,我準備在另外寫一篇博客進行詳細的介紹。
累死我了,已經(jīng)兩天宅在家里了,該出去溜達溜達了??!
本文名稱:sparkSQL實戰(zhàn)詳解
標題路徑:http://jinyejixie.com/article36/ggsopg.html
成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供做網(wǎng)站、虛擬主機、微信公眾號、移動網(wǎng)站建設、網(wǎng)站營銷、定制開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)