
Hive UDAF Development Explained

Preface

This article is a loose translation of Hadoop Hive UDAF Tutorial - Extending Hive with Aggregation Functions, chosen because its examples are easy to follow. I have also woven my own understanding of Hive UDAFs into the text.


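A UDAF is a user-defined aggregate function in Hive; Hive's built-in UDAFs include sum() and count(). A UDAF can be implemented in one of two ways: simple or generic. The simple approach relies on Java reflection, which costs performance and rules out some features, so it has been deprecated. In this post we focus on Hive's generic aggregate functions (GenericUDAF). UDAF development mainly involves the following two abstract classes: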

```java
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
```

Source code

All code and data in this post can be found at: hive examples

Preparing the sample data

First, create a table containing the sample data: people. It has a single column, name, and each row holds one or more names. The table data is stored in the file people.txt.

```
~$ cat ./people.txt

John Smith
John and Ann White
Ted Green
Dorothy
```

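Upload the file to the HDFS directory /user/matthew/people (the relative path people below resolves against the current user's HDFS home directory, which is /user/matthew here):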

```
hadoop fs -mkdir people
hadoop fs -put ./people.txt people
```

Next, create a Hive external table by running the following in the Hive shell:

```sql
CREATE EXTERNAL TABLE people (name string)
ROW FORMAT DELIMITED FIELDS
    TERMINATED BY '\t'
    ESCAPED BY ''
    LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/matthew/people';
```

The relevant abstract classes

To create a GenericUDAF, you first need to understand the following two abstract classes:

```java
org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
```

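To understand the APIs of these classes, remember that Hive is essentially MapReduce: Hive writes and hides the MapReduce code for us and exposes concise SQL functions on top of it. So it helps to think in terms of the Mapper, Combiner, and Reducer. Also remember that a Hadoop cluster consists of many machines, and Mapper and Reducer tasks run independently on different machines.

Broadly speaking, then, a UDAF reads data (mapper), aggregates a batch of mapper output into partial aggregation results (combiner), and finally produces the final aggregation result (reducer). Because the aggregation spans multiple combiners, we need to keep partial aggregation results.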
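Here is a minimal Hive-free sketch of that mapper/combiner/reducer picture in plain Java. The split of rows across two machines is a hypothetical assumption for illustration. Each mapper turns raw rows into a partial sum of string lengths, a combiner merges the partials produced on one machine, and the reducer merges everything into the final result. The point is that the partial result must itself be mergeable.

```java
import java.util.Arrays;
import java.util.List;

// Hive-free illustration: the "machines" and the row splits are hypothetical.
class PartialAggregationSketch {

    // Mapper: raw rows -> partial result (sum of string lengths).
    static int map(List<String> rows) {
        int partial = 0;
        for (String row : rows) {
            partial += row.length();
        }
        return partial;
    }

    // Combiner and reducer do the same thing here: merge partial results.
    static int merge(List<Integer> partials) {
        int sum = 0;
        for (int p : partials) {
            sum += p;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Machine 1 runs two map tasks; machine 2 runs one.
        int m1a = map(Arrays.asList("John Smith"));
        int m1b = map(Arrays.asList("John and Ann White"));
        int m2 = map(Arrays.asList("Ted Green", "Dorothy"));

        // Combiner on machine 1 merges its local mapper outputs.
        int combined1 = merge(Arrays.asList(m1a, m1b));

        // Reducer merges everything it receives into the final result.
        int total = merge(Arrays.asList(combined1, m2));
        System.out.println(total); // 44
    }
}
```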

AbstractGenericUDAFResolver

The resolver is simple. You override the method below, which decides, based on the types of the arguments passed in from SQL, which Evaluator should handle the call.

```java
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException;
```
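To illustrate the resolver's job, here is a Hive-free sketch that dispatches on the argument types. `ArgType`, `StringLengthEvaluator` and `IntSumEvaluator` are hypothetical names standing in for Hive's `TypeInfo` and evaluator classes, and `IllegalArgumentException` stands in for `SemanticException`. This is an analogy, not Hive's API.

```java
import java.util.Arrays;
import java.util.List;

// Hive-free sketch of resolver logic; all names here are hypothetical.
class ResolverSketch {

    enum ArgType { STRING, INT, DOUBLE, ARRAY }

    // Picks an "evaluator" from the SQL argument types, or rejects the call.
    static String getEvaluator(List<ArgType> parameters) {
        if (parameters.size() != 1) {
            throw new IllegalArgumentException("Exactly one argument is expected.");
        }
        switch (parameters.get(0)) {
            case STRING:
                return "StringLengthEvaluator"; // hypothetical evaluator for strings
            case INT:
                return "IntSumEvaluator";       // hypothetical evaluator for ints
            default:
                throw new IllegalArgumentException(
                        "Unsupported argument type: " + parameters.get(0));
        }
    }

    public static void main(String[] args) {
        System.out.println(getEvaluator(Arrays.asList(ArgType.STRING)));
        try {
            getEvaluator(Arrays.asList(ArgType.ARRAY));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```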

GenericUDAFEvaluator

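The UDAF logic lives mainly in the Evaluator, where you implement several methods of this abstract class.

Before looking at the Evaluator, you need to understand the ObjectInspector interface and Mode, the enum nested inside GenericUDAFEvaluator.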

ObjectInspector

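Its main job is to decouple how data is used from how it is represented, so that the data flow can switch between different input and output formats, and different Operators can work with different formats. For an introduction to ObjectInspector, see these two articles: first post on Hive UDFs, and The Role of ObjectInspector in Hive.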
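The decoupling idea can be shown with a small analogy in plain Java. `IntInspector` below is a hypothetical interface, not Hive's `ObjectInspector` API. The summing code reads values only through the inspector, so the same logic works whether values arrive as boxed `Integer`s or as text.

```java
import java.util.Arrays;
import java.util.List;

// Hive-free analogy of an ObjectInspector; IntInspector is hypothetical.
class InspectorSketch {

    // Knows how to read an int out of one particular physical representation.
    interface IntInspector {
        int get(Object o);
    }

    static final IntInspector BOXED = o -> (Integer) o;
    static final IntInspector TEXT = o -> Integer.parseInt((String) o);

    // The aggregation logic never looks at the concrete representation.
    static int sum(List<?> values, IntInspector inspector) {
        int total = 0;
        for (Object v : values) {
            total += inspector.get(v);
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(10, 18), BOXED));  // 28
        System.out.println(sum(Arrays.asList("9", "7"), TEXT)); // 16
    }
}
```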

Mode

Mode represents the phase a UDAF is in within MapReduce.

```java
public static enum Mode {
    /**
     * PARTIAL1: the map phase of MapReduce, from raw data to partial aggregation.
     * Calls iterate() and terminatePartial().
     */
    PARTIAL1,
    /**
     * PARTIAL2: the map-side Combiner phase, which merges map output on the map side,
     * from partial aggregation to partial aggregation.
     * Calls merge() and terminatePartial().
     */
    PARTIAL2,
    /**
     * FINAL: the reduce phase of MapReduce, from partial aggregation to full aggregation.
     * Calls merge() and terminate().
     */
    FINAL,
    /**
     * COMPLETE: from raw data straight to full aggregation in a single step,
     * for example a job with only a map phase, where the map side outputs the result directly.
     * Calls iterate() and terminate().
     */
    COMPLETE
}
```

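In general, a complete UDAF computation is one MapReduce job. With a mapper and a reducer it goes through PARTIAL1 (mapper) and FINAL (reducer); if there is also a combiner, it goes through PARTIAL1 (mapper), PARTIAL2 (combiner), and FINAL (reducer).

In other cases a single step sees all the raw rows, so only the COMPLETE phase runs: it takes raw data as input and produces the result directly. This happens, for example, in a map-only job, or when map-side aggregation (hive.map.aggr) is turned off and the reducer aggregates the raw rows itself.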
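The mode-to-method mapping from the enum comments can be written down as a lookup. This sketch re-declares `Mode` locally (an assumption so it compiles without Hive on the classpath) and prints the methods called for the mapper, combiner, reducer sequence.

```java
import java.util.Arrays;
import java.util.List;

// Hive-free sketch: Mode is re-declared here so the example compiles without Hive.
class ModeSketch {

    enum Mode { PARTIAL1, PARTIAL2, FINAL, COMPLETE }

    // Evaluator methods called in each mode, as listed in the Mode comments.
    static List<String> methodsFor(Mode m) {
        switch (m) {
            case PARTIAL1: return Arrays.asList("iterate", "terminatePartial");
            case PARTIAL2: return Arrays.asList("merge", "terminatePartial");
            case FINAL:    return Arrays.asList("merge", "terminate");
            case COMPLETE: return Arrays.asList("iterate", "terminate");
            default:       throw new IllegalStateException("unknown mode " + m);
        }
    }

    public static void main(String[] args) {
        // mapper -> combiner -> reducer
        List<Mode> withCombiner = Arrays.asList(Mode.PARTIAL1, Mode.PARTIAL2, Mode.FINAL);
        for (Mode m : withCombiner) {
            System.out.println(m + " -> " + methodsFor(m));
        }
    }
}
```

Raw rows only enter through iterate(), and a finished result only leaves through terminate(). Every mode in between exchanges partial results.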

Methods of GenericUDAFEvaluator

```java
// Determines the data formats (ObjectInspectors) of the inputs and outputs of each phase
public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;

// Creates the object that holds the aggregation result
public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;

// Resets the aggregation result
public abstract void reset(AggregationBuffer agg) throws HiveException;

// Map phase: iterates over the column values passed in from SQL
public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;

// End of map and combiner: returns the partial aggregation result
public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;

// The combiner merges results returned by map tasks; the reducer merges results
// returned by mappers or combiners
public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;

// Reducer phase: outputs the final result
public abstract Object terminate(AggregationBuffer agg) throws HiveException;
```
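To see the call order without a cluster, the sketch below mirrors these method names on a tiny evaluator that uses plain Java types instead of Hive's `AggregationBuffer` and `ObjectInspector`; `SumBuffer` and `LengthEvaluator` are hypothetical. The driver runs PARTIAL1 (`iterate()` then `terminatePartial()`) on each of two input splits of people.txt, then FINAL (`merge()` then `terminate()`) on the reducer side.

```java
import java.util.Arrays;
import java.util.List;

// Hive-free mock: SumBuffer and LengthEvaluator are hypothetical;
// only the method names follow GenericUDAFEvaluator.
class LifecycleSketch {

    // Plays the role of an AggregationBuffer.
    static class SumBuffer {
        int sum;
    }

    static class LengthEvaluator {
        SumBuffer getNewAggregationBuffer() { return new SumBuffer(); }

        // Lets a buffer be reused for another group.
        void reset(SumBuffer agg) { agg.sum = 0; }

        // Raw row in (PARTIAL1 / COMPLETE).
        void iterate(SumBuffer agg, String value) {
            if (value != null) agg.sum += value.length();
        }

        // Partial result out (PARTIAL1 / PARTIAL2).
        Integer terminatePartial(SumBuffer agg) { return agg.sum; }

        // Partial result in (PARTIAL2 / FINAL).
        void merge(SumBuffer agg, Integer partial) {
            if (partial != null) agg.sum += partial;
        }

        // Final result out (FINAL / COMPLETE).
        Integer terminate(SumBuffer agg) { return agg.sum; }
    }

    static int run(List<List<String>> splits) {
        LengthEvaluator eval = new LengthEvaluator();
        SumBuffer reduceBuf = eval.getNewAggregationBuffer(); // FINAL side

        for (List<String> split : splits) {
            // PARTIAL1 on each mapper: iterate raw rows, then emit a partial.
            SumBuffer mapBuf = eval.getNewAggregationBuffer();
            for (String row : split) {
                eval.iterate(mapBuf, row);
            }
            eval.merge(reduceBuf, eval.terminatePartial(mapBuf));
        }
        return eval.terminate(reduceBuf);
    }

    public static void main(String[] args) {
        List<List<String>> splits = Arrays.asList(
                Arrays.asList("John Smith", "John and Ann White"),
                Arrays.asList("Ted Green", "Dorothy"));
        System.out.println(run(splits)); // 44
    }
}
```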

Diagram: how Mode relates to the Evaluator

[Figure: the MapReduce flow handled by the Evaluator in each phase]

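Example

Below is an example aggregate UDAF that counts the characters in the name column of the people table.

The function computes the total number of characters (spaces included) in the given column.

Code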

```java
package com.matthewrathbone.example;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

@Description(name = "letters", value = "_FUNC_(expr) - returns the total number of characters in all strings of the column")
public class TotalNumOfLettersGenericUDAF extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {
        // Exactly one argument is accepted
        if (parameters.length != 1) {
            throw new UDFArgumentTypeException(parameters.length - 1,
                    "Exactly one argument is expected.");
        }

        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

        // The argument must be a primitive type...
        if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be PRIMITIVE, but "
                    + oi.getCategory().name()
                    + " was passed.");
        }

        PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;

        // ...and, more specifically, a string
        if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be String, but "
                    + inputOI.getPrimitiveCategory().name()
                    + " was passed.");
        }

        return new TotalNumOfLettersEvaluator();
    }

    public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {

        PrimitiveObjectInspector inputOI;   // raw string input (PARTIAL1, COMPLETE)
        ObjectInspector outputOI;           // Integer output (every mode)
        PrimitiveObjectInspector integerOI; // Integer partial input (PARTIAL2, FINAL)

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            assert (parameters.length == 1);
            super.init(m, parameters);

            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
                // The map phase reads the SQL column: input is a primitive String
                inputOI = (PrimitiveObjectInspector) parameters[0];
            } else {
                // Other phases receive partial results: input is a primitive Integer
                integerOI = (PrimitiveObjectInspector) parameters[0];
            }

            // Every phase outputs an Integer
            outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
                    ObjectInspectorOptions.JAVA);
            return outputOI;
        }

        /**
         * Holds the running character count for one group.
         */
        static class LetterSumAgg implements AggregationBuffer {
            int sum = 0;
            void add(int num) {
                sum += num;
            }
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            return new LetterSumAgg();
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            // Clear the buffer Hive passed in; allocating a new object here would have no effect
            ((LetterSumAgg) agg).sum = 0;
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            assert (parameters.length == 1);
            if (parameters[0] != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                myagg.add(String.valueOf(p1).length());
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            // Return only this buffer's sum: one evaluator instance serves many groups,
            // so a running total kept in an evaluator field would mix groups together
            return ((LetterSumAgg) agg).sum;
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if (partial != null) {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
                myagg.add(partialSum);
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            return ((LetterSumAgg) agg).sum;
        }
    }
}
```
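One thing worth checking in an evaluator like this is where per-group state lives. As far as we understand Hive's group-by operator, a single evaluator instance is reused across many groups while each group gets its own AggregationBuffer. A running total kept in an evaluator field therefore leaks counts from one group into the next. The Hive-free sketch below (all names hypothetical) contrasts the two approaches on two groups.

```java
// Hive-free sketch; names are hypothetical. Shows why state must live in the buffer.
class SharedEvaluatorSketch {

    static class SumBuffer {
        int sum;
    }

    static class Evaluator {
        private int total = 0; // evaluator-level state, shared by every group

        // Buggy variant: keeps adding into a field that outlives the group.
        int terminatePartialWithField(SumBuffer agg) {
            total += agg.sum;
            return total;
        }

        // Correct variant: the result depends only on this group's buffer.
        int terminatePartial(SumBuffer agg) {
            return agg.sum;
        }
    }

    static SumBuffer bufferOf(int sum) {
        SumBuffer b = new SumBuffer();
        b.sum = sum;
        return b;
    }

    public static void main(String[] args) {
        Evaluator eval = new Evaluator();
        SumBuffer groupA = bufferOf(10);
        SumBuffer groupB = bufferOf(18);

        System.out.println(eval.terminatePartialWithField(groupA)); // 10
        System.out.println(eval.terminatePartialWithField(groupB)); // 28: group A leaked into B
        System.out.println(eval.terminatePartial(groupB));          // 18
    }
}
```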

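Code walkthrough

For more background on combiners, Philippe Adjiman's write-up is a good resource.

AggregationBuffer lets us keep intermediate results. Because we define the buffer ourselves, it can hold data in any format; in this example it holds the running character count.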

```java
/**
 * Holds the running character count.
 */
static class LetterSumAgg implements AggregationBuffer {
    int sum = 0;
    void add(int num) {
        sum += num;
    }
}
```

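This means the UDAF receives different input in different MapReduce phases. iterate() reads one row of our table (or, more precisely, one input record as produced by the table's InputFormat) and outputs an aggregate in some other format.

The partial aggregation step merges a number of these aggregates into a new aggregate of the same format, and the final reducer then takes these aggregates and outputs the final result (whose format may differ from that of the data it received).

In init() we specify that the input is a string and the final output is an integer, and that the partial aggregation output is also an integer (held in the aggregation buffer); terminate() and terminatePartial() both return an integer.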

```java
// In init(), choose the input ObjectInspector according to the mode
if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
    inputOI = (PrimitiveObjectInspector) parameters[0];   // raw String column
} else {
    integerOI = (PrimitiveObjectInspector) parameters[0]; // Integer partial result
}

// Output format for every mode: Integer
outputOI = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class,
        ObjectInspectorOptions.JAVA);
```

iterate() reads the column's string in each row, then computes and stores its length:

```java
public void iterate(AggregationBuffer agg, Object[] parameters)
        throws HiveException {
    ...
        Object p1 = ((PrimitiveObjectInspector) inputOI).getPrimitiveJavaObject(parameters[0]);
        myagg.add(String.valueOf(p1).length());
    }
}
```

merge() adds a partial sum to the AggregationBuffer:

```java
public void merge(AggregationBuffer agg, Object partial)
        throws HiveException {
    if (partial != null) {
        LetterSumAgg myagg = (LetterSumAgg) agg;
        // Partial results arrive as Integers, read through integerOI set up in init()
        Integer partialSum = (Integer) integerOI.getPrimitiveJavaObject(partial);
        myagg.add(partialSum);
    }
}
```

terminate() returns the contents of the AggregationBuffer; this is where the final result is produced:

```java
public Object terminate(AggregationBuffer agg) throws HiveException {
    LetterSumAgg myagg = (LetterSumAgg) agg;
    return myagg.sum;
}
```

Using the custom function

```
ADD JAR ./hive-extension-examples-master/target/hive-extensions-1.0-SNAPSHOT-jar-with-dependencies.jar;
CREATE TEMPORARY FUNCTION letters as 'com.matthewrathbone.example.TotalNumOfLettersGenericUDAF';

SELECT letters(name) FROM people;
OK
44
Time taken: 20.688 seconds
```

The result, 44, is the total character count of the four names, spaces included: 10 + 18 + 9 + 7.

Article title: Hive UDAF Development Explained
URL: http://jinyejixie.com/article0/jjpgoo.html
