成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

SparkSQL初步應用

最近項目中使用SparkSQL來做數(shù)據(jù)的統(tǒng)計分析,閑來就記錄下來。
直接上代碼:


import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SparkSQL {

  //定義兩個case class A和B:
  //    A是用戶的基本信息:包括客戶號、***號和性別
  //    B是用戶的交易信息:包括客戶號、消費金額和消費狀態(tài)
  case class A(custom_id:String,id_code:String,sex:String)
  case class B(custom_id:String,money:String,status:Int)
  
  def main(args:Array[String]): Unit = {

    //數(shù)據(jù)量不大時,測試發(fā)現(xiàn)使用local[*]的效率要比local和基于YARN的效率都高。
    //這里使用local[*]模式,設置AppName為"SparkSQL"
    val sc = new SparkContext("local[*]", "SparkSQL")
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD
    
    //定義兩個RDD:A_RDD和B_RDD。數(shù)據(jù)之間以char(1)char(1)分隔,取出對應的客戶信息。
    val A_RDD = sc.textFile("hdfs://172.16.30.2:25000/usr/tmpdata/A.dat").map(_.split("\u0001\u0001")).map(t => tbclient(t(0), t(4), t(13)))
    val B_RDD = sc.textFile("hdfs://172.16.30.3:25000/usr/tmpdata/B.dat").map(_.split("\u0001\u0001")).map(t=>tbtrans(t(16),t(33),t(71).toInt))
    
    //將普通RDD轉為SchemaRDD
    A_RDD.registerTempTable("A_RDD")
    B_RDD.registerTempTable("B_RDD")
    
 
    def toInt(s: String): Int = {
      try {
        s.toInt
      } catch {
        case e: Exception => 9999
      }
    }

    def myfun2(id_code:String):Int = {
      val i = id_code.length
      i
    }

    //定義函數(shù):根據(jù)***號判斷屬相
    //這里注意Scala的substring方法的使用,和Java、Oracle等都不同
       
    def myfun5(id_code:String):String = {
      var year = ""
      if(id_code.length == 18){
        val md = toInt(id_code.substring(6,10))
        val i = 1900
        val years=new Array[String](12)
        years(0) = "鼠"
        years(1) = "牛"
        years(2) = "虎"
        years(3) = "兔"
        years(4) = "龍"
        years(5) = "蛇"
        years(6) = "馬"
        years(7) = "羊"
        years(8) = "猴"
        years(9) = "雞"
        years(10) = "狗"
        years(11) = "豬"
        year = years((md-i)%years.length)
      }
      year
    }

    //設置年齡段
    
    def myfun3(id_code:String):String = {
      var rt = ""
      if(id_code.length == 18){
        val age = toInt(id_code.substring(6,10))
        if(age >= 1910 && age < 1920){
          rt = "1910 ~ 1920"
        }
        else if(age >= 1920 && age < 1930){
          rt = "1920 ~ 1930"
        }
        else if(age >= 1930 && age < 1940){
          rt = "1930 ~ 1940"
        }
        else if(age >= 1940 && age < 1950){
          rt = "1940 ~ 1950"
        }
        else if(age >= 1950 && age < 1960){
          rt = "1950 ~ 1960"
        }
        else if(age >= 1960 && age <1970){
          rt = "1960 ~ 1970"
        }
        else if(age >= 1970 && age <1980){
          rt = "1970 ~ 1980"
        }
        else if(age >= 1980 && age <1990){
          rt = "1980 ~ 1990"
        }
        else if(age >= 1990 && age <2000){
          rt = "1990 ~ 2000"
        }
        else if(age >= 2000 && age <2010){
          rt = "2000 ~ 2010"
        }
        else if(age >= 2010 && age<2014){
          rt = "2010以后"
        }
      }
      rt
    }

    //劃分消費金額區(qū)間
    
    def myfun4(money:String):String = {
      var rt = ""
      if(money>="10000" && money<"50000"){
        rt = "10000 ~ 50000"
      }
      else if(money>="50000" && money<"60000"){
        rt = "50000 ~ 60000"
      }
      else if(money>="60000" && money<"70000"){
        rt = "60000 ~ 70000"
      }
      else if(money>="70000" && money<"80000"){
        rt = "70000 ~ 80000"
      }
      else if(money>="80000" && money<"100000"){
        rt = "80000 ~ 100000"
      }
      else if(money>="100000" && money<"150000"){
        rt = "100000 ~ 150000"
      }
      else if(money>="150000" && money<"200000"){
        rt = "150000 ~ 200000"
      }
      else if(money>="200000" && money<"1000000"){
        rt = "200000 ~ 1000000"
      }
      else if(money>="1000000" && money<"10000000"){
        rt = "1000000 ~ 10000000"
      }
      else if(money>="10000000" && money<"50000000"){
        rt = "10000000 ~ 50000000"
      }
      else if(money>="5000000" && money<"100000000"){
        rt = "5000000 ~ 100000000"
      }
      rt
    }

    //根據(jù)生日判斷星座
    
    def myfun1(id_code:String):String = {

      var rt = ""
      if(id_code.length == 18){
          val md = toInt(id_code.substring(10,14))
          if (md >= 120 && md <= 219){
            rt = "水瓶座"
          }
          else if (md >= 220 && md <= 320){
            rt = "雙魚座"
          }
          else if (md >= 321 && md <= 420){
            rt = "白羊座"
          }
          else if (md >= 421 && md <= 521){
            rt = "金牛座"
          }
          else if (md >= 522 && md <= 621){
            rt = "雙子座"
          }
          else if (md >= 622 && md <= 722){
            rt = "巨蟹座"
          }
          else if (md >= 723 && md <= 823){
            rt = "獅子座"
          }
          else if (md >= 824 && md <= 923){
            rt = "***座"
          }
          else if (md >= 924 && md <= 1023){
            rt = "天秤座"
          }
          else if (md >= 1024 && md <= 1122){
            rt = "天蝎座"
          }
          else if (md >= 1123 && md <= 1222){
            rt = "射手座"
          }
          else if ((md >= 1223 && md <= 1231) | (md >= 101 && md <= 119)){
            rt = "摩蝎座"
          }
          else
            rt = "無效"
        }
      rt
    }

    //注冊函數(shù)
    sqlContext.registerFunction("fun1",(x:String)=>myfun1(x))
    sqlContext.registerFunction("fun3",(z:String)=>myfun3(z))
    sqlContext.registerFunction("fun4",(m:String)=>myfun4(m))
    sqlContext.registerFunction("fun5",(n:String)=>myfun5(n))

    //星座統(tǒng)計,注意,這里必須要有fun2(id_code)=18這個限制,否則,第一個字段有這個限制,而第二個統(tǒng)計字段值卻沒有這個限制
    
    val result1 = sqlContext.sql("select fun1(id_code),count(*) from A_RDD t where fun2(id_code)=18 group by fun1(id_code)")
    
    //屬相統(tǒng)計
    val result2 = sqlContext.sql("select fun5(a.id_code),count(*) from A_RDD a where fun2(id_code)=18 group by fun5(a.id_code)")
    
    //根據(jù)消費區(qū)間統(tǒng)計消費人數(shù)和總金額
    val result3 = sqlContext.sql("select fun4(a.money),count(distinct a.custom_id),SUM(a.money) from B_RDD a where a.status=8 and a.custom_id in (select b.custom_id from A_RDD b where fun2(b.id_code)=18) group by fun4(a.money)")
    
    //打印結果
    result3.collect().foreach(println)
    //也可以將結果保存到OS/HDFS上
    result2.saveAsTextFile("file:///tmp/age")
  }
}

成都創(chuàng)新互聯(lián)是專業(yè)的湘西土家族網(wǎng)站建設公司,湘西土家族接單;提供成都網(wǎng)站建設、成都做網(wǎng)站,網(wǎng)頁設計,網(wǎng)站設計,建網(wǎng)站,PHP網(wǎng)站建設等專業(yè)做網(wǎng)站服務;采用PHP框架,可快速的進行湘西土家族網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團隊,希望更多企業(yè)前來合作!

在測試result3的時候,發(fā)現(xiàn)報錯:

Exception in thread "main" java.lang.RuntimeException: [1.101] failure: ``NOT'' expected but `select' found

select fun5(a.id_code),count(*) from A_RDD a where fun2(a.id_code)=18 and a.custom_id IN (select distinct b.custom_id from B_RDD b where b.status=8) group by fun5

(a.id_code)

                                                                                                    ^

at scala.sys.package$.error(package.scala:27)

at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)

at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:74)

at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:267)

at SparkSQL$.main(SparkSQL.scala:198)

at SparkSQL.main(SparkSQL.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

目前還在調(diào)試階段,目測可能SparkSQL對條件中子查詢的支持做的不是很好(只是猜測)。

如有問題,還望路過的高手不吝賜教。

當前文章:SparkSQL初步應用
當前路徑:http://jinyejixie.com/article36/pggdsg.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供電子商務定制網(wǎng)站移動網(wǎng)站建設、網(wǎng)站營銷、自適應網(wǎng)站、ChatGPT

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

h5響應式網(wǎng)站建設
基隆市| 敦煌市| 三门峡市| 波密县| 信丰县| 禄丰县| 区。| 布拖县| 长治县| 河西区| 河西区| 乡宁县| 个旧市| 乐平市| 白朗县| 东阳市| 汉源县| 黑河市| 南投县| 莱芜市| 泸水县| 吕梁市| 洛宁县| 家居| 汶上县| 苗栗市| 南和县| 珠海市| 银川市| 顺义区| 兰西县| 浏阳市| 仪陇县| 都兰县| 玉溪市| 巫溪县| 三台县| 永福县| 全南县| 车险| 冷水江市|