這篇文章主要介紹“Apache Spark窗口功能的介紹”,在日常操作中,相信很多人在Apache Spark窗口功能的介紹問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Apache Spark窗口功能的介紹”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
讓客戶滿意是我們工作的目標,不斷超越客戶的期望值來自于我們對這個行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價值的長期合作伙伴,公司提供的服務(wù)項目有:域名注冊、虛擬空間、營銷軟件、網(wǎng)站建設(shè)、漣源網(wǎng)站維護、網(wǎng)站推廣。
創(chuàng)建Spark DataFrame
現(xiàn)在,我們創(chuàng)建一個示例Spark DataFrame,我們將在整個博客中使用它。 首先,讓我們加載所需的庫。
import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._
現(xiàn)在,我們將使用一些虛擬數(shù)據(jù)創(chuàng)建DataFrame,這些虛擬數(shù)據(jù)將用于討論各種窗口函數(shù)。
case class Salary(depName: String, empNo: Long, salary: Long) val empsalary = Seq( Salary("sales", 1, 5000), Salary("personnel", 2, 3900), Salary("sales", 3, 4800), Salary("sales", 4, 4800), Salary("personnel", 5, 3500), Salary("develop", 7, 4200), Salary("develop", 8, 6000), Salary("develop", 9, 4500), Salary("develop", 10, 5200), Salary("develop", 11, 5200)).toDF()
這是我們的DataFrame的樣子:
窗口集合函數(shù)
讓我們看一些聚合的窗口函數(shù),看看它們?nèi)绾喂ぷ鳌?/p>
首先,我們需要定義窗口的規(guī)范。 假設(shè)我們要根據(jù)部門獲取匯總數(shù)據(jù)。 因此,在此示例中,我們將基于部門名稱(列:depname)定義窗口。
為聚合函數(shù)創(chuàng)建窗口規(guī)范
val byDepName = Window.partitionBy("depName")
在窗口上應(yīng)用聚合函數(shù)
現(xiàn)在,在部門內(nèi)(列:depname),我們可以應(yīng)用各種聚合函數(shù)。 因此,讓我們嘗試查找每個部門的最高和最低工資。 在這里,我們僅選擇了所需的列(depName,max_salary和min_salary),并刪除了重復(fù)的記錄。
val agg_sal = empsalary .withColumn("max_salary", max("salary").over(byDepName)) .withColumn("min_salary", min("salary").over(byDepName)) agg_sal.select("depname", "max_salary", "min_salary") .dropDuplicates() .show()
輸出:
+---------+----------+----------+ | depname|max_salary|min_salary| +---------+----------+----------+ | develop| 6000| 4200| | sales| 5000| 4800| |personnel| 3900| 3500| +---------+----------+----------+
現(xiàn)在讓我們看看它是如何工作的。 我們已經(jīng)按部門名稱對數(shù)據(jù)進行了分區(qū):
現(xiàn)在,當(dāng)我們執(zhí)行合計函數(shù)時,它將應(yīng)用于每個分區(qū)并返回合計值(在本例中為min和max)。
注意:可用的匯總函數(shù)為最大,最小,總和,平均和計數(shù)。
窗口排名功能
在本節(jié)中,我們將討論幾種類型的排名函數(shù)。
創(chuàng)建用于排序功能的窗口規(guī)范
現(xiàn)在,我們要根據(jù)員工在部門內(nèi)的薪水進行排名。 薪水最高的員工將排名第一,薪水最低的員工將排名最后。 在這里,我們將基于部門(列:depname)對數(shù)據(jù)進行分區(qū),并且在部門內(nèi),我們將根據(jù)薪水以降序?qū)?shù)據(jù)進行排序。
val winSpec = Window.partitionBy("depName").orderBy("salary".desc)
對于每個部門,記錄將根據(jù)薪水以降序排序。
1.等級功能:等級
此函數(shù)將返回分區(qū)中每個記錄的等級,并跳過任何重復(fù)等級之后的后續(xù)等級:
val rank_df = empsalary.withColumn("rank", rank().over(winSpec)) rank_df.show()
輸出:
+---------+-----+------+----+ | depName|empNo|salary|rank| +---------+-----+------+----+ | develop| 8| 6000| 1| | develop| 11| 5200| 2| | develop| 10| 5200| 2| | develop| 9| 4500| 4| | develop| 7| 4200| 5| | sales| 1| 5000| 1| | sales| 4| 4800| 2| | sales| 3| 4800| 2| |personnel| 2| 3900| 1| |personnel| 5| 3500| 2| +---------+-----+------+----+
在這里我們可以看到某些等級重復(fù),而有些等級丟失。 例如,在開發(fā)部門中,我們有2名等級= 2的員工,而沒有等級= 3的員工,因為等級函數(shù)將為相同的值保留相同的等級,并相應(yīng)地跳過下一個等級。
2.密集等級:densed_rank
此函數(shù)將返回分區(qū)中每個記錄的等級,但不會跳過任何等級。
val dense_rank_df = empsalary.withColumn("dense_rank", dense_rank().over(winSpec)) dense_rank_df.show()
輸出:
+---------+-----+------+-----------+ | depName|empNo|salary|desnse_rank| +---------+-----+------+-----------+ | develop| 8| 6000| 1| | develop| 10| 5200| 2| | develop| 11| 5200| 2| | develop| 9| 4500| 3| | develop| 7| 4200| 4| | sales| 1| 5000| 1| | sales| 3| 4800| 2| | sales| 4| 4800| 2| |personnel| 2| 3900| 1| |personnel| 5| 3500| 2| +---------+-----+------+-----------+
在這里,我們可以看到某些等級是重復(fù)的,但是排名并沒有像我們使用等級功能時那樣丟失。 例如,在開發(fā)部門中,我們有2名員工,等級=2。density_rank函數(shù)將為相同的值保留相同的等級,但不會跳過下一個等級。
3.行號函數(shù):row_number
此功能將在窗口內(nèi)分配行號。 如果2行的排序列值相同,則不確定將哪個行號分配給具有相同值的每一行。
val row_num_df = empsalary.withColumn("row_number", row_number().over(winSpec)) row_num_df.show()
輸出:
+---------+-----+------+----------+ | depName|empNo|salary|row_number| +---------+-----+------+----------+ | develop| 8| 6000| 1| | develop| 10| 5200| 2| | develop| 11| 5200| 3| | develop| 9| 4500| 4| | develop| 7| 4200| 5| | sales| 1| 5000| 1| | sales| 3| 4800| 2| | sales| 4| 4800| 3| |personnel| 2| 3900| 1| |personnel| 5| 3500| 2| +---------+-----+------+----------+
4.百分比排名函數(shù):percent_rank
此函數(shù)將返回分區(qū)中的相對(百分數(shù))等級。
val percent_rank_df = empsalary.withColumn("percent_rank", percent_rank().over(winSpec)) percent_rank_df.show()
輸出:
+---------+-----+------+------------+ | depName|empNo|salary|percent_rank| +---------+-----+------+------------+ | develop| 8| 6000| 0.0| | develop| 10| 5200| 0.25| | develop| 11| 5200| 0.25| | develop| 9| 4500| 0.75| | develop| 7| 4200| 1.0| | sales| 1| 5000| 0.0| | sales| 3| 4800| 0.5| | sales| 4| 4800| 0.5| |personnel| 2| 3900| 0.0| |personnel| 5| 3500| 1.0| +---------+-----+------+------------+
5. N-tile功能:ntile
此功能可以根據(jù)窗口規(guī)格或分區(qū)將窗口進一步細分為n個組。 例如,如果需要將部門進一步劃分為三類,則可以將ntile指定為3。
val ntile_df = empsalary.withColumn("ntile", ntile(3).over(winSpec)) ntile_df.show()
輸出:
+---------+-----+------+-----+ | depName|empNo|salary|ntile| +---------+-----+------+-----+ | develop| 8| 6000| 1| | develop| 10| 5200| 1| | develop| 11| 5200| 2| | develop| 9| 4500| 2| | develop| 7| 4200| 3| | sales| 1| 5000| 1| | sales| 3| 4800| 2| | sales| 4| 4800| 3| |personnel| 2| 3900| 1| |personnel| 5| 3500| 2| +---------+-----+------+-----+
窗口分析功能
接下來,我們將討論諸如累積分布,滯后和超前的分析功能。
1.累積分布函數(shù):cume_dist
此函數(shù)提供窗口/分區(qū)的值的累積分布。
val winSpec = Window.partitionBy("depName").orderBy("salary") val cume_dist_df = empsalary.withColumn("cume_dist",cume_dist().over(winSpec)) cume_dist_df.show()
定義窗口規(guī)范并應(yīng)用cume_dist函數(shù)以獲取累積分布。
輸出:
+---------+-----+------+------------------+ | depName|empNo|salary| cume_dist| +---------+-----+------+------------------+ | develop| 7| 4200| 0.2| | develop| 9| 4500| 0.4| | develop| 10| 5200| 0.8| | develop| 11| 5200| 0.8| | develop| 8| 6000| 1.0| | sales| 4| 4800|0.6666666666666666| | sales| 3| 4800|0.6666666666666666| | sales| 1| 5000| 1.0| |personnel| 5| 3500| 0.5 |personnel| 2| 3900| 1.0| +---------+-----+------+------------------+
2.滯后功能:滯后
此函數(shù)將在從DataFrame偏移行之前返回該值。
lag函數(shù)采用3個參數(shù)(lag(col,count = 1,默認= None)),col:定義需要在其上應(yīng)用函數(shù)的列。 count:需要回顧多少行。 default:定義默認值。
val winSpec = Window.partitionBy("depName").orderBy("salary") val lag_df = empsalary.withColumn("lag", lag("salary", 2).over(winSpec)) lag_df.show()
輸出:
+---------+-----+------+----+ | depName|empNo|salary| lag| +---------+-----+------+----+ | develop| 7| 4200|null| | develop| 9| 4500|null| | develop| 10| 5200|4200| | develop| 11| 5200|4500| | develop| 8| 6000|5200| | sales| 4| 4800|null| | sales| 3| 4800|null| | sales| 1| 5000|4800| |personnel| 5| 3500|null| |personnel| 2| 3900|null| +---------+-----+------+----+
例如,讓我們在當(dāng)前行之前查找薪水2行。
對于depname = develop,salary = 4500。沒有這樣的行,該行在該行之前2行。 因此它將為空。
對于部門名稱=發(fā)展,薪水= 6000(以藍色突出顯示)。 如果我們提前兩排,我們將獲得5200的薪水(以綠色突出顯示)。
3.導(dǎo)聯(lián)功能:導(dǎo)聯(lián)
此函數(shù)將返回DataFrame的偏移行之后的值。
val winSpec = Window.partitionBy("depName").orderBy("salary") val lead_df = empsalary.withColumn("lead", lead("salary", 2).over(winSpec)) lead_df.show()
lead函數(shù)采用3個參數(shù)(lead(col,count = 1,默認= None))col:定義需要在其上應(yīng)用函數(shù)的列。 count:對于當(dāng)前行,我們需要向前/向后查找多少行。 default:定義默認值。
輸出:
+---------+-----+------+----+ | depName|empNo|salary| lag| +---------+-----+------+----+ | develop| 7| 4200|5200| | develop| 9| 4500|5200| | develop| 10| 5200|6000| | develop| 11| 5200|null| | develop| 8| 6000|null| | sales| 3| 4800|5000| | sales| 4| 4800|null| | sales| 1| 5000|null| |personnel| 5| 3500|null| |personnel| 2| 3900|null| +---------+-----+------+----+
讓我們嘗試從當(dāng)前行中查找前進/后兩行的薪水。
對于depname =開發(fā)人員,薪水= 4500(以藍色突出顯示)。 如果我們在前進/后退兩行,我們將獲得5200的薪水(以綠色突出顯示)。
對于depname =人員,薪水=3500。在此分區(qū)中,沒有此行向前2行/在該行之后。 因此我們將獲得空值。
自定義窗口定義
默認情況下,窗口的邊界由分區(qū)列定義,我們可以通過窗口規(guī)范指定順序。 例如,對于開發(fā)部門,窗口的開始是工資的最小值,窗口的結(jié)束是工資的最大值。
但是,如果我們想更改窗口的邊界怎么辦? 以下功能可用于定義每個分區(qū)內(nèi)的窗口。
1. rangeBetween
使用rangeBetween函數(shù),我們可以顯式定義邊界。例如,從當(dāng)前薪水開始,將其定義為100,然后將其定義為300,并查看其含義。 從100開始表示窗口將從100個單位開始,從當(dāng)前值開始以300個值結(jié)束(包括開始值和結(jié)束值)。
val winSpec = Window.partitionBy("depName") .orderBy("salary") .rangeBetween(100L, 300L)
定義窗口規(guī)格
起始值和結(jié)束值后的L表示該值是Scala Long類型。
val range_between_df = empsalary.withColumn("max_salary", max("salary").over(winSpec)) range_between_df.show()
應(yīng)用自定義窗口規(guī)范
輸出:
+---------+-----+------+----------+ | depName|empNo|salary|max_salary| +---------+-----+------+----------+ | develop| 7| 4200| 4500| | develop| 9| 4500| null| | develop| 10| 5200| null| | develop| 11| 5200| null| | develop| 8| 6000| null| | sales| 3| 4800| 5000| | sales| 4| 4800| 5000| | sales| 1| 5000| null| |personnel| 5| 3500| null| |personnel| 2| 3900| null| +---------+-----+------+----------+
現(xiàn)在,讓我們嘗試了解輸出。
對于depname = developer,salary = 4200,窗口的開始將是(當(dāng)前值+開始),即4200 + 100 =4300。窗口的結(jié)束將是(當(dāng)前值+結(jié)束),即4200 + 300 = 4500。
由于只有一個薪水值在4300到4500之間,包括開發(fā)部門的4500,所以我們將4500作為max_salary作為4200(上面的檢查輸出)。
同樣,對于depname = develop,salary = 4500,窗口將為(開始:4500 + 100 = 4600,結(jié)束:4500 + 300 = 4800)。 但是開發(fā)部門沒有薪水值在4600到4800之間,因此最大值不會為空(上面的檢查輸出)。
這里有一些特殊的邊界值可以使用。
Window.currentRow:指定一行中的當(dāng)前值。
Window.unboundedPreceding:這可以用于使窗口無限制地開始。
Window.unbounded以下:此方法可用于使窗口具有無限的末端。
例如,我們需要從員工工資中找到最高工資,該最高工資大于300。 因此,我們將起始值定義為300L,將結(jié)束值定義為Window.unboundedFollowing:
val winSpec = Window.partitionBy("depName").orderBy("salary") .rangeBetween(300L, Window.unboundedFollowing) val range_unbounded_df = empsalary.withColumn("max_salary", max("salary").over(winSpec)) range_unbounded_df.show()
輸出:
+---------+-----+------+----------+ | depName|empNo|salary|max_salary| +---------+-----+------+----------+ | develop| 7| 4200| 6000| | develop| 9| 4500| 6000| | develop| 10| 5200| 6000| | develop| 11| 5200| 6000| | develop| 8| 6000| null| | sales| 3| 4800| null| | sales| 4| 4800| null| | sales| 1| 5000| null| |personnel| 5| 3500| 3900| |personnel| 2| 3900| null| +---------+-----+------+----------+
因此,對于depname =人員,薪水=3500。窗口將是(開始:3500 + 300 = 3800,結(jié)束:無邊界)。 因此,此范圍內(nèi)的最大值為3900(檢查上面的輸出)。
同樣,對于depname = sales,salary = 4800,窗口將為(開始:4800 + 300、5100,結(jié)束:無邊界)。 由于銷售部門的值不大于5100,因此結(jié)果為空。
2.rowsBetween
通過rangeBetween,我們使用排序列的值定義了窗口的開始和結(jié)束。 但是,我們也可以使用相對行位置定義窗口的開始和結(jié)束。
例如,我們要創(chuàng)建一個窗口,其中窗口的開始是當(dāng)前行之前的一行,結(jié)束是當(dāng)前行之后的一行。
定義自定義窗口規(guī)范
val winSpec = Window.partitionBy("depName") .orderBy("salary").rowsBetween(-1, 1)
應(yīng)用自定義窗口規(guī)范
val rows_between_df = empsalary.withColumn("max_salary", max("salary").over(winSpec)) rows_between_df.show()
輸出:
+---------+-----+------+----------+ | depName|empNo|salary|max_salary| +---------+-----+------+----------+ | develop| 7| 4200| 4500| | develop| 9| 4500| 5200| | develop| 10| 5200| 5200| | develop| 11| 5200| 6000| | develop| 8| 6000| 6000| | sales| 3| 4800| 4800| | sales| 4| 4800| 5000| | sales| 1| 5000| 5000| |personnel| 5| 3500| 3900| |personnel| 2| 3900| 3900| +---------+-----+------+----------+
現(xiàn)在,讓我們嘗試了解輸出。
對于depname =開發(fā),salary = 4500,將定義一個窗口,該窗口在當(dāng)前行之前和之后一行(以綠色突出顯示)。 因此窗口內(nèi)的薪水為(4200、4500、5200),最高為5200(上面的檢查輸出)。
同樣,對于depname = sales,salary = 5000,將在當(dāng)前行的前后定義一個窗口。 由于此行之后沒有行,因此該窗口將只有2行(以綠色突出顯示),其薪水分別為(4800,5000)和max為5000(上面的檢查輸出)。
我們還可以像以前使用rangeBetween一樣使用特殊邊界Window.unboundedPreceding,Window.unboundedFollowing和Window.currentRow。
注意:rowsBetween不需要排序,但是我使用它來使每次運行的結(jié)果保持一致。
到此,關(guān)于“Apache Spark窗口功能的介紹”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
當(dāng)前文章:ApacheSpark窗口功能的介紹
文章源于:http://jinyejixie.com/article42/ghosec.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計、電子商務(wù)、做網(wǎng)站、定制網(wǎng)站、全網(wǎng)營銷推廣、App開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)