Friday, 3 October 2014

Load Files from S3 to HDFS by Hadoop Streaming

Load files from S3 onto the local node based on a list of file-path prefixes read from stdin, then upload them into HDFS. The mapper script is below; a sketch of how the streaming job can be submitted follows it.

#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import print_function
import sys
import os
import time
import zipimport
import datetime
import shutil
import getpass

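# boto is not assumed to be installed on the worker nodes; it is loaded from a zip
# shipped alongside the streaming job (e.g. with -file boto-2.32.1.zip)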
importer = zipimport.zipimporter('boto-2.32.1.zip')
boto = importer.load_module('boto')

from boto.s3.connection import S3Connection
from boto.utils import parse_ts

ACCESSKEYID = 'xxxx'
SECRETACCESSKEY = 'xxxx'
BUCKETNAME = 'mybucket'

HDFS_PATH = "hdfs://localhost.localdomain:8020/user/cloudera/xxx/"

local_dir = os.getcwd()+'/data/'

if os.path.exists(local_dir):
    shutil.rmtree(local_dir)

os.makedirs(local_dir)
os.chdir(local_dir)

conn = S3Connection(ACCESSKEYID,SECRETACCESSKEY)
bucket = conn.get_bucket(BUCKETNAME, validate=False)

# read S3 path prefixes from stdin, one per line (Hadoop Streaming feeds each input line to the mapper)
for line in sys.stdin:
    f_path = line.strip()
    for key in bucket.list(prefix=f_path):
        file_path = os.path.basename(key.name)
        if not os.path.exists(local_dir+file_path):
            key.get_contents_to_filename(local_dir+file_path)


# run the HDFS commands below as the submitting user; otherwise they run as the default "mapred" user
user = getpass.getuser()
os.putenv("HADOOP_USER_NAME", user)

# walk the downloaded files and upload each one into HDFS under a per-year folder;
# file names are expected to start with a 10-digit unix timestamp, and each .gpg file
# is assumed to have been decrypted to a file of the same name without the extension
for dirpath, dirnames, files in os.walk(local_dir):
    for f_name in files:
        file_date = datetime.datetime.fromtimestamp(int(f_name[:10]))
        hdfs_dir = HDFS_PATH + str(file_date.year)
        # create the target year folder in HDFS if it does not exist yet
        exec_test = "hadoop fs -test -d " + hdfs_dir
        if os.system(exec_test):
            exec_mkdir = "hadoop fs -mkdir -p " + hdfs_dir
            os.system(exec_mkdir)
        # drop the .gpg extension to get the decrypted file name, then upload it
        if f_name.endswith(".gpg"):
            f_name = f_name[:-len(".gpg")]
        exec_put = "hadoop fs -put " + f_name + " " + hdfs_dir
        os.system(exec_put)

# clean up the local staging directory
shutil.rmtree(local_dir)
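
The script above runs as the mapper of a map-only Hadoop Streaming job, so the S3 downloads are spread across the cluster: each mapper gets a slice of the path list on stdin. A minimal submission sketch is below, assuming the mapper is saved as load_s3.py; the streaming-jar location, the input path-list file, and the output directory are placeholders for this setup (the -output directory must not already exist), and the two -file options ship the mapper script and the boto zip to the task nodes.

#!/usr/bin/python
# -*- coding: utf-8 -*-
# sketch of submitting the map-only streaming job (paths are placeholders)
import subprocess

cmd = [
    "hadoop", "jar", "/usr/lib/hadoop-mapreduce/hadoop-streaming.jar",
    "-D", "mapred.reduce.tasks=0",                # map-only: no reducers
    "-input", "/user/cloudera/s3_path_list.txt",  # one S3 prefix per line
    "-output", "/user/cloudera/s3_load_out",      # must not exist yet
    "-mapper", "python load_s3.py",
    "-file", "load_s3.py",                        # ship the mapper script
    "-file", "boto-2.32.1.zip",                   # ship boto for the zipimport
]
subprocess.call(cmd)

Because each mapper writes directly into HDFS with hadoop fs -put, the job's own -output directory only holds the (empty) streaming output.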
