成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

PySpark進(jìn)階--深入剖析wordcount.py-創(chuàng)新互聯(lián)

在本文中, 我們借由深入剖析wordcount.py, 來揭開Spark內(nèi)部各種概念的面紗。我們再次回顧wordcount.py代碼來回答如下問題

成都創(chuàng)新互聯(lián)公司專注于企業(yè)成都營銷網(wǎng)站建設(shè)、網(wǎng)站重做改版、夏津網(wǎng)站定制設(shè)計、自適應(yīng)品牌網(wǎng)站建設(shè)、H5技術(shù)成都做商城網(wǎng)站、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站制作、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計等建站業(yè)務(wù),價格優(yōu)惠性價比高,為夏津等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。
  1. 對于大多數(shù)語言的Hello Word示例,都有main()函數(shù), wordcount.py的main函數(shù),或者說調(diào)用Spark的main() 在哪里

  2. 數(shù)據(jù)的讀入,各個RDD數(shù)據(jù)如何轉(zhuǎn)換

  3. map與flatMap的工作機(jī)制,以及區(qū)別

  4. reduceByKey的作用

WordCount.py 的代碼如下:

from __future__ import print_functionimport sysfrom operator import add# SparkSession:是一個對Spark的編程入口,取代了原本的SQLContext與HiveContext,方便調(diào)用Dataset和DataFrame API# SparkSession可用于創(chuàng)建DataFrame,將DataFrame注冊為表,在表上執(zhí)行SQL,緩存表和讀取parquet文件。from pyspark.sql import SparkSessionif __name__ == "__main__":    # Python 常用的簡單參數(shù)傳入     if len(sys.argv) != 2:         print("Usage: wordcount <file>", file=sys.stderr)         exit(-1)             # appName 為 Spark 應(yīng)用設(shè)定一個應(yīng)用名,改名會顯示在 Spark Web UI 上     # 假如SparkSession 已經(jīng)存在就取得已存在的SparkSession,否則創(chuàng)建一個新的。     spark = SparkSession\         .builder\         .appName("PythonWordCount")\         .getOrCreate()             # 讀取傳入的文件內(nèi)容,并寫入一個新的RDD實例lines中,此條語句所做工作有些多,不適合初學(xué)者,可以截成兩條語句以便理解。     # map是一種轉(zhuǎn)換函數(shù),將原來RDD的每個數(shù)據(jù)項通過map中的用戶自定義函數(shù)f映射轉(zhuǎn)變?yōu)橐粋€新的元素。原始RDD中的數(shù)據(jù)項與新RDD中的數(shù)據(jù)項是一一對應(yīng)的關(guān)系。     lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])        # flatMap與map類似,但每個元素輸入項都可以被映射到0個或多個的輸出項,最終將結(jié)果”扁平化“后輸出      counts = lines.flatMap(lambda x: x.split(' ')) \                   .map(lambda x: (x, 1)) \                   .reduceByKey(add)                     # collect() 在驅(qū)動程序中將數(shù)據(jù)集的所有元素作為數(shù)組返回。 這在返回足夠小的數(shù)據(jù)子集的過濾器或其他操作之后通常是有用的。由于collect 是將整個RDD匯聚到一臺機(jī)子上,所以通常需要預(yù)估返回數(shù)據(jù)集的大小以免溢出。                  output = counts.collect()         for (word, count) in output:         print("%s: %i" % (word, count))     spark.stop()
Spark 入口 SparkSession

Spark2.0中引入了SparkSession的概念,它為用戶提供了一個統(tǒng)一的切入點來使用Spark的各項功能,這邊不妨對照Http Session, 在此Spark就在充當(dāng)Web service的角色,程序調(diào)用Spark功能的時候需要先建立一個Session。因此看到getOrCreate()就很容易理解了, 表明可以視情況新建session或利用已有的session。

    spark = SparkSession\         .builder\         .appName("PythonWordCount")\         .getOrCreate()

既然將Spark 想象成一個Web server, 也就意味著可能用多個訪問在進(jìn)行,為了便于監(jiān)控管理, 對應(yīng)用命名一個恰當(dāng)?shù)拿Q是個好辦法。Web UI并不是本文的重點,有興趣的同學(xué)可以參考 ?Spark Application’s Web Console

加載數(shù)據(jù)

在建立SparkSession之后, 就是讀入數(shù)據(jù)并寫入到Dateset中。

    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

為了更好的分解執(zhí)行過程,是時候借助PySpark了, PySpark是python調(diào)用Spark的 API,它可以啟動一個交互式Python Shell。為了方便腳本調(diào)試,暫時切換到Linux執(zhí)行

# pysparkPython 2.7.6 (default, Jun 22 2015, 17:58:13)  [GCC 4.8.2] on linux2 Type "help", "copyright", "credits" or "license" for more information. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 17/02/23 08:30:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/02/23 08:30:31 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0 17/02/23 08:30:31 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException 17/02/23 08:30:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException Welcome to       ____              __      / __/__  ___ _____/ /__     _\ \/ _ \/ _ `/ __/  '_/   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0       /_/ Using Python version 2.7.6 (default, Jun 22 2015 17:58:13) SparkSession available as 'spark'.>>> ds = spark.read.text('/home/spark2.1/spark/examples/src/main/python/a.txt')>>> type(ds) <class 'pyspark.sql.dataframe.DataFrame'>>>> print ds DataFrame[value: string]>>> lines = ds.rdd

交互式Shell的好處是可以方便的查看變量內(nèi)容和類型。此刻文件a.txt已經(jīng)加載到lines中,它是RDD(Resilient Distributed Datasets)彈性分布式數(shù)據(jù)集的實例。

RDD操作

RDD在內(nèi)存中的結(jié)構(gòu)可以參考論文, 理解RDD有兩點比較重要:

一是RDD一種只讀、只能由已存在的RDD變換而來的共享內(nèi)存,然后將所有數(shù)據(jù)都加載到內(nèi)存中,方便進(jìn)行多次重用。

二是RDD的數(shù)據(jù)默認(rèn)情況下存放在集群中不同節(jié)點的內(nèi)存中,本身提供了容錯性,可以自動從節(jié)點失敗中恢復(fù)過來。即如果某個節(jié)點上的RDD partition,因為節(jié)點故障,導(dǎo)致數(shù)據(jù)丟了,那么RDD會自動通過自己的數(shù)據(jù)來源重新計算該partition。

為了探究RDD內(nèi)部的數(shù)據(jù)內(nèi)容,可以利用collect()函數(shù), 它能夠以數(shù)組的形式,返回RDD數(shù)據(jù)集的所有元素。

>>> lines = ds.rdd>>> for i in lines.collect():...     print i... Row(value=u'These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.')

lines存儲的是Row object類型,而我們希望的是對String類型進(jìn)行處理,所以需要利用map api進(jìn)一步轉(zhuǎn)換RDD

>>> lines_map = lines.map(lambda x: x[0])>>> for i in lines_map.collect():...     print i... These examples give a quick overview of the Spark API. Spark is built on the concept of distributed datasets, which contain arbitrary Java or Python objects.

為了統(tǒng)計每個單詞的出現(xiàn)頻率,需要對每個單詞分別統(tǒng)計,那么第一步需要將上面的字符串以空格作為分隔符將單詞提取出來,并為每個詞設(shè)置一個計數(shù)器。比如 These出現(xiàn)次數(shù)是1, 我們期望的數(shù)據(jù)結(jié)構(gòu)是['There', 1]。但是如何將包含字符串的RDD轉(zhuǎn)換成元素為類似 ['There', 1] 的RDD呢?

>>> flat_map = lines_map.flatMap(lambda x: x.split(' '))>>> rdd_map = flat_map.map(lambda x: [x, 1])>>> for i in rdd_map.collect():...     print i... [u'These', 1] [u'examples', 1] [u'give', 1] [u'a', 1] [u'quick', 1]

下圖簡要的講述了flatMap 和 map的轉(zhuǎn)換過程。

PySpark進(jìn)階--深入剖析wordcount.py

transfrom.png

不難看出,map api只是為所有出現(xiàn)的單詞初始化了計數(shù)器為1,并沒有統(tǒng)計相同詞,接下來這個任務(wù)由reduceByKey()來完成。在rdd_map 中,所有的詞被視為一個key,而key相同的value則執(zhí)行reduceByKey內(nèi)的算子操作,因為統(tǒng)計相同key是累加操作,所以可以直接add操作。

>>> from operator import add>>> add_map = rdd_map.reduceByKey(add)>>> for i in add_map.collect():...     print i... (u'a', 1) (u'on', 1) (u'of', 2) (u'arbitrary', 1) (u'quick', 1) (u'the', 2) (u'or', 1)>>> print rdd_map.count()26>>> print add_map.count()23

根據(jù)a.txt 的內(nèi)容,可知只有 of 和 the 兩個單詞出現(xiàn)了兩次,符合預(yù)期。

總結(jié)

以上的分解步驟,可以幫我們理解RDD的操作,需要提示的是,RDD將操作分為兩類:transformation與action。無論執(zhí)行了多少次transformation操作,RDD都不會真正執(zhí)行運算,只有當(dāng)action操作被執(zhí)行時,運算才會觸發(fā)。也就是說,上面所有的RDD都是通過collect()觸發(fā)的, 那么如果將上述的transformation放入一條簡練語句中, 則展現(xiàn)為原始wordcount.py的書寫形式。

counts = lines.flatMap(lambda x: x.split(' ')) \                   .map(lambda x: (x, 1)) \                   .reduceByKey(add)

而真正的action 則是由collect()完成。

output = counts.collect()

至此,已經(jīng)完成了對wordcount.py的深入剖析,但是有意的忽略了一些更底層的執(zhí)行過程,比如DAG, stage, 以及Driver程序。

作者:或然子
鏈接:https://www.jianshu.com/p/067907b23546
來源:簡書
簡書著作權(quán)歸作者所有,任何形式的轉(zhuǎn)載都請聯(lián)系作者獲得授權(quán)并注明出處。

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)cdcxhl.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。

網(wǎng)頁標(biāo)題:PySpark進(jìn)階--深入剖析wordcount.py-創(chuàng)新互聯(lián)
文章網(wǎng)址:http://jinyejixie.com/article20/ccchjo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制開發(fā)、網(wǎng)站導(dǎo)航、Google、網(wǎng)站維護(hù)、商城網(wǎng)站、營銷型網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站建設(shè)
大新县| 江陵县| 邳州市| 安图县| 台中县| 沾化县| 临漳县| 峡江县| 汾阳市| 厦门市| 全南县| 厦门市| 鹤壁市| 会泽县| 淳化县| 从江县| 钦州市| 迭部县| 山丹县| 黄龙县| 柳江县| 德化县| 高清| 调兵山市| 龙游县| 行唐县| 剑阁县| 凉山| 丹凤县| 高安市| 乌恰县| 绥化市| 都匀市| 宜君县| 拜泉县| 浙江省| 肥城市| 延津县| 长岛县| 沭阳县| 建昌县|