Load files from S3 onto the local node based on a list of file paths read from stdin, then upload them into HDFS.
#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import print_function
import sys
import os
import time
import zipimport
import datetime
import shutil
import getpass
# load boto from the bundled zip so the script also works on nodes without boto installed
importer = zipimport.zipimporter('boto-2.32.1.zip')
boto = importer.load_module('boto')
from boto.s3.connection import S3Connection
from boto.utils import parse_ts
ACCESSKEYID = 'xxxx'
SECRETACCESSKEY = 'xxxx'
BUCKETNAME = 'mybucket'
HDFS_PATH = "hdfs://localhost.localdomain:8020/user/cloudera/xxx/"
local_dir = os.getcwd()+'/data/'
if os.path.exists(local_dir):
    shutil.rmtree(local_dir)
os.makedirs(local_dir)
os.chdir(local_dir)
conn = S3Connection(ACCESSKEYID,SECRETACCESSKEY)
bucket = conn.get_bucket(BUCKETNAME, validate=False)
# read S3 key prefixes from stdin, one per line, and download any objects not already present
for line in sys.stdin:
    f_path = line.strip()
    for key in bucket.list(prefix=f_path):
        file_path = os.path.basename(key.name)
        if not os.path.exists(local_dir + file_path):
            key.get_contents_to_filename(local_dir + file_path)
# run the hadoop commands below as the submitting user; otherwise they default to "mapred"
user = getpass.getuser()
os.environ["HADOOP_USER_NAME"] = user
# decrypt the gpg files in every folder and upload the plaintext files to HDFS
for dirpath, dirnames, files in os.walk(local_dir):
    for f_name in files:
        # file names start with a 10-digit epoch timestamp; partition HDFS dirs by year
        file_date = datetime.datetime.fromtimestamp(int(f_name[:10]))
        hdfs_dir = HDFS_PATH + str(file_date.year)
        exec_test = "hadoop fs -test -d " + hdfs_dir
        if os.system(exec_test):
            exec_mkdir = "hadoop fs -mkdir -p " + hdfs_dir  # recursively create folders
            os.system(exec_mkdir)
        # strip the ".gpg" suffix (rstrip(".gpg") strips characters, not the suffix)
        plain_name = f_name[:-len(".gpg")] if f_name.endswith(".gpg") else f_name
        # decrypt with gpg; assumes the decryption key is already configured on the node
        os.system("gpg --batch --yes --output " + plain_name + " --decrypt " + f_name)
        exec_put = "hadoop fs -put " + plain_name + " " + hdfs_dir
        os.system(exec_put)
# remove the local staging directory when done
shutil.rmtree(local_dir)
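For reference, here is one way a script like this might be launched; the script name (s3_to_hdfs.py), input/output paths, and streaming-jar location below are placeholders, and option locations can vary by Hadoop distribution. A quick local test simply pipes the S3 key prefixes on stdin, while a mapper-only Hadoop Streaming job ships the script and the bundled boto zip to the task nodes:

# local test: feed S3 key prefixes on stdin
cat paths.txt | python s3_to_hdfs.py

# mapper-only Hadoop Streaming job (no reducers)
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapreduce.job.reduces=0 \
    -input /user/cloudera/s3_path_lists \
    -output /user/cloudera/s3_load_logs \
    -mapper s3_to_hdfs.py \
    -file s3_to_hdfs.py \
    -file boto-2.32.1.zip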