成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

Spark核心編程-創(chuàng)新互聯(lián)

文章目錄
  • Spark 核心編程
    • 一、RDD
      • 1、什么是 RDD
      • 2、分布式計算模擬
        • (1) 搭建基礎的架子
        • (2) 客戶端向服務器發(fā)送計算任務
      • 3、RDD 創(chuàng)建
        • (1) 從集合(內(nèi)存)中創(chuàng)建
        • (2) 從外部存儲(文件)創(chuàng)建RDD
        • (3) 從其他RDD創(chuàng)建
        • (4) 直接創(chuàng)建 RDD (new)
      • 4、RDD 并行度與分區(qū)
        • (1) makeRDD() 基于內(nèi)存創(chuàng)建的RDD的分區(qū)
        • (2) 基于文件創(chuàng)建的RDD 的分區(qū)
        • (3) 數(shù)據(jù)分區(qū)的規(guī)則

創(chuàng)新互聯(lián)建站是一家專業(yè)提供高昌企業(yè)網(wǎng)站建設,專注與網(wǎng)站設計制作、做網(wǎng)站、H5開發(fā)、小程序制作等業(yè)務。10年已為高昌眾多企業(yè)、政府機構等服務。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡公司優(yōu)惠進行中。Spark 核心編程

Spark 計算框架為了能夠進行高并發(fā)和高吞吐的數(shù)據(jù)處理,封裝了三大數(shù)據(jù)結構,用于處理不同的應用場景。三大數(shù)據(jù)結構分別是:
1)RDD:彈性分布式數(shù)據(jù)集
2)累加器:分布式共享只寫變量
3)廣播變量:分布式共享只讀變量
接下來讓我們看看這三大數(shù)據(jù)結構是如何數(shù)據(jù)處理中使用的

一、RDD 1、什么是 RDD

RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是 Spark 中最基本的數(shù)據(jù)處理模型。代碼中是一個抽象類,它代表一個彈性的,不可變,可分區(qū),里面的元素可并行計算的集合。
彈性:
存儲的彈性:內(nèi)存與磁盤的自動切換
容錯的彈性:數(shù)據(jù)丟失可以自動恢復
計算的彈性:計算出錯重試機制
分片的機制:可根據(jù)需要重新分片
分布式:數(shù)據(jù)存儲在大數(shù)據(jù)集群不同的節(jié)點上
數(shù)據(jù)集:RDD 封裝了計算邏輯,并不保存數(shù)據(jù)
數(shù)據(jù)抽象:RDD 是一個抽象類,需要子類具體實現(xiàn)
不可變:RDD 封裝了計算邏輯,是不可以改變的,想要改變,只能產(chǎn)生新的RDD,在新的RDD里面封裝邏輯計算
可分區(qū),并行計算

2、分布式計算模擬 (1) 搭建基礎的架子

首先分為兩部分,我們把Excuter當成服務器,把Driver當成客戶端。然后用客戶端去連接服務器,然后客戶端發(fā)送數(shù)據(jù)給服務器。
Excuter (服務器):
第一步設置服務器的端口號,ServerScket(9998)方法,里面的參數(shù)是端口號,這可以隨便寫。然后第二步等待客戶端發(fā)送數(shù)據(jù)過來accept()方法。然后第三步使用getInputStream輸入流接收客戶端發(fā)送過來的數(shù)據(jù),使用輸入流的read()方法,這個就是從客戶端拿到的數(shù)據(jù),然后把這個數(shù)據(jù)給輸出。最后把輸出流,數(shù)據(jù)等待,還有服務器依次都給關閉。

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.InputStream
import java.net.{ServerSocket, Socket}

//這個是做計算準備的,主要是邏輯代碼部分
//這個相當于是服務器,然后Driver相當于是客戶端,客戶端連接服務器就可以直接使用了
class Excuter {}
object Excuter{def main(args: Array[String]): Unit = {//啟動服務器,接收數(shù)據(jù) 這個端口號是隨便寫的
    val server = new ServerSocket(9998) //這個是網(wǎng)絡編程的
    println("服務器啟動,等待接收數(shù)據(jù)")

    //等待客戶端的鏈接
    val client: Socket = server.accept() //等待客戶端發(fā)送過來的數(shù)據(jù),accept()方法
    val in: InputStream = client.getInputStream //輸入流接收數(shù)據(jù)
    val i = in.read() //這個就是拿到的值
    println("接收到客戶端發(fā)送的數(shù)據(jù):" + i) //把客戶端拿到的數(shù)據(jù)給輸出

    in.close()  //把輸入流給關閉掉
    client.close()
    server.close() //把服務器給關閉掉

  }
}

在這里插入圖片描述
Driver (客戶端):
首先客戶端連接服務器的端口號Socket("localhost",9998)方法,第一個參數(shù)是連接方式,這里是本地連接,第二個參數(shù)是服務器的端口號。然后第二步就向服務器發(fā)送數(shù)據(jù),getOutputStream方法輸出流,然后使用輸出流的write()方法寫出數(shù)據(jù)。然后使用輸出流的flush()方法,flush方法的作用是,刷新此輸出流并強制寫出所有緩沖的輸出字節(jié)。然后用完之后就把輸出流和客戶端給關閉了。

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.OutputStream
import java.net.Socket

//這個是用來執(zhí)行程序的
class Driver {}
object Driver{def main(args: Array[String]): Unit = {//連接服務器 本地連接,然后第二個參數(shù)是服務器定義的端口號
    val client = new Socket("localhost",9998) //這個相當于是是客戶端,連接服務器
    val out: OutputStream = client.getOutputStream //向服務器發(fā)東西,用getOutputStream()
    out.write(2)
    out.flush()

    out.close() //用完了吧這個輸出流給關掉
    client.close() //然后把這個客戶端也關掉
  }
}
(2) 客戶端向服務器發(fā)送計算任務

Excuter 類里面是服務器,Driver是客戶端,Task 里面是準備數(shù)據(jù)和邏輯操作的,那個Driver 里面創(chuàng)建一個Task 對象然后把Task 用ObjectOutputstream輸出流把對象給輸出到Excuter接收,接收也是使用ObjectIntputstream對象輸入流進行接收,因為輸出的是一個操作邏輯,用字節(jié)流接收肯定不對,所有要用對象。然后Excuter 拿到Task之后,就可以直接使用里面的函數(shù)了。Task里面要混入Serializable特質,因為在網(wǎng)絡中肯定是無法直接傳送一個對象過去的,所以要進行序列化。
在這里插入圖片描述7
Excuter 代碼:

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

//這個是做計算準備的,主要是邏輯代碼部分
//這個相當于是服務器,然后Driver相當于是客戶端,客戶端連接服務器就可以直接使用了
class Excuter {}
object Excuter{//要混入序列化的特征,不然不能那個傳一個對象過去
  def main(args: Array[String]): Unit = {//啟動服務器,接收數(shù)據(jù) 這個端口號是隨便寫的
    val server = new ServerSocket(9998) //這個是網(wǎng)絡編程的
    println("服務器啟動,等待接收數(shù)據(jù)")

    //等待客戶端的鏈接
    val client: Socket = server.accept() //等待客戶端發(fā)送過來的數(shù)據(jù),accept()方法
    val in: InputStream = client.getInputStream //輸入流接收數(shù)據(jù)
    val objin: ObjectInputStream = new ObjectInputStream(in) //輸出流失obj那么接收也應該是obj
    val task: Task = objin.readObject().asInstanceOf[Task] //這個就是拿到的值 ,但是這里不應該是AnyRef,所以要進行轉換
    val ints = task.compute() //上面已經(jīng)拿到了傳過來的操作了,所以可以直接使用里面定義的函數(shù)了
    println("計算節(jié)點的計算結果為:" + ints) //把客戶端拿到的數(shù)據(jù)給輸出

    objin.close()  //把輸入流給關閉掉
    client.close()
    server.close() //把服務器給關閉掉

  }
}

Driver 代碼:

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket

//這個是用來執(zhí)行程序的
class Driver {}
object Driver {def main(args: Array[String]): Unit = {//連接服務器 本地連接,然后第二個參數(shù)是服務器定義的端口號
    val client = new Socket("localhost",9998) //這個相當于是是客戶端,連接服務器
    val out: OutputStream = client.getOutputStream //向服務器發(fā)東西,用getOutputStream()

    val objout = new ObjectOutputStream(out) //定義這個Object的輸出,因為上面那個是輸出字節(jié)的不能傳輸對象

    val task:Task = new Task() //然后創(chuàng)建一個task
    objout.writeObject(task) //把task 傳入給objout 對象輸出流
    objout.flush()

    objout.close() //用完了吧這個輸出流給關掉
    client.close() //然后把這個客戶端也關掉
    println("客戶端發(fā)送數(shù)據(jù)完畢")
  }
}

Task 代碼:

package com.atguigu.bigdata.spark.core.wc.test2

class Task extends Serializable {//要混入序列化的特征,不然不能那個傳一個對象過去
  val datas = List(1,2,3,4)  //這個是數(shù)據(jù)

  val logic = (num:Int) =>{num * 2} //匿名函數(shù)  這個是邏輯

  //計算
  def compute() = {datas.map(logic)  //莫logic 上面定義的邏輯操作傳入進去

  }

}
3、RDD 創(chuàng)建

在 Spark 中創(chuàng)建 RDD 的創(chuàng)建方式可以分為四種: 一般就是用前兩種就行了,一般前兩種用的比較多。

(1) 從集合(內(nèi)存)中創(chuàng)建

從集合中創(chuàng)建RDD,Spark主要提供了兩個方法:parallelizemakeRDD
parallelize 是并行的意思,makeRDD 的底層則完全就是調用了parallelize方法,因為這個單詞字面意思不大好理解,所以都用makeRDD就行了。
注意:local[*]里面加上*的意思是可以模擬多核多線程,要是不加的話那么就是模擬單線程,從內(nèi)存中創(chuàng)建makeRDD()方法要傳一個集合進去

package com.atguigu.bigdata.spark.core.wc.create_RDD

import org.apache.spark.api.java.JavaSparkContext.fromSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//在內(nèi)存(集合)中創(chuàng)建RDD
class Spark01_RDD_Memory {}
object Spark01_RDD_Memory{def main(args: Array[String]): Unit = {//TODO 準備環(huán)境
    //這個 local[*] 里面加上*的意思是,可以模擬多核多線程,不加的話就是模擬的單線程
    val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD")
    val context = new SparkContext(conf)

    //TODO 創(chuàng)建RDD
    //從內(nèi)存中創(chuàng)建RDD,將內(nèi)存中集合的數(shù)據(jù)作為處理的數(shù)據(jù)
    val seq: Seq[Int] = Seq(1, 2, 3, 4)
    //parallelize 并行
    //val sc: RDD[Int] = context.parallelize(seq) //這里面?zhèn)魅氲膮?shù)是一個集合,當做數(shù)據(jù)源,
    val sc: RDD[Int] = context.makeRDD(seq) //makeRDD方法和parallelize方法是一樣的
    sc.collect().foreach(println) //只有觸發(fā)collect方法,才會執(zhí)行我們的應用程序

    //TODO 關閉環(huán)境
    context.stop()

  }
}
(2) 從外部存儲(文件)創(chuàng)建RDD

由外部存儲系統(tǒng)的數(shù)據(jù)集創(chuàng)建RDD 包括:本地的文件系統(tǒng),所有Hadoop支持的數(shù)據(jù)集,比如HDFS,HBase 等。
注意:這個文件的路徑,可以是項目目錄下,可以洗本地環(huán)境目錄下,或者說hdfs 的路徑下都是可以的。在文件中創(chuàng)建RDD,就要用textFile()方法將文件的路徑給導入進去。或者讀取數(shù)據(jù)的時候用wholeTextFiles()方法可以看到里面的數(shù)據(jù)來源,具體是來自于哪一份文件。
textFile:以行為單位來讀取數(shù)據(jù),讀取的數(shù)據(jù)都是字符串
wholeTextFIles:以文件為單位讀取數(shù)據(jù),讀取的結果表示為元組,第一個元素表示文件路徑,第二個元素表示文件內(nèi)容
在這里插入圖片描述

package com.atguigu.bigdata.spark.core.wc.create_RDD

import org.apache.spark.{SparkConf, SparkContext}

//從文件中創(chuàng)建RDD
class Spark02_RDD_File {}
object Spark02_RDD_File{def main(args: Array[String]): Unit = {//TODO 準備環(huán)境
    val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD_File")
    val context = new SparkContext(conf)

    //TODO 創(chuàng)建RDD
    //從文件中創(chuàng)建RDD,將文件中的數(shù)據(jù)作為處理的數(shù)據(jù)源
    //path路徑默認以當前環(huán)境的根路徑為基準,可以寫絕對路徑,也可以寫相對路徑,
    //還可以hdfs路徑也是可以的,例如:hdfs://master:9080/test.txt
    val file = context.textFile("datas/*")
    file.collect().foreach(println)


    //TODO 關閉環(huán)境
    context.stop()
  }
}
(3) 從其他RDD創(chuàng)建

主要是通過一個RDD運算完后,再產(chǎn)生新的RDD。

(4) 直接創(chuàng)建 RDD (new)

使用new的方式直接構造 RDD,一般由 Spark 框架自身使用。

4、RDD 并行度與分區(qū)

默認情況下,Spark 可以將一個作業(yè)切分多個任務后,發(fā)送給Executor 節(jié)點并行計算,而能夠并行計算的任務數(shù)量我們稱之為并行度。這個數(shù)量可以在構建RDD時指定。記住,這里的并行執(zhí)行的任務數(shù)量,并不是指的切分任務的數(shù)量,不要混淆了。

(1) makeRDD() 基于內(nèi)存創(chuàng)建的RDD的分區(qū)

注意:makeRDD()方法,第二個參數(shù)是個隱式參數(shù),是分區(qū)的數(shù)量,如果不傳的話那么默認分區(qū)跟本地環(huán)境的核有關。比如我的電腦是4核,那么分區(qū)就是分為四個,并行計算。saveAsTextFile()方法 將處理的數(shù)據(jù)保存成分區(qū)文件,里面的參數(shù)是要創(chuàng)建的文件名。然后輸出之后會自動生成一個這個名字的目錄,下面的文件是分區(qū)文件。

package com.atguigu.bigdata.spark.core.wc.create_RDD

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
//RDD 并行度
class Spark01_RDD_Memory_Par {}
object Spark01_RDD_Memory_Par{def main(args: Array[String]): Unit = {//TODO 準備環(huán)境
    //這個 local[*] 里面加上*的意思是,可以模擬多核多線程,不加的話就是模擬的單線程
    val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD")
    val context = new SparkContext(conf)

    //TODO 創(chuàng)建RDD
    //RDD的并行度 & 分區(qū)
    //makeRDD 方法可以傳入第二個參數(shù),第二個參數(shù)是分區(qū)的數(shù)量
    //第二個參數(shù)是可以不傳的,因為是隱式參數(shù),如果不傳默認分區(qū)就是按照內(nèi)核數(shù)量決定的,我的內(nèi)核是4個,所以分區(qū)是4
    val rdd:RDD[Int] = context.makeRDD(List(1, 2, 3,4,5),3)  //里面的第一個參數(shù)是一個集合,第二個參數(shù)是分區(qū)的數(shù)量,分為幾個區(qū)
    //saveAsTextFile方法 將處理的數(shù)據(jù)保存成分區(qū)文件
    rdd.saveAsTextFile("output")//saveAsTextFile方法

    //TODO 關閉環(huán)境
    context.stop()

  }
}
(2) 基于文件創(chuàng)建的RDD 的分區(qū)

它分區(qū)分配數(shù)據(jù)的方式和Hadoop的分區(qū)的方式是一樣的。和上面的基于內(nèi)存的分配數(shù)據(jù)的方式不一樣。

package com.atguigu.bigdata.spark.core.wc.create_RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

class Spark02_RDD_File_Par {}
object Spark02_RDD_File_Par{def main(args: Array[String]): Unit = {//TODO 準備環(huán)境
    val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD_File2")
    val context = new SparkContext(conf)

    //TODO 創(chuàng)建RDD
    //textFile 可以將文件作為數(shù)據(jù)處理的數(shù)據(jù)源,默認也可以設定分區(qū)
    // minPartitions:最小分區(qū)數(shù)量
    //默認分區(qū)是兩個,如果不想使用默認的分區(qū)數(shù)量那么,可以通過第二個參數(shù)指定分區(qū)數(shù)
    val rdd: RDD[String] = context.textFile("datas/one.txt",3)
    rdd.saveAsTextFile("output")


    //TODO 關閉環(huán)境
    context.stop()
  }
}
(3) 數(shù)據(jù)分區(qū)的規(guī)則

首先看字節(jié),可以看到這個文件一共是14個字節(jié),加上回車符
在這里插入圖片描述
然后我們分兩個區(qū),14 / 2 = 7,一個區(qū)是7個字節(jié),再用 14 / 7 = 2 可以看到剛好是2沒有余數(shù),所以沒有問題剛剛好。首先是要計算行偏移量,計算出第一行的行偏移量是多少,計算出第二行是多少,然后計算行偏移量的范圍就可以算出每個分區(qū)得到的數(shù)據(jù)是什么了。
在這里插入圖片描述
查看結果
在這里插入圖片描述

你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

新聞名稱:Spark核心編程-創(chuàng)新互聯(lián)
文章鏈接:http://jinyejixie.com/article28/ccesjp.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站建設、App開發(fā)定制開發(fā)、企業(yè)建站建站公司、關鍵詞優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設
祁东县| 海阳市| 东阳市| 天祝| 东安县| 西峡县| 兴化市| 清水县| 丹江口市| 沈阳市| 碌曲县| 禹州市| 金湖县| 嘉黎县| 石景山区| 宜昌市| 德格县| 静海县| 康平县| 余江县| 泌阳县| 龙海市| 黔江区| 汉川市| 浠水县| 七台河市| 六枝特区| 和政县| 雅江县| 南平市| 沧州市| 隆子县| 永城市| 曲沃县| 囊谦县| 高要市| 南宫市| 米易县| 濉溪县| 瑞丽市| 新巴尔虎左旗|