Once those were fixed, the job ran beautifully on my local machine, generating hundreds of correct mapper outputs and ending before the reducer stage.
When we tried to run my map/reduce job on the cluster the next day, however, it broke immediately with another interesting error.
Why? Because I had used Apache's commons-io for a few things, so I had to convert the following block of code. Below is a contrast of the original code I used for recursively scanning a directory for input files, versus the exact same logic implemented without Commons IO, using the Hadoop FileSystem API.
The important take-home lesson for me was that Hadoop's FileSystem API is compatible with both local and distributed file systems, so it works great for unit tests and standard local disk I/O. The additional advantage is that the FileSystem API, by virtue of the Configuration object it receives, is modular enough to operate equally well on a Hadoop cluster, against HDFS.
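To make that concrete, here is a minimal sketch of how the very same FileSystem.get(conf) call resolves to different file systems depending only on the Configuration it is given (the class name and the hdfs:// namenode address in the comment are hypothetical, not from the original job):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class WhichFileSystem
{
    public static void main(String[] args) throws Exception
    {
        // With a vanilla Configuration (no core-site.xml on the classpath),
        // fs.default.name defaults to "file:///", so this resolves to the local file system.
        FileSystem localFs = FileSystem.get(new Configuration());
        System.out.println("resolved to : " + localFs.getUri());

        // On a cluster node, core-site.xml points fs.default.name at the namenode
        // (e.g. hdfs://namenode:8020, a hypothetical host), and the exact same
        // FileSystem.get(conf) call hands back a DistributedFileSystem instead.
        // No calling code has to change; only the Configuration does.
    }
}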
The old way :(
/** Old code: only works locally, crashes with a file-protocol exception in a cluster. */
public static Collection<Path> determineInputs(Path rootPath, Configuration conf) throws IOException
{
    File rootDirectory = new File(rootPath.makeQualified(FileSystem.get(conf)).toUri());

    // Case where we had a single file as input.
    if(rootDirectory.isFile())
        return Arrays.asList(new Path(rootDirectory.getAbsolutePath()));

    // Other case: a directory was the input -- collect the "part-" files under it.
    IOFileFilter filter = FileFilterUtils.prefixFileFilter("part-");
    IOFileFilter dirFilter = FileFilterUtils.directoryFileFilter();
    Collection<File> c1 = FileUtils.listFiles(rootDirectory, filter, dirFilter);

    List<Path> paths = Lists.newArrayList();
    for(File f1 : c1)
    {
        System.out.println("\t found json file : " + f1.getAbsolutePath() + " " + f1.length()/1000 + " kb");
        paths.add(new Path(f1.getAbsolutePath()));
    }
    return paths;
}
The Hadoop way! :)
/** Recursively collects the "part-" files under rootPath, on local disk or in HDFS. */
public static Collection<Path> determineInputs(Path rootPath, Configuration conf) throws IOException
{
    // A single file as input: return it directly.
    if(! FileSystem.get(conf).getFileStatus(rootPath).isDir())
        return Arrays.asList(rootPath);

    // A directory as input: walk its children, recursing into subdirectories.
    FileStatus[] c1 = FileSystem.get(conf).listStatus(rootPath);
    List<Path> paths = Lists.newArrayList();
    for(FileStatus f1 : c1)
    {
        if(f1.isDir())
        {
            paths.addAll(determineInputs(f1.getPath(), conf));
        }
        else
        {
            System.out.println("\t found json file : " + f1.getPath() + " " + f1.getLen()/1000 + " kb");
            if(f1.getPath().getName().contains("part-") && ! f1.getPath().getName().startsWith("."))
                paths.add(f1.getPath());
        }
    }
    return paths;
}
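For completeness, here is a minimal sketch of how determineInputs() might be wired into a job driver. The driver class, job name, and the assumption that it lives in the same class as determineInputs() are mine, not from the original job:

import java.util.Collection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class Driver
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "json processing"); // hypothetical job name

        // Resolve every part- file under the input root -- local disk in a unit
        // test, HDFS on the cluster -- and register each one as a job input.
        Collection<Path> inputs = determineInputs(new Path(args[0]), conf);
        for(Path p : inputs)
        {
            FileInputFormat.addInputPath(job, p);
        }

        // ... set mapper, reducer, and output path, then submit as usual ...
    }
}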