成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

怎么理解spark的自定義分區(qū)和排序及spark與jdbc-創(chuàng)新互聯(lián)

這篇文章將為大家詳細(xì)講解有關(guān)怎么理解spark的自定義分區(qū)和排序及spark與jdbc,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。

創(chuàng)新互聯(lián)堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時代的儋州網(wǎng)站設(shè)計(jì)、移動媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
//自定義分區(qū)
import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner
object PrimitivePartitionTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local[2]").setAppName("Partitioner")
    val context = new SparkContext(conf)
    val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)
    //實(shí)例化類,并設(shè)置分區(qū)類
    val partitioner = new CustomPartitioner(2)
    val rdd1 = rdd.partitionBy(partitioner)
    rdd1.saveAsTextFile("c:\\partitioner")
    context.stop()    
  }
}
//自定義分區(qū)類繼承spark的Partitioner
class CustomPartitioner(val partitions:Int ) extends Partitioner{
     
    def numPartitions: Int= this.partitions
   
    def getPartition(key: Any): Int={
      if(key.toString().length()<=2)
        0
      else
        1      
    }
}
//自定義排序
package hgs.spark.othertest
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.math.Ordered
//自定義排序第一種實(shí)現(xiàn)方式,通過繼承ordered
class Student(val name:String,var age:Int) extends Ordered[Student] with Serializable{
  def compare(that: Student): Int={
    return this.age-that.age
  }
}
class Boy(val name:String,var age:Int) extends  Serializable{
  
}
//第二種方式通過實(shí)現(xiàn)隱式轉(zhuǎn)換實(shí)現(xiàn)
object MyPredef{
  implicit def toOrderBoy = new Ordering[Boy]{
   def compare(x: Boy, y: Boy): Int={
     x.age - y.age
   }
  }
}
//引入隱式轉(zhuǎn)換
import MyPredef._
object CutstomOrder {
   def main(args: Array[String]): Unit = {
     val conf = new SparkConf()
     conf.setMaster("local[2]").setAppName("CutstomOrder")
     val context = new SparkContext(conf)
     val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)
     //下面的第二個參數(shù)false為降序排列
     //val rdd_sorted = rdd.sortBy(f=>new Student(f._1,f._2), false, 1)
     val rdd_sorted = rdd.sortBy(f=>new Boy(f._1,f._2), false, 1)
     rdd_sorted.saveAsTextFile("d:\\ordered")
     context.stop()
   } 
}
//JDBC
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import scala.collection.mutable.ListBuffer
object DataFromJdbcToSpark {
  def main(args: Array[String]): Unit = {
     val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("BroadCastTest")
    val context = new SparkContext(conf)
    val sql = "select name,age from test where id>=? and id <=?"
    var list = new ListBuffer[(String,Int)]()
    //第七個參數(shù)是一個自定義的函數(shù),spark會調(diào)用該函數(shù),完成自定義的邏輯,y的數(shù)據(jù)類型是ResultSet,該函數(shù)不可以想自己定義的數(shù)組添加數(shù)據(jù),
    //應(yīng)為應(yīng)用的函數(shù)會將結(jié)果保存在JdbcRDD中
    val jdbcRDD = new JdbcRDD(context,getConnection,sql,1,8,2,y=>{
    (y.getString(1),y.getInt(2))       
    })
     
     println(jdbcRDD.collect().toBuffer)
     context.stop()
    
  }
  
    def getConnection():Connection={
    Class.forName("com.mysql.jdbc.Driver")
    val  conn = DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs","root","123456");
    conn
  }
}
//----------------------------------------------------------------------
package hgs.spark.othertest
import java.sql.Connection
import java.sql.DriverManager
import org.apache.commons.dbutils.QueryRunner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
//將spark計(jì)算后的結(jié)果錄入數(shù)據(jù)庫
object DataFromSparktoJdbc {
  
  def main(args: Array[String]): Unit = {
    
    val conf = new SparkConf
    conf.setMaster("local[2]").setAppName("DataFromSparktoJdbc")
    val context = new SparkContext(conf)
    val addressrdd= context.textFile("d:\\words")
    val words = addressrdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)
    //println(words.partitions.size)
    var p:Int =0
    words.foreachPartition(iter=>{
      //每個分區(qū)一個鏈接
      val qr = new QueryRunner()
      val conn = getConnection
      println(conn)
      val sql = s"insert into words values(?,?)"
      //可以修改為批量插入效率更高
      while(iter.hasNext){
        val tpm = iter.next()  
        val obj1 :Object = tpm._1
        val obj2 :Object = new Integer(tpm._2)
        //obj1+conn.toString()可以看到數(shù)據(jù)庫的插入數(shù)據(jù)作用有三個不同的鏈接
        qr.update(conn, sql,obj1+conn.toString(),obj2)
      }
      //println(conn)
      //println(p)
      conn.close()
      
    })
    words.saveAsTextFile("d:\\wordresult")
  }
  def getConnection():Connection={
    Class.forName("com.mysql.jdbc.Driver")
    val  conn = DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs","root","123456");
    conn
  }
  
}
//廣播變量
package hgs.spark.othertest
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object BroadCastTest{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("BroadCastTest")
    val context = new SparkContext(conf)
    val addressrdd= context.textFile("d:\\address")
    val splitaddrdd =     addressrdd.map(x=>{      
      val cs = x.split(",")
      (cs(0),cs(1))
    }).collect().toMap
    //廣播變量,數(shù)據(jù)被緩存在每個節(jié)點(diǎn),減少了節(jié)點(diǎn)之間的數(shù)據(jù)傳送,可以有效的增加效率,廣播出去的可以是任意的數(shù)據(jù)類型
    val maprdd = context.broadcast(splitaddrdd)
    val namerdd = context.textFile("d:\\name")
    
    val result = namerdd.map(x=>{
      //該出使用了廣播的出去的數(shù)組
      maprdd.value.getOrElse(x, "UnKnown")      
    })
    println(result.collect().toBuffer)
    context.stop()
  }
}
其他一些知識點(diǎn)
1.spark 廣播變量 rdd.brodcastz(rdd),廣播變量的用處是將數(shù)據(jù)匯聚傳輸?shù)礁鱾€excutor上面
	,這樣在做數(shù)據(jù)處理的時候減少了數(shù)據(jù)的傳輸
2.wordcount程序
	context.textFile(args(0),1).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) 
	wordcount程序代碼,一個wordcount會產(chǎn)生5個RDD
	sc.textFile() 會產(chǎn)生兩個RDD 1.HadoopRDD-> MapPartitionsRDD
	   flatMap() 會產(chǎn)生MapPartitionsRDD
	   map 會產(chǎn)生MapPartitionsRDD
	   reduceByKey 產(chǎn)生ShuuledRDD
	   saveAsTextFile
   
3.緩存數(shù)據(jù)到內(nèi)存 rdd.cache   清理緩存 rdd.unpersist(true),rdd.persist存儲及級別 cache方法調(diào)用的是persist方法
4.spark 遠(yuǎn)程debug,需要設(shè)置sparkcontext.setMaster("spark://xx.xx.xx.xx:7077").setJar("d:/jars/xx.jar")

關(guān)于怎么理解spark的自定義分區(qū)和排序及spark與jdbc就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

新聞標(biāo)題:怎么理解spark的自定義分區(qū)和排序及spark與jdbc-創(chuàng)新互聯(lián)
URL鏈接:http://jinyejixie.com/article34/cshipe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司、定制網(wǎng)站品牌網(wǎng)站設(shè)計(jì)、響應(yīng)式網(wǎng)站、營銷型網(wǎng)站建設(shè)網(wǎng)站維護(hù)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司
泉州市| 隆回县| 天全县| 泰宁县| 五寨县| 两当县| 竹山县| 山东省| 东乌珠穆沁旗| 邵阳市| 青州市| 肥城市| 从江县| 巴林左旗| 汾阳市| 巴南区| 天台县| 田阳县| 霍城县| 浦东新区| 宝坻区| 达尔| 大安市| 新乡县| 团风县| 思南县| 达州市| 彭州市| 沈丘县| 西昌市| 阆中市| 莱州市| 桓台县| 化德县| 娄烦县| 搜索| 江达县| 淮滨县| 绥芬河市| 高唐县| 调兵山市|