From http://ehukai.com/2011/06/14/using-oozie-to-process-daily-logs/
Oozie has an FS action and java action. Java actions are run by a single mapper that executes it main() method. The Java action support passing arguments and setting java opts.
Oozie has an FS action and java action. Java actions are run by a single mapper that executes it main() method. The Java action support passing arguments and setting java opts.
I knew there was a method, moveToLocalFile(), on the Hadoop FileSystem class. However, the method only appears to work for a single file. FileUtil class that has a static method called copyMerge(). The method, copyMerge(), takes a source filesystem, source path, destination file system, destination path, and a flag to delete the source path on success. 
1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
20 
21 
22 
23 
24 
25 
26 
27 
28 
29 
30 
31 
32 
33 
34 
35 
36 
37 
38 
39 
40 
41 
42 
43 
44 
 | package com.edmunds.dwh.processor;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.FileUtil;import org.apache.hadoop.fs.Path;import java.io.IOException;/** * This class moves a file or directory from HDFS to a local file or directory. * Copyright (C) 2011 Edmunds.com * <p/> * Date: 6/9/11:10:27 AM * * @author phannon */public final class HdfsToLocalFileMove {    private final FileSystem sourceFileSystem;    private final FileSystem destFileSystem;    private final Configuration conf;    public HdfsToLocalFileMove(Configuration conf, FileSystem sourceFileSystem, FileSystem destFileSystem) throws IOException {        this.sourceFileSystem = sourceFileSystem;        this.destFileSystem = destFileSystem;        this.conf = conf;    }    public void move(Path source, Path destination) throws IOException {        if (destFileSystem.exists(destination)) {            destFileSystem.delete(destination, true);        }        if (sourceFileSystem.getFileStatus(source).isDir()) {            FileUtil.copyMerge(sourceFileSystem, source, destFileSystem, destination, true, conf, null);        } else {            sourceFileSystem.moveToLocalFile(source, destination);        }    }    public static void main(String[] args) throws IOException {        Path source = new Path(args[0]);        Path dest = new Path(args[1]);        Configuration conf = new Configuration();        FileSystem fsSource = source.getFileSystem(conf);        FileSystem fsDest = FileSystem.getLocal(conf);        HdfsToLocalFileMove move = new HdfsToLocalFileMove(conf, fsSource, fsDest);        move.move(source, dest);    }} | 
1 
2 
3 
4 
5 
6 
7 
8 
9 
10 
11 
12 
13 
14 
15 
16 
17 
18 
19 
 | <action name="move-and-merge">   <java>   <job-tracker>${jobTracker}</job-tracker>   <name-node>${nameNode}</name-node>   <configuration>    <property>     <name>mapred.job.queue.name</name>       <value>${queueName}</value>    </property>   </configuration>    <main-class>com.edmunds.dwh.processor.HdfsToLocalFileMove</main-class>     <arg>${outputs}</arg>     <arg>${localDest}</arg>   <capture-output/>   </java>  <ok to="end"/> <error to="move-and-merge"/></action> | 
No comments:
Post a Comment