這篇文章主要講解了“Hadoop的整文件讀取方法”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“Hadoop的整文件讀取方法”吧!
成都一家集口碑和實(shí)力的網(wǎng)站建設(shè)服務(wù)商,擁有專業(yè)的企業(yè)建站團(tuán)隊(duì)和靠譜的建站技術(shù),十載企業(yè)及個(gè)人網(wǎng)站建設(shè)經(jīng)驗(yàn) ,為成都1000多家客戶提供網(wǎng)頁(yè)設(shè)計(jì)制作,網(wǎng)站開發(fā),企業(yè)網(wǎng)站制作建設(shè)等服務(wù),包括成都營(yíng)銷型網(wǎng)站建設(shè),成都品牌網(wǎng)站建設(shè),同時(shí)也為不同行業(yè)的客戶提供網(wǎng)站建設(shè)、成都網(wǎng)站設(shè)計(jì)的服務(wù),包括成都電商型網(wǎng)站制作建設(shè),裝修行業(yè)網(wǎng)站制作建設(shè),傳統(tǒng)機(jī)械行業(yè)網(wǎng)站建設(shè),傳統(tǒng)農(nóng)業(yè)行業(yè)網(wǎng)站制作建設(shè)。在成都做網(wǎng)站,選網(wǎng)站制作建設(shè)服務(wù)商就選創(chuàng)新互聯(lián)建站。
寫Hadoop程序時(shí),有時(shí)候需要讀取整個(gè)文件,而不是分片讀取,但默認(rèn)的為分片讀取,所以,只有編寫自己的整文件讀取類。
需要編寫的有:
WholeInputFormat類,繼承自FileInputFormat類
WholeRecordReader類,繼承自RecordReader類
其中,用于讀取的類是WholeRecordReader類。以下代碼以Text為key值類型,BytesWritable為value的類型,因?yàn)榇蠖鄶?shù)格式在hadoop中都沒有相應(yīng)的類型支持,比如jpg,sdf,png等等,在hadoop中都沒有相應(yīng)的類,但是都可以轉(zhuǎn)換為byte[]字節(jié)流,然后在轉(zhuǎn)化為BytesWritable類型,最后在Map或者Reduce再轉(zhuǎn)換成java中的相應(yīng)類型。
代碼如下,解釋見 :
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class WholeInputFormat extends FileInputFormat<Text, BytesWritable> { @Override public RecordReader<Text, BytesWritable> createRecordReader (InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException { return new WholeRecordReader(); } @Override //判斷是否分片,false表示不分片,true表示分片。 //其實(shí)這個(gè)不寫也可以,因?yàn)樵赪holeRecordReader中一次性全部讀完 protected boolean isSplitable(JobContext context,Path file) { return false; } }
下面是WholeRecordReader類:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class WholeRecordReader extends RecordReader<Text,BytesWritable> { //Hadoop中處理文件的類 private FileSplit fileSplit; private FSDataInputStream in = null; private BytesWritable value = null; private Text key = null; //用于判斷文件是否讀取完成 //也就是因?yàn)檫@個(gè),所以WholeInputFormat中的isSplitable方法可以不用寫 private boolean processed = false; @Override public void close() throws IOException { //do nothing } @Override public Text getCurrentKey() throws IOException, InterruptedException { return this.key; } @Override public BytesWritable getCurrentValue() throws IOException,InterruptedException { return this.value; } @Override public float getProgress() throws IOException, InterruptedException { return processed ? fileSplit.getLength() : 0; } @Override public void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException { //打開一個(gè)文件輸入流 fileSplit = (FileSplit)split; Configuration job = context.getConfiguration(); Path file = fileSplit.getPath(); FileSystem temp = file.getFileSystem(job); in = temp.open(file); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if(key == null) { key = new Text(); } if(value == null) { value = new BytesWritable(); } if(!processed) { //申請(qǐng)一個(gè)字節(jié)數(shù)組保存將從文件中讀取的內(nèi)容 byte[] content = new byte[(int)fileSplit.getLength()]; Path file = fileSplit.getPath(); //以文件的名字作為傳遞給Map函數(shù)的key值,可以自行設(shè)置 key.set(file.getName()); try{ //讀取文件中的內(nèi)容 IOUtils.readFully(in,content,0,content.length); //將value的值設(shè)置為byte[]中的值 value.set(new BytesWritable(content)); }catch(IOException e) { e.printStackTrace(); }finally{ //關(guān)閉輸入流 IOUtils.closeStream(in); } //將processed設(shè)置成true,表示讀取文件完成,以后不再讀取 processed = true; return true; } return false; } }
當(dāng)把這些寫好后,在main()函數(shù)或者run()函數(shù)里面將job的輸入格式設(shè)置成WholeInputFormat,如下:
job.setInputFormatClass(WholeInputFormat.class);
現(xiàn)在,可以整個(gè)文件讀取了。其中,key,value的類型可以換成大家需要的類型。不過(guò),當(dāng)在Hadoop中找不到對(duì)應(yīng)類型的時(shí)候建議用BytesWritable類型,然后用byte[]作為中間類型轉(zhuǎn)化為java可以處理的類型。
感謝各位的閱讀,以上就是“Hadoop的整文件讀取方法”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)Hadoop的整文件讀取方法這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
分享標(biāo)題:Hadoop的整文件讀取方法
本文URL:http://jinyejixie.com/article28/ipipcp.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站內(nèi)鏈、面包屑導(dǎo)航、網(wǎng)頁(yè)設(shè)計(jì)公司、網(wǎng)站收錄、品牌網(wǎng)站建設(shè)、定制開發(fā)
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)