This article explains in detail how to rewrite Nutch 2.3's bin/crawl script as a Java class. It should be a practical reference; hopefully you will get something out of reading it.
From Nutch 2.8 onward, the old driver class org.apache.nutch.crawl.Crawl is gone; only the corresponding control script bin/crawl remains. Debugging a shell script inside IDEA is inconvenient, so I read up on shell scripting and, based on Nutch 2.3's bin/crawl and bin/nutch scripts, translated bin/crawl into a Java Crawl class that can be debugged in IDEA.
I referred to the Crawl class from nutch2.7 along with Nutch 2.3's bin/crawl and bin/nutch, keeping the shell scripts' original structure and logic wherever possible; where a direct translation would not work, I made small adjustments.
The main business logic lives in the public int run(String[] args) method. The program entry point is main, which calls ToolRunner.run(NutchConfiguration.create(), new Crawl(), args) to execute that run method.
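For example, to debug inside IDEA, the program arguments in the run configuration could look like this (the crawl id, Solr URL, and paths are placeholders; adjust them to your environment):

-urls urls -crawlId testcrawl -solr http://localhost:8983/solr -depth 2 -topN 1000

Here urls is a directory containing one or more plain-text files of seed URLs, -depth is the number of crawl rounds, and -solr can be omitted to skip indexing.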
public void binNutch5j(String jobName, String commandLine, String options) provides the functionality of the __bin_nutch function in the bin/crawl script: run the job and abort if the exit value indicates an error.
public int runJob(String jobName, String commandLine, String options) plays the role of the bin/nutch script. Instead of the script's if-elif chain (or a switch-case), it creates the appropriate job via reflection.
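The heart of that dispatch is only a few lines. A minimal sketch of the idea, with a hard-coded job name and argument list purely for illustration:

// Look up the implementing class for a job name, instantiate it via
// reflection, and hand it to ToolRunner -- replacing the script's if-elif chain.
String className = CLASS_MAP.get("inject"); // -> "org.apache.nutch.crawl.InjectorJob"
Tool job = (Tool) Class.forName(className).getConstructor().newInstance();
int res = ToolRunner.run(conf, job, new String[]{"urls", "-crawlId", "testcrawl"});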
public void preConfig(Configuration conf, String options) applies the -D key=value settings carried in option strings such as commonOptions to each job's configuration.
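As a worked example with a shortened options string, the call

preConfig(conf, "-D mapred.reduce.tasks=2 -D mapred.compress.map.output=true");

splits on the -D separators and then on =, effectively performing:

conf.set("mapred.reduce.tasks", "2");
conf.set("mapred.compress.map.output", "true");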
CLASS_MAP is a static HashMap recording the mapping from job names to the corresponding class names.
At first, each job took a batchId parameter just as in the script, but I ran into this problem:

Gora MongoDB Exception, can't serialize Utf8

It appears to be a serialization issue; the bug seems to be fixed in gora-0.6, but my Nutch code is on gora-0.5 and I didn't know how to upgrade, so I simply dropped the -batchId parameter and used -all instead, as you can see in the code.
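Concretely, the workaround is just swapping the leading batchId argument for -all; in the fetch step, for instance (the parse and updatedb steps follow the same pattern):

// before: per-batch fetching, which triggered the Utf8 serialization error
// String fetch_args = batchId + " -crawlId " + crawlID + " -threads " + threads;
// after: operate on everything that is due, sidestepping the gora-0.5 bug
String fetch_args = "-all" + " -crawlId " + crawlID + " -threads " + threads;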
Upgrading to gora-0.6 is something to look into when I have time.
Rewriting this script taught me the basics of shell scripting, let me put the Java reflection I had studied into practice, and left me with a solid grasp of Nutch's full crawl workflow and its main control logic. The Gora bug above blocked me for several days because I assumed my translation was at fault; evidently my debugging skills still need work.
The code below is a translation of Nutch 2.3's bin/crawl and bin/nutch scripts. Add the Crawl class to the org.apache.nutch.crawl package. The source is as follows:
package org.apache.nutch.crawl;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.fetcher.FetcherJob;
import org.apache.nutch.util.NutchConfiguration;
import org.apache.nutch.util.NutchTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * A Java translation of the bin/crawl (and bin/nutch) scripts.
 * Created by brianway on 2016/1/19.
 * @author brianway
 * @site brianway.github.io
 */
public class Crawl extends NutchTool implements Tool {

    public static final Logger LOG = LoggerFactory.getLogger(Crawl.class);

    /* Perform complete crawling and indexing (to Solr) given a set of root
       urls and the -solr parameter respectively. More information and usage
       parameters can be found below. */
    public static void main(String args[]) throws Exception {
        int res = ToolRunner.run(NutchConfiguration.create(), new Crawl(), args);
        System.exit(res);
    }

    // Present only to satisfy NutchTool's abstract method so the class compiles.
    @Override
    public Map<String, Object> run(Map<String, Object> args) throws Exception {
        return null;
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 1) {
            System.out.println("Usage: Crawl -urls <urlDir> -crawlId <crawlID> -solr <solrURL> [-threads n] [-depth i] [-topN N]");
            return -1;
        }

        // ------------ check args ---------
        // NOTE: a literal translation of the script's positional parsing
        // ("crawl <seedDir> <crawlID> [<solrUrl>] <numberOfRounds>") was
        // dropped here in favor of the flag-style parsing below, which
        // follows the old Crawl class:
        //   "Usage: Crawl <urlDir> -solr <solrURL> [-dir d] [-threads n] [-depth i] [-topN N]"
        String seedDir = null;
        String crawlID = null;
        String solrUrl = null;
        int limit = 0;
        long topN = Long.MAX_VALUE;
        int threads = getConf().getInt("fetcher.threads.fetch", 10);

        for (int i = 0; i < args.length; i++) {
            if ("-urls".equals(args[i])) {
                seedDir = args[++i];
            } else if ("-crawlId".equals(args[i])) {
                crawlID = args[++i];
            } else if ("-threads".equals(args[i])) {
                threads = Integer.parseInt(args[++i]);
            } else if ("-depth".equals(args[i])) {
                limit = Integer.parseInt(args[++i]);
            } else if ("-topN".equals(args[i])) {
                topN = Long.parseLong(args[++i]);
            } else if ("-solr".equals(args[i])) {
                // BUGFIX: an extra i++ here used to swallow the argument after the URL
                solrUrl = args[++i];
            } else {
                System.err.println("Unrecognized arg " + args[i]);
                return -1;
            }
        }

        if (StringUtils.isEmpty(seedDir)) {
            System.out.println("Missing seedDir : crawl <seedDir> <crawlID> [<solrURL>] <numberOfRounds>");
            return -1;
        }
        if (StringUtils.isEmpty(crawlID)) {
            System.out.println("Missing crawlID : crawl <seedDir> <crawlID> [<solrURL>] <numberOfRounds>");
            return -1;
        }
        if (StringUtils.isEmpty(solrUrl)) {
            System.out.println("No SOLRURL specified. Skipping indexing.");
        }
        if (limit == 0) {
            System.out.println("Missing numberOfRounds : crawl <seedDir> <crawlID> [<solrURL>] <numberOfRounds>");
            return -1;
        }

        /**
         * MODIFY THE PARAMETERS BELOW TO YOUR NEEDS
         */
        // set the number of slave nodes
        int numSlaves = 1;
        // the total number of available tasks; sets Hadoop parameter "mapred.reduce.tasks"
        int numTasks = numSlaves << 1;
        // number of urls to fetch in one iteration (250K per task?)
        // NOTE: topN is used here instead of a hard-coded value
        long sizeFetchlist = topN; // numSlaves * 5;
        // time limit for fetching
        int timeLimitFetch = 180;
        // adds <days> to the current time to facilitate crawling urls already
        // fetched sooner than db.default.fetch.interval
        int addDays = 0;

        // note that some of the options listed here could be set in the
        // corresponding hadoop site xml param file
        String commonOptions = "-D mapred.reduce.tasks=" + numTasks
                + " -D mapred.child.java.opts=-Xmx1000m"
                + " -D mapred.reduce.tasks.speculative.execution=false"
                + " -D mapred.map.tasks.speculative.execution=false"
                + " -D mapred.compress.map.output=true ";
        preConfig(getConf(), commonOptions);

        // initial injection
        System.out.println("Injecting seed URLs");
        String inject_args = seedDir + " -crawlId " + crawlID;
        binNutch5j("inject", inject_args, commonOptions);

        for (int a = 1; a <= limit; a++) {
            // ----------- generating -------------
            System.out.println("Generating batchId");
            String batchId = System.currentTimeMillis() + "-" + new Random().nextInt(32767);

            System.out.println("Generating a new fetchlist");
            String generate_args = "-topN " + sizeFetchlist + " -noNorm -noFilter -adddays "
                    + addDays + " -crawlId " + crawlID + " -batchId " + batchId;
            int res = runJob("generate", generate_args, commonOptions);
            System.out.println("binNutch5j generate " + generate_args);
            if (res == 1) {
                System.out.println("Generate returned 1 (no new segments created)");
                System.out.println("Escaping loop: no more URLs to fetch now");
                break;
            } else if (res != 0) {
                System.out.println("Error running:");
                System.out.println("binNutch5j generate " + generate_args);
                System.out.println("Failed with exit value " + res);
                return res;
            }

            // -------- fetching -----------
            System.out.println("Fetching : ");
            // String fetch_args = batchId + " -crawlId " + crawlID + " -threads " + threads;
            String fetch_args = "-all" + " -crawlId " + crawlID + " -threads " + threads;
            String fetch_options = commonOptions + " -D fetcher.timelimit.mins=" + timeLimitFetch;
            binNutch5j("fetch", fetch_args, fetch_options);

            // ---------- parsing --------------
            // parse the batch, unless the fetcher already parsed it
            if (!getConf().getBoolean(FetcherJob.PARSE_KEY, false)) {
                System.out.println("Parsing : ");
                // enable the skipping of records for the parsing so that a dodgy
                // document does not fail the full task
                // String parse_args = batchId + " -crawlId " + crawlID;
                String parse_args = "-all" + " -crawlId " + crawlID;
                String skipRecordsOptions = " -D mapred.skip.attempts.to.start.skipping=2"
                        + " -D mapred.skip.map.max.skip.records=1";
                binNutch5j("parse", parse_args, commonOptions + skipRecordsOptions);
            }

            // ---------- updatedb ------------
            // updatedb with this batch
            System.out.println("CrawlDB update for " + crawlID);
            // String updatedb_args = batchId + " -crawlId " + crawlID;
            String updatedb_args = "-all" + " -crawlId " + crawlID;
            binNutch5j("updatedb", updatedb_args, commonOptions);

            if (!StringUtils.isEmpty(solrUrl)) {
                System.out.println("Indexing " + crawlID + " on SOLR index -> " + solrUrl);
                // BUGFIX: the original prepended batchId to "-all", passing both at once
                String index_args = "-all -crawlId " + crawlID;
                String index_options = commonOptions + " -D solr.server.url=" + solrUrl;
                binNutch5j("index", index_args, index_options);

                System.out.println("SOLR dedup -> " + solrUrl);
                binNutch5j("solrdedup", solrUrl, commonOptions);
            } else {
                System.out.println("Skipping indexing tasks: no SOLR url provided.");
            }
        }
        return 0;
    }

    /**
     * Equivalent of the __bin_nutch function in bin/crawl: run the job and
     * exit if the return value indicates an error.
     * @param jobName the job name, e.g. "inject"
     * @param commandLine the job's command-line arguments
     * @param options "-D key=value" configuration overrides
     */
    public void binNutch5j(String jobName, String commandLine, String options) throws Exception {
        int res = runJob(jobName, commandLine, options);
        if (res != 0) {
            System.out.println("Error running:");
            System.out.println(jobName + " " + commandLine);
            System.out.println("Failed with exit value " + res);
            System.exit(res);
        }
    }

    /**
     * Equivalent of the bin/nutch script. For convenience this does not use
     * the script's chain of if-elif statements (or a switch-case); the job
     * class is created via reflection instead.
     * @param jobName the job name, e.g. "inject"
     * @param commandLine the job's command-line arguments
     * @param options "-D key=value" configuration overrides
     * @return the job's exit value
     */
    public int runJob(String jobName, String commandLine, String options) throws Exception {
        Configuration conf = NutchConfiguration.create();
        if (!StringUtils.isEmpty(options)) {
            preConfig(conf, options);
        }
        String[] args = commandLine.split("\\s+");
        String className = CLASS_MAP.get(jobName);
        Class<?> jobClass = Class.forName(className);
        Constructor<?> c = jobClass.getConstructor();
        Tool job = (Tool) c.newInstance();
        System.out.println("---------------runJob: " + jobClass.getName() + "----------------------");
        return ToolRunner.run(conf, job, args);
    }

    /**
     * Apply "-D key=value" options to a job's configuration.
     * @param conf the configuration to modify
     * @param options an options string such as commonOptions
     */
    public void preConfig(Configuration conf, String options) {
        String[] equations = options.split("\\s*-D\\s+");
        System.out.println("options:" + options);
        // i starts from 1, not 0, to skip the leading empty string ""
        for (int i = 1; i < equations.length; i++) {
            String[] pair = equations[i].split("=");
            // trim in case the options string ends with a trailing space
            conf.set(pair[0], pair[1].trim());
        }
    }

    /**
     * The map storing the jobName -> className mapping relations.
     */
    public static HashMap<String, String> CLASS_MAP = new HashMap<String, String>();

    /**
     * Initialize CLASS_MAP; refer to "bin/nutch".
     */
    static {
        CLASS_MAP.put("inject", "org.apache.nutch.crawl.InjectorJob");
        CLASS_MAP.put("generate", "org.apache.nutch.crawl.GeneratorJob");
        CLASS_MAP.put("fetch", "org.apache.nutch.fetcher.FetcherJob");
        CLASS_MAP.put("parse", "org.apache.nutch.parse.ParserJob");
        CLASS_MAP.put("updatedb", "org.apache.nutch.crawl.DbUpdaterJob");
        CLASS_MAP.put("readdb", "org.apache.nutch.crawl.WebTableReader");
        CLASS_MAP.put("elasticindex", "org.apache.nutch.indexer.elastic.ElasticIndexerJob");
        CLASS_MAP.put("index", "org.apache.nutch.indexer.IndexingJob");
        CLASS_MAP.put("solrdedup", "org.apache.nutch.indexer.solr.SolrDeleteDuplicates");
    }
}
That covers "how to rewrite Nutch 2.3's bin/crawl script as a Java class". Hopefully the content above is helpful and you have learned something from it; if you found the article useful, please share it so more people can see it.