Basic workflow EL functions such as timestamp() are used without the 'wf:' prefix (i.e., timestamp(), not wf:timestamp()).
Unfortunately, timestamp() returns the current UTC date and time in the format YYYY-MM-DDThh:mm:ss.sZ, which causes an error when used as an HDFS folder name.
So we can't use timestamp() to get the system date in our case; instead, we derive the dates from the coordinator's nominal time.
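For illustration, if we tried to build a path with timestamp() directly (a hypothetical property, just to show the shape of the value), the resolved path would contain ':' characters, which Hadoop path handling rejects:

<property>
    <name>badPath</name>
    <value>${DataPath}/${timestamp()}</value>
    <!-- would resolve to something like /data/project/2014-03-02T05:00:10.1Z -->
</property>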
1. Get the current cluster time in coordinator.xml. Today's run will load the data generated yesterday.
<coordinator-app name="project-scheduler" xmlns="uri:oozie:coordinator:0.2"
                 frequency="${coord:days(1)}" start="${startDay}" end="${endDay}"
                 timezone="America/New_York">
    <action>
        <workflow>
            <app-path>${appPath}</app-path>
            <configuration>
                <property>
                    <name>reportDate</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyy/MM/dd")}</value>
                </property>
                <property>
                    <name>yesterday</name>
                    <value>${coord:formatTime(coord:dateOffset(coord:nominalTime(), -1, 'DAY'), "yyyy-MM-dd")}</value>
                </property>
                <property>
                    <name>today</name>
                    <value>${coord:formatTime(coord:nominalTime(), "yyyy-MM-dd")}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>
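As a rough illustration (the values below are assumptions based on the formats above; coord:formatTime formats in the Oozie processing timezone, UTC by default), an action with nominal time 2014-03-02T05:00Z would pass roughly these values to the workflow:

reportDate=2014/03/01
yesterday=2014-03-01
today=2014-03-02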
2. Use 'reportDate' as the new folder name, and pass it into both the Sqoop output and the Pig input.
In workflow.xml:
<action name="Import_Sqoop">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path='${DataPath}/${reportDate}'/>
</prepare>
<job-xml>conf/hive-site.xml</job-xml>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<arg>--options-file</arg>
<arg>${optionFile}</arg>
<arg>--target-dir</arg>
<arg>${DataPath}/${reportDate}</arg>
<arg>--where</arg>
<arg>${where_condition}</arg>
<file>${optionFile}</file>
</sqoop>
<ok to="Pig_Action"/>
<error to="Kill"/>
</action>
<action name="Pig_Action">
<pig>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path='${OutputPath}/${reportDate}'/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<script>${PigScriptPath}</script>
<param>input=${DataPath}/${reportDate}</param>
<param>output=${OutputPath}/${reportDate}</param>
<file>lib/hive-site.xml</file>
</pig>
<ok to="End"/>
<error to="Kill"/>
</action>
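Inside the Pig script, the 'input' and 'output' parameters passed above are referenced with '$'. A minimal sketch (the load schema and delimiter are assumptions; adapt them to the actual Sqoop output):

raw = LOAD '$input' USING PigStorage(',');        -- Sqoop output for reportDate
-- transformations go here
STORE raw INTO '$output' USING PigStorage(',');   -- results written under reportDate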
3. The date properties can also be passed into the Sqoop SQL through the "job.properties" file, e.g. the where_condition used by the Sqoop action above:
where_condition=DateTime > '${yesterday}' and DateTime < '${today}'
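A minimal job.properties sketch tying the pieces together (the paths, host names, and dates below are assumptions, not values from this project; Oozie resolves the ${...} variables when the configuration is applied):

nameNode=hdfs://namenode:8020
jobTracker=jobtracker:8032
queueName=default
oozie.coord.application.path=${nameNode}/user/oozie/project-scheduler
appPath=${nameNode}/user/oozie/project-scheduler
startDay=2014-03-01T05:00Z
endDay=2014-12-31T05:00Z
DataPath=/data/project/raw
OutputPath=/data/project/report
PigScriptPath=scripts/report.pig
optionFile=sqoop-options.txt
where_condition=DateTime > '${yesterday}' and DateTime < '${today}'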
Reference:
1. Workflow EL functions: http://oozie.apache.org/docs/3.3.2/WorkflowFunctionalSpec.html
2. Coordinator EL functions: http://oozie.apache.org/docs/3.3.2/CoordinatorFunctionalSpec.html#a6.7.3._coord:nominalTime_EL_Function
3. Older coordinator spec: https://oozie.apache.org/docs/3.1.3-incubating/CoordinatorFunctionalSpec.html