-- Sessionize click logs and summarise every session.
--   Input : session_test/clicks.csv  (comma separated: memberId, time, url)
--   Output: session_results          (comma separated, see column list below)
-- Assumes `time` is Unix epoch milliseconds, which is what UnixToISO and
-- Sessionize take for long timestamps -- confirm against the input data.
REGISTER lib/piggybank.jar;
REGISTER lib/datafu-1.2.0.jar;

DEFINE UnixToISO org.apache.pig.piggybank.evaluation.datetime.convert.UnixToISO();
-- The constructor argument is the inactivity gap that closes a session (10 minutes).
DEFINE Sessionize datafu.pig.sessions.Sessionize('10m');

pv = LOAD 'session_test/clicks.csv' USING PigStorage(',')
     AS (memberId:int, time:long, url:chararray);

-- Sessionize reads the timestamp from the first field of each tuple,
-- so move `time` to position 0.
pv = FOREACH pv GENERATE time, memberId, url;

-- A session is restricted to a single domain, hence the grouping key is
-- (memberId, url) rather than memberId alone.
pv_sessionized = FOREACH (GROUP pv BY (memberId, url)) {
    -- Sessionize requires its input bag to be sorted by time.
    ordered = ORDER pv BY time;
    GENERATE FLATTEN(Sessionize(ordered)) AS (time, memberId, url, sessionId);
};

-- Summarise each session. Output columns:
--   sessionId, memberId, url, start_time (ISO 8601), pageviews, session_length
-- session_length is (last click - first click) / 1000, i.e. whole seconds
-- when `time` is in milliseconds (integer division on longs).
session_times = FOREACH (GROUP pv_sessionized BY (sessionId, memberId, url))
    GENERATE group.sessionId                                          AS sessionId,
             group.memberId                                           AS memberId,
             group.url                                                AS url,
             UnixToISO(MIN(pv_sessionized.time))                      AS start_time,
             SIZE(pv_sessionized.url)                                 AS pageviews,
             (MAX(pv_sessionized.time) - MIN(pv_sessionized.time)) / 1000 AS session_length;

STORE session_times INTO 'session_results' USING PigStorage(',');
To also split sessions at the end of each day, the Sessionize UDF would need an extra check: start a new session whenever the current timestamp does not fall on the same day as the previous timestamp.
Reference:
http://stackoverflow.com/questions/13094321/sessionized-web-logs-get-previous-and-next-domain
http://hortonworks.com/blog/datafu/
http://datafu.incubator.apache.org/docs/datafu/1.2.0/
No comments:
Post a Comment