Wednesday, 20 August 2014

Oozie Sqoop Incrementally Imports Daily Data

Basic workflow EL functions such as timestamp() are written without the 'wf:' prefix (timestamp(), not wf:timestamp()).
Unfortunately, timestamp() returns the current UTC date and time in the form YYYY-MM-DDThh:mm:ss.sZ, and the colons in that value make it unusable as an HDFS folder name.
So we can't use timestamp() to get the system date in our case; the coordinator EL functions below are used instead.
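
For illustration only (this snippet is hypothetical, not part of the real workflow), using it straight in a path would look like this and fail:

<!-- timestamp() resolves to something like 2014-08-20T09:15:30.123Z -->
<delete path='${DataPath}/${timestamp()}'/>  <!-- the ':' characters are not valid in an HDFS path segment -->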


1. Get the current cluster time in coordinator.xml. Today's run loads the data generated yesterday.

<coordinator-app name="project-scheduler" xmlns="uri:oozie:coordinator:0.2"
                 frequency="${coord:days(1)}" start="${startDay}" end="${endDay}"
                 timezone="America/New_York">
    <action>
        <workflow>
            <app-path>${appPath}</app-path>
            <configuration>
                <!-- reportDate uses '/' so the data lands in nested yyyy/MM/dd folders -->
                <property>
                    <name>reportDate</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1,
                           'DAY'), "yyyy/MM/dd")}
                    </value>
                </property>
                <!-- yesterday and today are plain dates for the Sqoop WHERE clause -->
                <property>
                    <name>yesterday</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1,
                           'DAY'), "yyyy-MM-dd")}
                    </value>
                </property>
                <property>
                    <name>today</name>
                    <value>${coord:formatTime(coord:nominalTime(), "yyyy-MM-dd")}
                    </value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>
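
The coordinator above expects ${startDay}, ${endDay} and ${appPath} to be supplied at submission time. A minimal sketch of the properties file (all hosts, paths and dates below are placeholders, not values from the original project):

nameNode=hdfs://namenode:8020
jobTracker=jobtracker:8021
queueName=default
appPath=${nameNode}/user/me/project/workflow
oozie.coord.application.path=${nameNode}/user/me/project/coordinator
startDay=2014-08-20T08:00Z
endDay=2014-12-31T08:00Z

The coordinator is then submitted with something like 'oozie job -config job.properties -run'.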


2. Use 'reportDate' as the new folder name, and pass it to both the Sqoop output and the Pig input in workflow.xml:

 <action name="Import_Sqoop">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path='${DataPath}/${reportDate}'/>
            </prepare>
            <job-xml>conf/hive-site.xml</job-xml>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <arg>--options-file</arg>
            <arg>${optionFile}</arg>
           <arg>--target-dir</arg>
           <arg>${DataPath}/${reportDate}</arg>
            <arg>--where</arg>
            <arg>${where_condition}</arg>
            <file>${optionFile}</file>
 
        </sqoop>
        <ok to="Pig_Action"/>
        <error to="Kill"/>
    </action>
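
The ${optionFile} referenced above keeps the connection details and table out of workflow.xml. A hypothetical sketch of such an options file (one option per line; the JDBC URL, user, password file and table name are placeholders):

import
--connect
jdbc:sqlserver://dbhost:1433;databaseName=mydb
--username
etl_user
--password-file
/user/me/project/.sqoop-password
--table
Events
--num-mappers
4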
 

    <action name="Pig_Action">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
           <prepare>
                  <delete path='${OutputPath}/${reportDate}'/>
            </prepare>
        <configuration>
                <property>
                  <name>mapred.job.queue.name</name>
                  <value>${queueName}</value>
                </property>
         </configuration>
<script>${PigScriptPath}</script>
<param>input=${DataPath}/${reportDate}</param>
<param>output=${OutputPath}/${reportDate}</param>
<file>lib/hive-site.xml</file>
        </pig>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
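
Inside the Pig script, the two <param> values arrive as $input and $output. A minimal sketch of how they might be used (the relation name and storage function are placeholders):

raw = LOAD '$input' USING PigStorage(',');
-- transformations on the daily slice would go here
STORE raw INTO '$output' USING PigStorage(',');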

3. The date properties can also be passed into the Sqoop SQL through the job.properties file, e.g. ${yesterday} and ${today} in the WHERE condition:

where_condition= DateTime > '${yesterday}' and DateTime < '${today}'
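
For example, on the 2014-08-20 run the coordinator sets yesterday=2014-08-19 and today=2014-08-20, so the condition should resolve to DateTime > '2014-08-19' and DateTime < '2014-08-20' before Sqoop receives it through the --where argument shown above.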


Reference:
1. Workflow EL functions: http://oozie.apache.org/docs/3.3.2/WorkflowFunctionalSpec.html
2. Coordinator EL functions: http://oozie.apache.org/docs/3.3.2/CoordinatorFunctionalSpec.html#a6.7.3._coord:nominalTime_EL_Function
3. Coordinator functional spec (older release): https://oozie.apache.org/docs/3.1.3-incubating/CoordinatorFunctionalSpec.html

