For a Java program, reading a local file is about as routine as it gets. For a MapReduce program, however, reading a local file is a common pitfall: the file clearly exists on your machine, you run the jar from that very machine, so why can't the job read it? Because a MapReduce program does not actually execute locally. It executes in a distributed fashion across the machines of the cluster, so of course the tasks cannot see the file; from their point of view, the so-called "local file" is not local at all. There is exactly one exception: your Hadoop cluster is a pseudo-distributed cluster. Consider the following example:

```java
package test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileTest {
    public static void main(String[] args) {
        int mr = 0;
        try {
            mr = ToolRunner.run(new Configuration(), new FileTestDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(mr);
    }
}

class FileTestDriver extends Configured implements Tool {
    @Override
    public int run(String[] arg0) throws Exception {
        Configuration config = getConf();
        JobConf conf = new JobConf(config, FileTestDriver.class);
        String[] otherArgs = new GenericOptionsParser(config, arg0).getRemainingArgs();
        String input = otherArgs[0];
        String output = otherArgs[1];

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        conf.set("mapred.task.timeout", "6000000");
        conf.setMapperClass(FileTestMapper.class);
        conf.setReducerClass(FileTestReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
        return 0;
    }
}

class FileTestMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    private String filepath = "";

    @Override
    public void configure(JobConf job) {
        // "files" is a plain config property set on the command line
        // with -D files=/opt/top100.txt -- a local filesystem path
        filepath = job.get("files");
    }

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        String url = "qq.com";
        String host = getTop100DomainTest(url, filepath);
        output.collect(new Text(url + "\t" + host), new Text(""));
    }

    public String getTop100DomainTest(String url, String filepath) {
        try (BufferedReader reader =
                new BufferedReader(new FileReader(new File(filepath)))) {
            String line;
            while ((line = reader.readLine()) != null) {
                // splitLine[0] is the host; the domains that map to it follow
                line = line.replaceAll("( )+", " ");
                String[] splitLine = line.split(" ");
                for (int i = 1; i < splitLine.length; i++) {
                    String host = splitLine[i];
                    if (url.equals(host)) {
                        return splitLine[0];
                    }
                }
            }
            return "";
        } catch (IOException e) {
            // a missing or unreadable file simply yields no match
            return "";
        }
    }
}

class FileTestReducer extends MapReduceBase
        implements Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        output.collect(key, new Text(""));
    }
}
```

The `getTop100DomainTest(String url, String filepath)` method reads the file and, for a given url, returns that url's domain.
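To make the lookup-file format concrete: getTop100DomainTest returns the first whitespace-separated token of whichever line contains the query url among its remaining tokens. The post never shows the actual file, so the following top100.txt is purely hypothetical sample data:

```
tencent qq.com www.qq.com mail.qq.com
baidu baidu.com www.baidu.com tieba.baidu.com
```

With this data, getTop100DomainTest("qq.com", filepath) finds qq.com in the tail tokens of the first line and returns tencent.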
Package the program above as test.jar and run:

```
hadoop jar test.jar test.FileTest -D files="/opt/top100.txt" /test/test /test/test1
```

If you are on a pseudo-distributed cluster, congratulations: the job runs successfully. If you are on a real distributed cluster, the job will most likely fail. Now that we understand why, can this code still be made to work in the distributed case? Sure: copy top100.txt into /opt on every machine in the cluster! The job now succeeds, but this is a decidedly clumsy approach. The bigger the cluster, the more machines you have to copy to one by one, and the file must sit in exactly the same directory on every node. If you can live with that, go ahead.

In fact, MapReduce provides a caching mechanism for exactly this situation: DistributedCache. All it takes is one extra line in the job-configuration phase:

```java
DistributedCache.addCacheFile(new URI("/test/top100.txt"), conf);
```

Note that "/test/top100.txt" here is an HDFS path, not a local one. Then, in the mapper's `public void configure(JobConf job)` method, fetch the node-local copies (localFiles being a Path[] field of the mapper):

```java
public void configure(JobConf job) {
    try {
        localFiles = DistributedCache.getLocalCacheFiles(job);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
```

That is all. Inside map(), calling path.toUri().getPath() on one of those Path objects yields an ordinary local file path through which the file can be read as usual.
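Putting those pieces together, the mapper from the example might end up looking like this. This is only a sketch against the same old mapred API used above; the driver additionally needs the addCacheFile() line (plus the java.net.URI and org.apache.hadoop.filecache.DistributedCache imports), and the reducer is unchanged:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

class FileTestMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    private Path[] localFiles;

    @Override
    public void configure(JobConf job) {
        try {
            // The framework has already copied every cached HDFS file onto the
            // local disk of the node running this task; fetch those local paths.
            localFiles = DistributedCache.getLocalCacheFiles(job);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        String url = "qq.com";
        // localFiles[0] is the node-local copy of /test/top100.txt, the first
        // (and only) file registered with addCacheFile() in the driver.
        String host = getTop100DomainTest(url, localFiles[0].toUri().getPath());
        output.collect(new Text(url + "\t" + host), new Text(""));
    }

    // Same lookup logic as before; only the origin of the path has changed.
    public String getTop100DomainTest(String url, String filepath) {
        try (BufferedReader reader = new BufferedReader(new FileReader(filepath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] splitLine = line.replaceAll("( )+", " ").split(" ");
                for (int i = 1; i < splitLine.length; i++) {
                    if (url.equals(splitLine[i])) {
                        return splitLine[0];
                    }
                }
            }
        } catch (IOException e) {
            // a missing or unreadable cache file simply yields no match
        }
        return "";
    }
}
```

Because the file now travels with the job through HDFS, the -D files=... argument is no longer needed; `hadoop jar test.jar test.FileTest /test/test /test/test1` should work on a real distributed cluster as well.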