как да получите достъп до текстовете от текстов файл, присъстващ в кеша на hadoop

Имам 2 файла, които трябва да бъдат достъпни от клъстера hadoop. Тези два файла са съответно good.txt и bad.txt. Първо, тъй като и двата файла трябва да бъдат достъпни от различни възли, поставям тези два файла в разпределения кеш в класа на драйвера, както следва

Configuration conf = new Configuration();
DistributedCache.addCacheFile(new URI("/user/training/Rakshith/good.txt"),conf);
DistributedCache.addCacheFile(new URI("/user/training/Rakshith/bad.txt"),conf);
Job job = new Job(conf);

Сега както добрите, така и лошите файлове се поставят в разпределен кеш. Имам достъп до разпределения кеш в класа на картографа, както следва

public class LetterMapper extends Mapper<LongWritable,Text,LongWritable,Text> {
private Path[]files;

@Override
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {

files=DistributedCache.getLocalCacheFiles(new Configuration(context.getConfiguration()));

    }

Трябва да проверя дали дадена дума присъства в good.txt или bad.txt. Така че използвам нещо подобно

File file=new File(files[0].toString()); //to access good.txt
BufferedReader br=new BufferedReader(new FileReader(file));
StringBuider sb=new StringBuilder();
String input=null;
while((input=br.readLine())!=null){
     sb.append(input);
     }
input=sb.toString();

трябва да получа съдържанието на добър файл в моята входна променлива. Но не го разбирам. Пропуснал ли съм нещо??


person Rakshith    schedule 17.02.2014    source източник
comment
възможен дубликат на Проблеми с настройката и достъпа до Distributed Cache   -  person Jakub Kotowski    schedule 17.02.2014


Отговори (2)


Работата завършва ли успешно? Maptask може да се провали, защото използвате JobConf в този ред

files=DistributedCache.getLocalCacheFiles(new JobConf(context.getConfiguration()));

Ако го промените по този начин, трябва да работи, не виждам никакъв проблем с оставащия код, който сте публикували.

files=DistributedCache.getLocalCacheFiles(context.getConfiguration());

or

files=DistributedCache.getLocalCacheFiles(new Configuration(context.getConfiguration()));
person rVr    schedule 17.02.2014
comment
Задачата за карта завършва успешно - person Rakshith; 18.02.2014
comment
Можете ли да споделите пълен код за map,reduce,driver class? - person rVr; 18.02.2014

@rVr това е моята шофьорска класа

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AvgWordLength {

  public static void main(String[] args) throws Exception {


if (args.length !=2) {
  System.out.printf("Usage: AvgWordLength <input dir> <output dir>\n");
  System.exit(-1);
}



Configuration conf = new Configuration();
DistributedCache.addCacheFile(new URI("/user/training/Rakshith/good.txt"),conf);
DistributedCache.addCacheFile(new URI("/user/training/Rakshith/bad.txt"),conf);
Job job = new Job(conf);


job.setJarByClass(AvgWordLength.class);


job.setJobName("Average Word Length");
FileInputFormat.setInputPaths(job,new Path(args[0]));
FileOutputFormat.setOutputPath(job,new Path(args[1]));
job.setMapperClass(LetterMapper.class);

job.setMapOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);

boolean success = job.waitForCompletion(true);
System.exit(success ? 0 : 1);
  }
}

И моят клас картограф е

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;



public class LetterMapper extends Mapper<LongWritable,Text,LongWritable,Text> {
private Path[]files;

@Override
protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException {
    files=DistributedCache.getLocalCacheFiles(new Configuration(context.getConfiguration()));
    System.out.println("in setup()"+files.toString());


}

 @Override
  public void map(LongWritable key, Text value, Context context)throws IOException,InterruptedException{
     int i=0;
     System.out.println("in map----->>"+files.toString());//added just to view logs



    HashMap<String,String> h=new HashMap<String,String>();
    String negword=null;
     String input=value.toString();
                 if(isPresent(input,files[0].toString()){
                     h.put(input,"good");
                      }
                 else
                      if(isPresent(input,files[1].toString()){
                           h.put(input,"bad");
                                   }


 }
 public static boolean isPresent(String n,Path files2) throws IOException{
     File file=new File(files2.toString());
        BufferedReader br=new BufferedReader(new FileReader(file));
        StringBuilder sb=new StringBuilder();
        String input=null;
        while((input=br.readLine().toString())!=null){
            sb.append(input.toString());
        }
        input=sb.toString();
        //System.out.println(input);
        Pattern pattern=Pattern.compile(n);
        Matcher matcher=pattern.matcher(input);
        if(matcher.find()){
            return true;
        }
        else
            return false;

    }
     }
person Rakshith    schedule 18.02.2014