成人午夜视频全免费观看高清-秋霞福利视频一区二区三区-国产精品久久久久电影小说-亚洲不卡区三一区三区一区

二、MapReduce基本編程規(guī)范

[TOC]

創(chuàng)新互聯(lián)公司堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都做網(wǎng)站、網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿足客戶于互聯(lián)網(wǎng)時(shí)代的永勝網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!

一、MapReduce編程基本組成

編寫(xiě)MapReduce的程序有至少三個(gè)必不可少的部分:mapper,reducer,driver??蛇x的有 partitioner,combiner
而且mapper的輸入輸出、reducer的輸入輸出都是key value型的,所以要求我們?cè)诰帉?xiě)mapper和reducer時(shí),必須實(shí)現(xiàn)明確這4個(gè)鍵值對(duì)中的8種數(shù)據(jù)類型,而且必須還是hadoop的可序列化類型。同時(shí)還需要注意的是,map的輸出其實(shí)就是reduce的輸入,所以包括的數(shù)據(jù)類型是一樣的。

1、map階段

編寫(xiě)基本流程
1)自定義map類,需要繼承 Mapper這個(gè)類
2)繼承Mapper 的時(shí)候,需要指定輸入和輸出的鍵值對(duì)中的類型
3)必須重寫(xiě)繼承自父類的map() 方法
4)上面重寫(xiě)的map() 方法是每個(gè)map task對(duì)每一個(gè)輸入到mapper中的鍵值對(duì)都會(huì)調(diào)用處理一次。

基本編寫(xiě)實(shí)例如下:

/*
指定Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 這4個(gè)類型分別為:
LongWritable, Text, Text, IntWritable,相當(dāng)于普通類型:
long,string,string,int
*/
public class TestMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        這里是map方法 處理邏輯
    }
}

2、reduce階段

基本編寫(xiě)流程
1)自定義reduce類,需要繼承 Reducer這個(gè)類
2)繼承Reducer的時(shí)候,需要指定輸入和輸出的鍵值對(duì)中的類型
3)必須重寫(xiě)繼承自父類的reduce() 方法
4)上面重寫(xiě)的reduce() 方法是每個(gè)reduer task對(duì)每一個(gè)輸入到reducer中的鍵值對(duì)都會(huì)調(diào)用處理一次。

基本編寫(xiě)實(shí)例如下:

/*
指定Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 這4個(gè)類型分別為:
Text, IntWritable, Text, IntWritable,相當(dāng)于普通類型:
string,int,string,int
*/
public class TestReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    protected void reduce(Text key,
                          Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        這里是reduce方法 處理邏輯
    }
}

3、driver階段

這個(gè)部分是用于配置job對(duì)象的各種必須配置信息,配置完成后,將job提交給yarn執(zhí)行
具體配置啥下面直接上例子看好了。主要起到調(diào)度map和reduce任務(wù)執(zhí)行的作用

4、partitioner階段

這個(gè)階段主要是對(duì)map階段的輸出進(jìn)行分區(qū),而map的分區(qū)數(shù)直接決定reduce task的數(shù)量(一般來(lái)說(shuō)是一對(duì)一),編寫(xiě)流程如下:
1)自定義分區(qū)類,繼承 Partitioner<key, value>
2)繼承Partitioner的時(shí)候,處理的輸入的鍵值對(duì)類型
3)必須重寫(xiě)繼承自父類的getPartition() 方法
4)上面重寫(xiě)的getPartition() () 方法是每個(gè)maptask對(duì)每一個(gè)輸入的鍵值對(duì)都會(huì)調(diào)用處理一次。
5)根據(jù)分區(qū)規(guī)則,返回0~n,表示分區(qū)格式為0~n

編寫(xiě)案例如下:

public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text text, IntWritable intWritable, int i) {
        判斷條件1:
        return 0;
        判斷條件2:
        return 1;
        .......
        return n;
    }
}

5、combiner

combiner不是一個(gè)獨(dú)立的階段,它其實(shí)是包含在map階段中的。map本身輸出的鍵值對(duì)中,每個(gè)鍵值對(duì)的value都是1,就算是一樣的key,也是獨(dú)立一個(gè)鍵值對(duì)。如果重復(fù)的鍵值對(duì)越多,那么將map輸出傳遞到reduce的過(guò)程中,就會(huì)占用很多帶寬資源。優(yōu)化的方法就是每個(gè)map輸出時(shí),先在當(dāng)前map task下進(jìn)行局部合并匯總,減少重復(fù)可以的出現(xiàn)。即

<king,1> <>king,1>  這種一樣的key的,就會(huì)合并成 <king,2>
這樣就會(huì)減少傳輸?shù)臄?shù)據(jù)量

所以其實(shí)由此可以知道,其實(shí)combiner的操作和reduce的操作是一樣的,只不過(guò)一個(gè)是局部,一個(gè)是全局。簡(jiǎn)單的做法就是,直接將reducer作為combiner類傳入job,如:

job.setCombinerClass(WordCountReducer.class);

我們可以看看這個(gè)方法的源碼:

public void setCombinerClass(Class<? extends Reducer> cls) throws IllegalStateException {
        this.ensureState(Job.JobState.DEFINE);
        //看到?jīng)],那個(gè)  Reducer.class
        this.conf.setClass("mapreduce.job.combine.class", cls, Reducer.class);
    }

可以清楚看到設(shè)置combine class時(shí),可以看到多態(tài)的類型設(shè)置就是 Reducer 類型的,從這里也可以更加確定 combiner 的操作和 reducer的就是一樣的。

二、wordcount編程實(shí)例

下面開(kāi)始用wordcount作為例子編寫(xiě)一個(gè)完整的MapReduce程序

1、mapper

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    //setup 和 clean 方法不是必須的
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //最先執(zhí)行
        //System.out.println("this is setup");
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        //執(zhí)行完map之后執(zhí)行
        //System.out.println("this is cleanup");
    }

    //這里創(chuàng)建一個(gè)臨時(shí)對(duì)象,用于保存中間值
    Text k = new Text();
    IntWritable v = new IntWritable();

    /**
     *
     *
     * @param key
     * @param value
     * @param context  用于連接map和reduce上下文,通過(guò)這個(gè)對(duì)象傳遞map的結(jié)果給reduce
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //System.out.println("開(kāi)始map=====================");

        //1.value是讀取到的一行字符串,要將其轉(zhuǎn)換為java中的string進(jìn)行處理,即反序列化
        String line = value.toString();

        //2.切分?jǐn)?shù)據(jù)
        String[] words = line.split(" ");

        //3.輸出map結(jié)構(gòu), <單詞,個(gè)數(shù)>的形式,寫(xiě)入的時(shí)候需將普通類型轉(zhuǎn)為序列化類型
        /**
         * 兩種寫(xiě)法:
         * 1) context.write(new Text(word), new IntWritable(1));
         *     缺點(diǎn):每次都會(huì)創(chuàng)建兩個(gè)對(duì)象,最后會(huì)造成創(chuàng)建了很多臨時(shí)對(duì)象
         *
         * 2)Text k = new Text();
         *    IntWritable v = new IntWritable();
         *
         *    for {
         *       k.set(word);
         *       v.set(1);
         *       context.write(k, v);
         *    }
         *
         *    這種方法好處就是,對(duì)象只創(chuàng)建了一次,后續(xù)只是通過(guò)修改對(duì)象內(nèi)部的值的方式傳遞,無(wú)需重復(fù)創(chuàng)建多個(gè)對(duì)象
         */
        for (String word:words) {
            //轉(zhuǎn)換普通類型為可序列化類型
            k.set(word);
            v.set(1);
            //寫(xiě)入到上下文對(duì)象中
            context.write(k, v);
        }
    }
}

2、reducer

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * 這里的 Iterable<IntWritable> values 之所以是一個(gè)可迭代的對(duì)象,
     * 是因?yàn)閺膍ap傳遞過(guò)來(lái)的數(shù)據(jù)經(jīng)過(guò)合并了,如:
     * (HDFS,1),(HDFS,1)合并成 (HDFS,[1,1]) 這樣的形式,所以value可以通過(guò)迭代方式獲取其中的值
     *
     */
    IntWritable counts = new IntWritable();

    @Override
    protected void reduce(Text key,
                          Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        //1.初始化次數(shù)
        int count = 0;

        //2.匯總同一個(gè)key中的個(gè)數(shù)
        for (IntWritable value: values) {
            count += value.get();
        }

        //3.輸出reduce
        counts.set(count);
        context.write(key, counts);
    }
}

3、driver

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //這里只是方便在ide下直接運(yùn)行,如果是在命令行下直接輸入輸入和輸出文件路徑即可
        args = new String[]{"G:\\test2\\", "G:\\testmap6\\"};

        //1.獲取配置對(duì)象
        Configuration conf = new Configuration();

        //2.獲取job對(duì)象
        Job job = Job.getInstance(conf);

        //3.分別給job指定driver,map,reducer的類
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        //4.分別指定map和reduce階段輸出的類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

         //這里可以設(shè)置分區(qū)類,需要額外編寫(xiě)分區(qū)實(shí)現(xiàn)類
//        job.setPartitionerClass(WordCountPartitioner.class);
//        job.setNumReduceTasks(2);

        //設(shè)置預(yù)合并類
        //job.setCombinerClass(WordCountReducer.class);

        //設(shè)置inputFormat類,大量小文件優(yōu)化,不設(shè)置默認(rèn)使用 TextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job,3* 1024 * 1024);
        CombineTextInputFormat.setMinInputSplitSize(job, 2 * 1024 * 1024);

        //5.數(shù)據(jù)輸入來(lái)源以及結(jié)果的輸出位置
        // 輸入的時(shí)候會(huì)根據(jù)數(shù)據(jù)源的情況自動(dòng)map切片,形成切片信息(或者叫切片方案)
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //以上就是將一個(gè)job的配置信息配置完成后,下面就提交job,hadoop將跟就job的配置執(zhí)行job

        //6.提交job任務(wù),這個(gè)方法相當(dāng)于 job.submit()之后,然后等待執(zhí)行完成
        //任務(wù)配置信息是提交至yarn的  MRappmanager
        job.waitForCompletion(true);

    }
}

當(dāng)前標(biāo)題:二、MapReduce基本編程規(guī)范
網(wǎng)站網(wǎng)址:http://jinyejixie.com/article12/ghdddc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供自適應(yīng)網(wǎng)站域名注冊(cè)、搜索引擎優(yōu)化、營(yíng)銷型網(wǎng)站建設(shè)、定制開(kāi)發(fā)服務(wù)器托管

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

h5響應(yīng)式網(wǎng)站建設(shè)
吉隆县| 定远县| 临沧市| 宁远县| 老河口市| 新巴尔虎右旗| 贺兰县| 桐乡市| 高安市| 桂东县| 海阳市| 梨树县| 乌兰浩特市| 邹平县| 遂昌县| 太仆寺旗| 赣榆县| 剑川县| 洛隆县| 定陶县| 武清区| 商河县| 昌黎县| 新丰县| 乌拉特中旗| 称多县| 富民县| 辽宁省| 兰考县| 洱源县| 永善县| 马公市| 屏边| 方正县| 昭平县| 隆德县| 林芝县| 纳雍县| 临澧县| 东明县| 卢龙县|