diff --git a/create-biglambda-role.py b/create-biglambda-role.py
index 846f222..f2cff9c 100644
--- a/create-biglambda-role.py
+++ b/create-biglambda-role.py
@@ -20,18 +20,18 @@
 try:
     response = client.create_role(RoleName=rn,AssumeRolePolicyDocument=json.dumps(trust_role))
-    print response['Role']['Arn']
-    print "Success: done creating role"
+    print(response['Role']['Arn'])
+    print("Success: done creating role")
 except botocore.exceptions.ClientError as e:
-    print "Error: {0}".format(e)
+    print("Error: {0}".format(e))

 try:
     with open('policy.json') as json_data:
         response = client.put_role_policy(RoleName=rn,PolicyName=rp,
             PolicyDocument=json.dumps(json.load(json_data))
         )
-        print "Success: done adding inline policy to role"
+        print("Success: done adding inline policy to role")
 except botocore.exceptions.ClientError as e:
-    print "Error: {0}".format(e)
+    print("Error: {0}".format(e))
diff --git a/delete-biglambda-role.py b/delete-biglambda-role.py
index 3129879..3461f54 100644
--- a/delete-biglambda-role.py
+++ b/delete-biglambda-role.py
@@ -6,12 +6,12 @@
 try:
     response = client.delete_role_policy(RoleName=rn,PolicyName=rp)
-    print "Success: done deleting role policy"
+    print("Success: done deleting role policy")
 except botocore.exceptions.ClientError as e:
-    print "Error: {0}".format(e)
+    print("Error: {0}".format(e))

 try:
     response = client.delete_role(RoleName=rn)
-    print "Success: done deleting role"
+    print("Success: done deleting role")
 except botocore.exceptions.ClientError as e:
-    print "Error: {0}".format(e)
+    print("Error: {0}".format(e))
diff --git a/src/python/driver.py b/src/python/driver.py
index cc313e1..f7ad88c 100755
--- a/src/python/driver.py
+++ b/src/python/driver.py
@@ -11,7 +11,7 @@
 import math
 import random
 import re
-import StringIO
+from io import StringIO
 import sys
 import time

@@ -172,7 +172,7 @@ def invoke_lambda(batches, m_id):
     #batch = [k['Key'] for k in batches[m_id-1]]
     batch = [k.key for k in batches[m_id-1]]
     xray_recorder.current_segment().put_annotation("batch_for_mapper_"+str(m_id), str(batch))
-    #print "invoking", m_id, len(batch)
+    #print("invoking", m_id, len(batch))
     resp = lambda_client.invoke(
             FunctionName = mapper_lambda_name,
             InvocationType = 'RequestResponse',
@@ -186,10 +186,10 @@ def invoke_lambda(batches, m_id):
             )
     out = eval(resp['Payload'].read())
     mapper_outputs.append(out)
-    print "mapper output", out
+    print("mapper output", out)
     xray_recorder.end_segment()

 # Exec Parallel
-print "# of Mappers ", n_mappers
+print("# of Mappers ", n_mappers)
 pool = ThreadPool(n_mappers)
 Ids = [i+1 for i in range(n_mappers)]
 invoke_lambda_partial = partial(invoke_lambda, batches)
@@ -205,7 +205,7 @@ def invoke_lambda(batches, m_id):
 pool.close()
 pool.join()

-print "all the mappers finished"
+print("all the mappers finished")
 xray_recorder.end_subsegment() #Invoke mappers

 # Delete Mapper function
@@ -241,11 +241,11 @@ def invoke_lambda(batches, m_id):
     keys = [jk["Key"] for jk in job_keys]
     total_s3_size = sum([jk["Size"] for jk in job_keys])

-    print "check to see if the job is done"
+    print("check to see if the job is done")

     # check job done
     if job_id + "/result" in keys:
-        print "job done"
+        print("job done")
         reducer_lambda_time += float(s3.Object(job_bucket, job_id + "/result").metadata['processingtime'])
         for key in keys:
             if "task/reducer" in key:
@@ -268,14 +268,14 @@ def invoke_lambda(batches, m_id):
 lambda_cost = total_lambda_secs * 0.00001667 * lambda_memory/ 1024.0
 s3_cost = (s3_get_cost + s3_put_cost + s3_storage_hour_cost)

-# Print costs
-print "Reducer L", reducer_lambda_time * 0.00001667 * lambda_memory/ 1024.0
-print "Lambda Cost", lambda_cost
-print "S3 Storage Cost", s3_storage_hour_cost
-print "S3 Request Cost", s3_get_cost + s3_put_cost
-print "S3 Cost", s3_cost
-print "Total Cost: ", lambda_cost + s3_cost
-print "Total Lines:", total_lines
+# printcosts
+print("Reducer L", reducer_lambda_time * 0.00001667 * lambda_memory/ 1024.0)
+print("Lambda Cost", lambda_cost)
+print("S3 Storage Cost", s3_storage_hour_cost)
+print("S3 Request Cost", s3_get_cost + s3_put_cost)
+print("S3 Cost", s3_cost)
+print("Total Cost: ", lambda_cost + s3_cost)
+print("Total Lines:", total_lines)
 xray_recorder.end_subsegment() #Calculate cost

 # Delete Reducer function
diff --git a/src/python/lambdautils.py b/src/python/lambdautils.py
index 45479d0..dc90fdb 100755
--- a/src/python/lambdautils.py
+++ b/src/python/lambdautils.py
@@ -38,7 +38,7 @@ def create_lambda_function(self):
                 TracingConfig={'Mode':'PassThrough'}
                 )
         self.function_arn = response['FunctionArn']
-        print response
+        print(response)

     def update_function(self):
         '''
@@ -53,7 +53,7 @@ def update_function(self):
         # parse arn and remove the release number (:n)
         arn = ":".join(updated_arn.split(':')[:-1])
         self.function_arn = arn
-        print response
+        print(response)

     def update_code_or_create_on_noexist(self):
         '''
@@ -73,7 +73,7 @@ def add_lambda_permission(self, sId, bucket):
             StatementId = '%s' % sId,
             SourceArn = 'arn:aws:s3:::' + bucket
             )
-        print resp
+        print(resp)

     def create_s3_eventsource_notification(self, bucket, prefix=None):
         if not prefix:
@@ -126,7 +126,7 @@ def compute_batch_size(keys, lambda_memory, concurrent_lambdas):
         else:
             size += key.size
     avg_object_size = size/len(keys)
-    print "Dataset size: %s, nKeys: %s, avg: %s" %(size, len(keys), avg_object_size)
+    print("Dataset size: %s, nKeys: %s, avg: %s" %(size, len(keys), avg_object_size))
     if avg_object_size < max_mem_for_data and len(keys) < concurrent_lambdas:
         b_size = 1
     else:
diff --git a/src/python/mapper.py b/src/python/mapper.py
index 1145aa4..0b12863 100644
--- a/src/python/mapper.py
+++ b/src/python/mapper.py
@@ -9,7 +9,7 @@
 import json
 import random
 import resource
-import StringIO
+from io import StringIO
 import time

 # create an S3 session
@@ -53,7 +53,7 @@ def lambda_handler(event, context):
                     output[srcIp] = 0
                 output[srcIp] += float(data[3])
             except Exception, e:
-                print e
+                print(e)
                 #err += '%s' % e

     time_in_secs = (time.time() - start_time)
@@ -68,7 +68,7 @@ def lambda_handler(event, context):
                 "memoryUsage": '%s' % resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
                 }

-    print "metadata", metadata
+    print("metadata", metadata)
     write_to_s3(job_bucket, mapper_fname, json.dumps(output), metadata)
     return pret
diff --git a/src/python/reducer.py b/src/python/reducer.py
index a5fcc75..11d7ccc 100644
--- a/src/python/reducer.py
+++ b/src/python/reducer.py
@@ -9,7 +9,7 @@
 import json
 import random
 import resource
-import StringIO
+from io import StringIO
 import urllib2
 import time

@@ -55,14 +55,14 @@ def lambda_handler(event, context):
                     results[srcIp] = 0
                 results[srcIp] += float(val)
             except Exception, e:
-                print e
+                print(e)

     time_in_secs = (time.time() - start_time)
     #timeTaken = time_in_secs * 1000000000 # in 10^9
     #s3DownloadTime = 0
     #totalProcessingTime = 0
     pret = [len(reducer_keys), line_count, time_in_secs]
-    print "Reducer ouputput", pret
+    print("Reducer ouputput", pret)

     if n_reducers == 1:
         # Last reducer file, final result
diff --git a/src/python/reducerCoordinator.py b/src/python/reducerCoordinator.py
index 2247715..1ceb7ef 100644
--- a/src/python/reducerCoordinator.py
+++ b/src/python/reducerCoordinator.py
@@ -10,7 +10,7 @@
 import lambdautils
 import random
 import re
-import StringIO
+from io import StringIO
 import time
 import urllib

@@ -127,33 +127,33 @@ def lambda_handler(event, context):
     files = s3_client.list_objects(Bucket=bucket, Prefix=job_id)["Contents"]

     if check_job_done(files) == True:
-        print "Job done!!! Check the result file"
+        print("Job done!!! Check the result file")
         # TODO: Delete reducer and coordinator lambdas
         return
     else:
         ### Stateless Coordinator logic
         mapper_keys = get_mapper_files(files)
-        print "Mappers Done so far ", len(mapper_keys)
+        print("Mappers Done so far ", len(mapper_keys))

         if map_count == len(mapper_keys):

             # All the mappers have finished, time to schedule the reducers
             stepInfo = get_reducer_state_info(files, job_id, bucket)

-            print "stepInfo", stepInfo
+            print("stepInfo", stepInfo)

             step_number = stepInfo[0];
             reducer_keys = stepInfo[1];

             if len(reducer_keys) == 0:
-                print "Still waiting to finish Reducer step ", step_number
+                print("Still waiting to finish Reducer step ", step_number)
                 return

             # Compute this based on metadata of files
             r_batch_size = get_reducer_batch_size(reducer_keys);

-            print "Starting the the reducer step", step_number
-            print "Batch Size", r_batch_size
+            print("Starting the the reducer step", step_number)
+            print("Batch Size", r_batch_size)

             # Create Batch params for the Lambda function
             r_batch_params = lambdautils.batch_creator(reducer_keys, r_batch_size);
@@ -180,13 +180,13 @@ def lambda_handler(event, context):
                         "reducerId": i
                     })
                 )
-                print resp
+                print(resp)

             # Now write the reducer state
             fname = "%s/reducerstate.%s" % (job_id, step_id)
             write_reducer_state(n_reducers, n_s3, bucket, fname)
         else:
-            print "Still waiting for all the mappers to finish .."
+            print("Still waiting for all the mappers to finish ..")

 '''
 ev = {
diff --git a/src/python/s3_download_benchmark.py b/src/python/s3_download_benchmark.py
index 22af5ed..3fc5cb5 100644
--- a/src/python/s3_download_benchmark.py
+++ b/src/python/s3_download_benchmark.py
@@ -25,8 +25,8 @@ def lambda_handler(event, context):
         contents = response['Body'].read()

     time_in_secs = (time.time() - start_time)
-    print "Time taken (s)", time_in_secs
-    print "Size (MB)", total_bytes / 1024/1024
+    print("Time taken (s)", time_in_secs)
+    print("Size (MB)", total_bytes / 1024/1024)

     return time_in_secs
 '''