博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mapreduce如何使用本地文件
阅读量:7206 次
发布时间:2019-06-29

本文共 5221 字,大约阅读时间需要 17 分钟。

hot3.png

对于java来说,读取本地文件再正常不过。但是对于mapreduce程序来说,读取本地文件常常会陷入误区。本地明明有这个文件,在本地运行jar包,mapreduce为什么读不到?因为我们知道,mapreduce程序本来就不是在本地执行的,程序会分布式的在各个机器上执行,你当然读不到文件,那所谓的“本地文件”就不叫“本地文件”,当然只有一个例外:你的hadoop集群是伪集群。比如下面的示例:package test;import java.io.BufferedReader;import java.io.File;import java.io.FileNotFoundException;import java.io.FileReader;import java.io.IOException;import java.net.URI;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.filecache.DistributedCache;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.FileInputFormat;import org.apache.hadoop.mapred.FileOutputFormat;import org.apache.hadoop.mapred.JobClient;import org.apache.hadoop.mapred.JobConf;import org.apache.hadoop.mapred.MapReduceBase;import org.apache.hadoop.mapred.Mapper;import org.apache.hadoop.mapred.OutputCollector;import org.apache.hadoop.mapred.Reducer;import org.apache.hadoop.mapred.Reporter;import org.apache.hadoop.mapred.TextInputFormat;import org.apache.hadoop.mapred.TextOutputFormat;import org.apache.hadoop.mapreduce.Mapper.Context;import org.apache.hadoop.util.GenericOptionsParser;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class FileTest{    public static void main(String args[])    {        int mr = 0;        try        {            mr = ToolRunner                    .run(new Configuration(), new FileTestDriver(), args);        }        catch (Exception e)        {            e.printStackTrace();        }                System.exit(mr);    }}class FileTestDriver extends Configured implements Tool{    @Override    public int run(String[] arg0) throws Exception    {        Configuration config = getConf();        JobConf conf = new JobConf(config, FileTestDriver.class);        String[] otherArgs = new GenericOptionsParser(config, arg0)                .getRemainingArgs();        String input = otherArgs[0];        String ouput = otherArgs[1];        conf.setOutputKeyClass(Text.class);        conf.setOutputValueClass(Text.class);        conf.set("mapred.task.timeout", "6000000");        conf.setMapperClass(FileTestMapper.class);        conf.setReducerClass(FileTestReducer.class);        conf.setInputFormat(TextInputFormat.class);        conf.setOutputFormat(TextOutputFormat.class);        FileInputFormat.setInputPaths(conf, new Path(input));        FileOutputFormat.setOutputPath(conf, new Path(ouput));        JobClient.runJob(conf);        return 0;    }}class FileTestMapper extends MapReduceBase implements        Mapper
{    private String filepath = "";        public void configure(JobConf job)    {        filepath = job.get("files");    }    public void map(LongWritable key, Text value,            OutputCollector
output, Reporter reporter)            throws IOException    {        String url = "qq.com";        String host = getTop100DomainTest(url, filepath);        output.collect(new Text(url + "\t" + host), new Text(""));           }    public String getTop100DomainTest(String url, String filepath)    {        try        {            BufferedReader reader = new BufferedReader(new FileReader(new File(                    filepath)));            String line = "";            while ((line = reader.readLine()) != null)            {                // splitLine[0]为host 后面跟着域名                line = line.replaceAll("( )+", " ");                String[] splitLine = line.split(" ");                for (int i = 1; i < splitLine.length; i++)                {                    String host = splitLine[i];                    if (url.equals(host))                    {                                return splitLine[0];                    }                }            }            return "";        }        catch (FileNotFoundException e)        {            return "";        }        catch (IOException e)        {            return "";        }    }}class FileTestReducer extends MapReduceBase implements        Reducer
{    public void reduce(Text key, Iterator
values,            OutputCollector
output, Reporter reporter)            throws IOException    {        output.collect(key, new Text(""));    }} public String getTop100DomainTest(String url, String filepath)方法读取文件,并根据url返回url的domain。将上述程序打包test.jar后,运行命令:hadoop jar test.jar test.FileTest -D files="/opt/top100.txt"  /test/test /test/test1如果您是伪集群,那么恭喜,程序成功运行,如果您是分布式,那么程序很可能运行不成功?我们知道原理后,这段代码在分布式的情况下,也可以运行成功,怎么办?那就把集群的所有机器都拷贝top100.txt到/opt下!程序运行成功了吧?但其实是很老土的。当你集群数多,你要一一拷贝,那是多么麻烦的一件事,而且所有的配置文件必须在同样的文件夹下,如果你能忍受,那go ahead。实际上mapreduce提供了一个缓存方法DistributedCache。只需在配置阶段加入:DistributedCache.addCacheFile(new URI("/test/top100.txt"), conf);即可,但此处的"/test/top100.txt"为hdfs的路径。然后在mapper 的public void configure(JobConf job)方法中加入public void configure(JobConf job)    {        try        {            localFiles = DistributedCache.getLocalCacheFiles(job);        }        catch (IOException e)        {            e.printStackTrace();        }    }即可。map中引用,通过 path.toUri().getPath()即可访问到file。

转载于:https://my.oschina.net/u/3005325/blog/3001413

你可能感兴趣的文章
10.cadence.自定义焊盘的创建[原创]
查看>>
shell编程总结
查看>>
Docker源码分析(七):Docker Container网络 (上)
查看>>
一些旁门左道
查看>>
Common Pitfalls In Machine Learning Projects
查看>>
Android内存泄漏分析及调试
查看>>
todoing
查看>>
[Cocos2d-x]Cocos2d-x 3.2 学习笔记
查看>>
进程调度
查看>>
使用代码为TextView设置drawableLeft
查看>>
Android开发(十八)——头部、中部、底部布局技巧
查看>>
Egret 集成第三方库 记录
查看>>
同源策略——浏览器安全卫士
查看>>
c/c++ 基金会(七) 功能覆盖,虚函数,纯虚函数控制
查看>>
CodeForces 484B Maximum Value
查看>>
strong vs copy
查看>>
Codeforces Round #313 (Div. 1) C. Gerald and Giant Chess DP
查看>>
基于jQuery商城网站全屏图片切换代码
查看>>
Android开发之注解式框架ButterKnife在ADT中的设置
查看>>
JAVA学习篇--Java类加载
查看>>