From 9cc4f563fe21ff6e8466b4ebd72526223c0702ca Mon Sep 17 00:00:00 2001 From: praspaliauskas Date: Tue, 8 May 2018 09:54:00 -0700 Subject: [PATCH 1/8] Added CloudFormation and SSM Parameter Store support --- README.md | 65 ++------ cf_template.yaml | 253 +++++++++++++++++++++++++++++++ create-biglambda-role.py | 37 ----- delete-biglambda-role.py | 17 --- policy.json | 94 ------------ src/python/driver.py | 78 ++++------ src/python/driverconfig.json | 24 --- src/python/jobinfo.json | 7 - src/python/lambdautils.py | 49 +++++- src/python/mapper.py | 5 +- src/python/reducer.py | 6 +- src/python/reducerCoordinator.py | 28 ++-- 12 files changed, 356 insertions(+), 307 deletions(-) create mode 100644 cf_template.yaml delete mode 100644 create-biglambda-role.py delete mode 100644 delete-biglambda-role.py delete mode 100755 policy.json delete mode 100644 src/python/driverconfig.json delete mode 100644 src/python/jobinfo.json diff --git a/README.md b/README.md index 90c25a3..915c41b 100755 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ By leveraging this framework, you can build a cost-effective pipeline to run ad * Cloudwatch log access (logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents) * X-Ray write access (xray:PutTraceSegments, xray:PutTelemetryRecords) -Check policy.json for a sample that you can use or extend. +Check cf_template.yaml that you can extend as needed. * To execute the driver locally, make sure that you configure your AWS profile with access to: * [S3](http://docs.aws.amazon.com/AmazonS3/latest/dev/example-policies-s3.html) @@ -33,62 +33,17 @@ Check policy.json for a sample that you can use or extend. To run the example, you must have the AWS CLI set up. Your credentials must have access to create and invoke Lambda and access to list, read, and write to a S3 bucket. -1. Create your S3 bucket to store the intermediaries and result -(remember to use your own bucket name due to S3 namespace) +1. Start CLoudFormation console and create new stack using cf_template.yaml. CloudFormation will create S3 bucket for the results, biglambda_role IAM role for AWS Lambda execution and appropriate inline policy, SSM Parameter Store parameters used by the Lambda functions. - $ aws s3 mb s3://biglambda-s3-bucket +2. [Run AWS X-Ray Daemon locally](https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-local.html), otherwise you will not be able to see traces from the local driver in AWS X-Ray console. However, traces from Reducer Coordinator Lambda functions will be present. -2. Update the policy.json with your S3 bucket name - - $ sed -i 's/s3:::MY-S3-BUCKET/s3:::biglambda-s3-bucket/' policy.json - -3. Create the IAM role with respective policy - - $ python create-biglambda-role.py - -4. Use the output ARN from the script. Set the serverless_mapreduce_role environment variable: - - $ export serverless_mapreduce_role=arn:aws:iam::MY-ACCOUNT-ID:role/biglambda_role - -5. Make edits to driverconfig.json and verify - - $ cat driverconfig.json - -6. [Run AWS X-Ray Daemon locally](https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-local.html), otherwise you will not be able to see traces from the local driver in AWS X-Ray console. However, traces from Reducer Coordinator Lambda functions will be present. - -7. Run the driver +3. Run the driver $ python driver.py -### Modifying the Job (driverconfig.json) - -For the jobBucket field, enter an S3 bucket in your account that you wish to use for the example. 
Make changes to the other fields if you have different source data, or if you have renamed the files. - -``` +### Modifying the Job -{ - "bucket": "big-data-benchmark", - "prefix": "pavlo/text/1node/uservisits/", - "jobBucket": "biglambda-s3-bucket", - "concurrentLambdas": 100, - "mapper": { - "name": "mapper.py", - "handler": "mapper.lambda_handler", - "zip": "mapper.zip" - }, - "reducer":{ - "name": "reducer.py", - "handler": "reducer.lambda_handler", - "zip": "reducer.zip" - }, - "reducerCoordinator":{ - "name": "reducerCoordinator.py", - "handler": "reducerCoordinator.lambda_handler", - "zip": "reducerCoordinator.zip" - }, -} - -``` +If needed you can modify cf_template.yaml and update CloudFormation stack ### Outputs @@ -111,10 +66,8 @@ smallya$ head –n 3 result To remove all resources created by this example, do the following: 1. Delete all objects from the S3 bucket listed in `jobBucket` created by the job. -1. Delete the Cloudwatch log groups for each of the Lambda functions created by the job. -1. Delete the created IAM role - - $ python delete-biglambda-role.py +2. Delete CloudFormation stack created for the job +3. Delete the Cloudwatch log groups for each of the Lambda functions created by the job. ## Languages * Python 2.7 (active development) @@ -179,4 +132,4 @@ Serverless MapReduce Cost: ``` ## License -This reference architecture sample is licensed under the Amazon Software License. +This reference architecture sample is licensed under the Amazon Software License. diff --git a/cf_template.yaml b/cf_template.yaml new file mode 100644 index 0000000..67b308f --- /dev/null +++ b/cf_template.yaml @@ -0,0 +1,253 @@ +AWSTemplateFormatVersion: 2010-09-09 +Resources: + SSMJobID: + Type: 'AWS::SSM::Parameter' + Properties: + Value: bl-release + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'jobId' + SSMMapCount: + Type: 'AWS::SSM::Parameter' + Properties: + Value: 0 + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'mapCount' + SSMReducerFunction: + Type: 'AWS::SSM::Parameter' + Properties: + Value: ' ' + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'reducerFunction' + SSMBotoMaxConnections: + Type: 'AWS::SSM::Parameter' + Properties: + Value: 1000 + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'boto_max_connections' + SSMBucket: + Type: 'AWS::SSM::Parameter' + Properties: + Value: big-data-benchmark + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'bucket' + SSMBucketPrefix: + Type: 'AWS::SSM::Parameter' + Properties: + Value: pavlo/text/1node/uservisits/ + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'prefix' + SSMJobBucket: + Type: 'AWS::SSM::Parameter' + Properties: + Value: !Ref JobBucket + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'jobBucket' + SSMRegion: + Type: 'AWS::SSM::Parameter' + Properties: + Value: !Ref Region + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'region' + SSMLambdaMemory: + Type: 'AWS::SSM::Parameter' + Properties: + Value: 1536 + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'lambdaMemory' + SSMConcurrentLambdas: + Type: 'AWS::SSM::Parameter' + Properties: + Value: 1000 + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'concurrentLambdas' + SSMMapper: + Type: 'AWS::SSM::Parameter' + Properties: + Value: '{ "name": "mapper.py","handler": "mapper.lambda_handler","zip": "mapper.zip"}' + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'mapper' + SSMReducer: + Type: 'AWS::SSM::Parameter' + Properties: + 
Value: '{ "name": "reducer.py","handler": "reducer.lambda_handler","zip": "reducer.zip"}' + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'reducer' + SSMReducerCoordinator: + Type: 'AWS::SSM::Parameter' + Properties: + Value: '{ "name": "reducerCoordinator.py","handler": "reducerCoordinator.lambda_handler","zip": "reducerCoordinator.zip"}' + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'reducerCoordinator' + SSMLambdaReadTimeout: + Type: 'AWS::SSM::Parameter' + Properties: + Value: 300 + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'lambdaReadTimeout' + SSMLambdaExecutionRole: + Type: 'AWS::SSM::Parameter' + Properties: + Value: !GetAtt BigLambdaRole.Arn + Type: String + Name: !Join + - '/' + - - !Ref SSMPrefix + - 'lambdaExecutionRole' + BigLambdaRole: + Type: 'AWS::IAM::Role' + Properties: + RoleName: biglambda_role + AssumeRolePolicyDocument: + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: + - sts:AssumeRole + Policies: + - + PolicyName: BigLambdaPolicy + PolicyDocument: + Version: "2012-10-17" + Statement: + - + Action: + - 'lambda:AddPermission' + - 'lambda:CreateEventSourceMapping' + - 'lambda:CreateFunction' + - 'lambda:DeleteEventSourceMapping' + - 'lambda:DeleteFunction' + - 'lambda:GetEventSourceMapping' + - 'lambda:InvokeAsync' + - 'lambda:InvokeFunction' + - 'lambda:ListEventSourceMappings' + - 'lambda:RemovePermission' + - 'lambda:UpdateEventSourceMapping' + - 'lambda:UpdateFunctionCode' + - 'lambda:UpdateFunctionConfiguration' + Effect: Allow + Resource: "*" + - + Action: + - 'logs:CreateLogGroup' + - 'logs:CreateLogStream' + - 'logs:DeleteLogGroup' + - 'logs:DeleteLogStream' + - 'logs:GetLogEvents' + - 'logs:PutLogEvents' + Effect: Allow + Resource: "*" + - + Action: + - 'xray:PutTraceSegments' + - 'xray:PutTelemetryRecords' + Effect: Allow + Resource: "*" + - + Action: + - 'ssm:DescribeParameters' + Effect: Allow + Resource: "*" + - + Action: + - 's3:GetObject' + - 's3:ListBucket' + Effect: Allow + Resource: "*" + - + Action: + - 's3:DeleteObject' + - 's3:DeleteObjectVersion' + - 's3:GetBucketAcl' + - 's3:GetBucketLocation' + - 's3:GetBucketLogging' + - 's3:GetBucketNotification' + - 's3:GetBucketPolicy' + - 's3:GetObjectAcl' + - 's3:GetObjectVersion' + - 's3:GetObjectVersionAcl' + - 's3:ListAllMyBuckets' + - 's3:PutBucketNotification' + - 's3:PutObject' + - 's3:PutObjectAcl' + - 's3:PutObjectVersionAcl' + Effect: Allow + Resource: !Join + - '' + - - 'arn:aws:s3:::' + - !Ref JobBucket + - '/*' + - + Action: + - 'ssm:GetParameters' + - 'ssm:GetParametersByPath' + - 'ssm:GetParameter' + Effect: Allow + Resource: !Join + - '' + - - 'arn:aws:ssm:' + - !Ref Region + - ':' + - !Ref AWS::AccountId + - ':parameter' + - !Ref SSMPrefix + - '/*' + S3JobBucket: + Type: 'AWS::S3::Bucket' + Properties: + BucketName: !Ref JobBucket +Parameters: + SSMPrefix: + Type: String + Default: /biglambda + Description: Enter SSM parameter prefix. Must start with / (default is /biglambda). You will need to update SSM_PATH constant in lambdautils.py file with the value entered here. 
+ JobBucket: + Type: String + Default: ENTER YOUR BUCKET HERE + Description: S3 bucket for results + Region: + Type: String + Default: us-west-2 + Description: AWS Region to use for AWS Lambda diff --git a/create-biglambda-role.py b/create-biglambda-role.py deleted file mode 100644 index 846f222..0000000 --- a/create-biglambda-role.py +++ /dev/null @@ -1,37 +0,0 @@ -import boto3,json,botocore -client = boto3.client('iam') - -trust_role = { - "Version": "2012-10-17", - "Statement": [ - { - "Sid": "", - "Effect": "Allow", - "Principal": { - "Service": "lambda.amazonaws.com" - }, - "Action": "sts:AssumeRole" - } - ] -} - -rn='biglambda_role' -rp='biglambda_policy' - -try: - response = client.create_role(RoleName=rn,AssumeRolePolicyDocument=json.dumps(trust_role)) - print response['Role']['Arn'] - print "Success: done creating role" -except botocore.exceptions.ClientError as e: - print "Error: {0}".format(e) - -try: - with open('policy.json') as json_data: - response = client.put_role_policy(RoleName=rn,PolicyName=rp, - PolicyDocument=json.dumps(json.load(json_data)) - ) - print "Success: done adding inline policy to role" -except botocore.exceptions.ClientError as e: - print "Error: {0}".format(e) - - diff --git a/delete-biglambda-role.py b/delete-biglambda-role.py deleted file mode 100644 index 3129879..0000000 --- a/delete-biglambda-role.py +++ /dev/null @@ -1,17 +0,0 @@ -import boto3,botocore -client = boto3.client('iam') - -rn = 'biglambda_role' -rp = 'biglambda_policy' - -try: - response = client.delete_role_policy(RoleName=rn,PolicyName=rp) - print "Success: done deleting role policy" -except botocore.exceptions.ClientError as e: - print "Error: {0}".format(e) - -try: - response = client.delete_role(RoleName=rn) - print "Success: done deleting role" -except botocore.exceptions.ClientError as e: - print "Error: {0}".format(e) diff --git a/policy.json b/policy.json deleted file mode 100755 index 598949d..0000000 --- a/policy.json +++ /dev/null @@ -1,94 +0,0 @@ -{ - "Version": "2012-10-17", - "Statement": [ - { - "Sid": "Stmt1475879631000", - "Effect": "Allow", - "Action": [ - "lambda:AddPermission", - "lambda:CreateEventSourceMapping", - "lambda:CreateFunction", - "lambda:DeleteEventSourceMapping", - "lambda:DeleteFunction", - "lambda:GetEventSourceMapping", - "lambda:InvokeAsync", - "lambda:InvokeFunction", - "lambda:ListEventSourceMappings", - "lambda:RemovePermission", - "lambda:UpdateEventSourceMapping", - "lambda:UpdateFunctionCode", - "lambda:UpdateFunctionConfiguration" - ], - "Resource": [ - "*" - ] - }, - - { - "Sid": "Stmt1475879730000", - "Effect": "Allow", - "Action": [ - "logs:CreateLogGroup", - "logs:CreateLogStream", - "logs:DeleteLogGroup", - "logs:DeleteLogStream", - "logs:GetLogEvents", - "logs:PutLogEvents" - ], - "Resource": [ - "*" - ] - }, - -{ - "Sid": "Stmt1475879783000", - "Effect": "Allow", - "Action": [ - "s3:DeleteObject", - "s3:DeleteObjectVersion", - "s3:GetBucketAcl", - "s3:GetBucketLocation", - "s3:GetBucketLogging", - "s3:GetBucketNotification", - "s3:GetBucketPolicy", - "s3:GetObjectAcl", - "s3:GetObjectVersion", - "s3:GetObjectVersionAcl", - "s3:ListAllMyBuckets", - "s3:PutBucketNotification", - "s3:PutObject", - "s3:PutObjectAcl", - "s3:PutObjectVersionAcl" - ], - "Resource": [ - "arn:aws:s3:::MY-S3-BUCKET/*" - ] - }, - - { - "Sid": "StmXrayAllow", - "Effect": "Allow", - "Action": [ - "xray:PutTraceSegments", - "xray:PutTelemetryRecords" - ], - "Resource": [ - "*" - ] - }, - - { - "Effect": "Allow", - "Action": [ - "s3:GetObject", - "s3:ListBucket" - ], 
- "Resource": [ - "*" - ] - } - - - - ] -} diff --git a/src/python/driver.py b/src/python/driver.py index 9e3509e..c16b368 100755 --- a/src/python/driver.py +++ b/src/python/driver.py @@ -40,69 +40,59 @@ logging.basicConfig(level='WARNING') logging.getLogger('aws_xray_sdk').setLevel(logging.ERROR) # collect all tracing samples -SAMPLING_RULES = {"version": 1, "default": {"fixed_target": 1, "rate": 1}} +SAMPLING_RULES={"version": 1, "default": {"fixed_target": 1,"rate": 1}} xray_recorder.configure(sampling_rules=SAMPLING_RULES) xray_recorder.begin_segment('Map Reduce Driver') # create an S3 session s3 = boto3.resource('s3') s3_client = boto3.client('s3') - -JOB_INFO = 'jobinfo.json' +ssm_client = boto3.client('ssm') ### UTILS #### @xray_recorder.capture('zipLambda') def zipLambda(fname, zipname): # faster to zip with shell exec - subprocess.call(['zip', zipname] + glob.glob(fname) + glob.glob(JOB_INFO) + - glob.glob("lambdautils.py")) + subprocess.call(['zip', zipname] + glob.glob(fname) + glob.glob("lambdautils.py")) @xray_recorder.capture('write_to_s3') def write_to_s3(bucket, key, data, metadata): s3.Bucket(bucket).put_object(Key=key, Body=data, Metadata=metadata) -@xray_recorder.capture('write_job_config') -def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler): - fname = "jobinfo.json"; - with open(fname, 'w') as f: - data = json.dumps({ - "jobId": job_id, - "jobBucket" : job_bucket, - "mapCount": n_mappers, - "reducerFunction": r_func, - "reducerHandler": r_handler - }, indent=4); - f.write(data) - - ######### MAIN ############# -## JOB ID -job_id = "bl-release" - # Config -config = json.loads(open('driverconfig.json', 'r').read()) +config = lambdautils.load_config() # 1. Get all keys to be processed xray_recorder.begin_subsegment('Get all keys to be processed') + # init +job_id = config["jobId"] bucket = config["bucket"] job_bucket = config["jobBucket"] region = config["region"] -lambda_memory = config["lambdaMemory"] -concurrent_lambdas = config["concurrentLambdas"] -lambda_read_timeout = config["lambda_read_timeout"] +lambda_memory = int(config["lambdaMemory"]) +concurrent_lambdas = int(config["concurrentLambdas"]) +mapper_config = json.loads(config["mapper"]) +reducer_config = json.loads(config["reducer"]) +reducerCoordinator_config = json.loads(config["reducerCoordinator"]) +lambda_env = {'Variables': {'ssmPath': config['ssmPath']}} +lambda_read_timeout = int(config["lambdaReadTimeout"]) +ssm_path = config["ssmPath"] +lambda_execution_role = config["lambdaExecutionRole"] +boto_max_connections = int(config["boto_max_connections"]) # Setting longer timeout for reading lambda results and larger connections pool -lambda_config = Config(read_timeout=lambda_read_timeout, max_pool_connections=50) -lambda_client = boto3.client('lambda', config=lambda_config) +lambda_config=Config(read_timeout=lambda_read_timeout, max_pool_connections=boto_max_connections) +lambda_client = boto3.client('lambda',config=lambda_config) # Fetch all the keys that match the prefix all_keys = [] for obj in s3.Bucket(bucket).objects.filter(Prefix=config["prefix"]).all(): all_keys.append(obj) -bsize = lambdautils.compute_batch_size(all_keys, lambda_memory) +bsize = lambdautils.compute_batch_size(all_keys, lambda_memory, concurrent_lambdas) batches = lambdautils.batch_creator(all_keys, bsize) n_mappers = len(batches) document = xray_recorder.current_subsegment() @@ -120,31 +110,32 @@ def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler): rc_lambda_name = L_PREFIX + "-rc-" + job_id; 
# write job config -write_job_config(job_id, job_bucket, n_mappers, reducer_lambda_name, config["reducer"]["handler"]); +ssm_client.put_parameter(Name=ssm_path+'mapCount',Value=str(n_mappers),Type='String',Overwrite=True) +ssm_client.put_parameter(Name=ssm_path+'reducerFunction',Value=reducer_lambda_name,Type='String',Overwrite=True) -zipLambda(config["mapper"]["name"], config["mapper"]["zip"]) -zipLambda(config["reducer"]["name"], config["reducer"]["zip"]) -zipLambda(config["reducerCoordinator"]["name"], config["reducerCoordinator"]["zip"]) +zipLambda(mapper_config["name"], mapper_config["zip"]) +zipLambda(reducer_config["name"], reducer_config["zip"]) +zipLambda(reducerCoordinator_config["name"], reducerCoordinator_config["zip"]) xray_recorder.end_subsegment() #Prepare Lambda functions # mapper xray_recorder.begin_subsegment('Create mapper Lambda function') -l_mapper = lambdautils.LambdaManager(lambda_client, s3_client, region, config["mapper"]["zip"], job_id, - mapper_lambda_name, config["mapper"]["handler"]) +l_mapper = lambdautils.LambdaManager(lambda_client, s3_client, region, mapper_config["zip"], job_id, + mapper_lambda_name, mapper_config["handler"], lambda_execution_role, lambda_memory) l_mapper.update_code_or_create_on_noexist() xray_recorder.end_subsegment() #Create mapper Lambda function # Reducer func xray_recorder.begin_subsegment('Create reducer Lambda function') -l_reducer = lambdautils.LambdaManager(lambda_client, s3_client, region, config["reducer"]["zip"], job_id, - reducer_lambda_name, config["reducer"]["handler"]) +l_reducer = lambdautils.LambdaManager(lambda_client, s3_client, region, reducer_config["zip"], job_id, + reducer_lambda_name, reducer_config["handler"], lambda_execution_role, lambda_memory) l_reducer.update_code_or_create_on_noexist() xray_recorder.end_subsegment() #Create reducer Lambda function # Coordinator xray_recorder.begin_subsegment('Create reducer coordinator Lambda function') -l_rc = lambdautils.LambdaManager(lambda_client, s3_client, region, config["reducerCoordinator"]["zip"], job_id, - rc_lambda_name, config["reducerCoordinator"]["handler"]) +l_rc = lambdautils.LambdaManager(lambda_client, s3_client, region, reducerCoordinator_config["zip"], job_id, + rc_lambda_name, reducerCoordinator_config["handler"], lambda_execution_role, lambda_memory) l_rc.update_code_or_create_on_noexist() # Add permission to the coordinator @@ -162,7 +153,7 @@ def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler): "totalS3Files": len(all_keys), "startTime": time.time() }) -xray_recorder.current_subsegment().put_metadata("Job data: ", data, "Write job data to S3") +xray_recorder.current_subsegment().put_metadata("Job data: ", data, "Write job data to S3"); write_to_s3(job_bucket, j_key, data, {}) xray_recorder.end_subsegment() #Write job data to S3 @@ -177,11 +168,8 @@ def invoke_lambda(batches, m_id): ''' lambda invoke function ''' - - #batch = [k['Key'] for k in batches[m_id-1]] batch = [k.key for k in batches[m_id-1]] - xray_recorder.current_segment().put_annotation("batch_for_mapper_"+str(m_id), str(batch)) - #print "invoking", m_id, len(batch) + xray_recorder.current_segment().put_annotation("batch_for_mapper_"+str(m_id), str(batch)); resp = lambda_client.invoke( FunctionName = mapper_lambda_name, InvocationType = 'RequestResponse', @@ -209,7 +197,7 @@ def invoke_lambda(batches, m_id): nm = min(concurrent_lambdas, n_mappers) results = pool.map(invoke_lambda_partial, Ids[mappers_executed: mappers_executed + nm]) mappers_executed += nm - 
xray_recorder.current_subsegment().put_metadata("Mapper lambdas executed: ", mappers_executed, "Invoke mappers") + xray_recorder.current_subsegment().put_metadata("Mapper lambdas executed: ", mappers_executed, "Invoke mappers"); pool.close() pool.join() diff --git a/src/python/driverconfig.json b/src/python/driverconfig.json deleted file mode 100644 index e9ac6ba..0000000 --- a/src/python/driverconfig.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "bucket": "big-data-benchmark", - "prefix": "pavlo/text/1node/uservisits/", - "jobBucket": "smallya-useast-1", - "region": "us-east-1", - "lambdaMemory": 1536, - "concurrentLambdas": 100, - "lambda_read_timeout": 300, - "mapper": { - "name": "mapper.py", - "handler": "mapper.lambda_handler", - "zip": "mapper.zip" - }, - "reducer":{ - "name": "reducer.py", - "handler": "reducer.lambda_handler", - "zip": "reducer.zip" - }, - "reducerCoordinator":{ - "name": "reducerCoordinator.py", - "handler": "reducerCoordinator.lambda_handler", - "zip": "reducerCoordinator.zip" - } -} diff --git a/src/python/jobinfo.json b/src/python/jobinfo.json deleted file mode 100644 index b49dd7b..0000000 --- a/src/python/jobinfo.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "jobBucket": "smallya-useast-1", - "mapCount": 29, - "reducerFunction": "BL-reducer-bl-release", - "reducerHandler": "reducer.lambda_handler", - "jobId": "bl-release" -} \ No newline at end of file diff --git a/src/python/lambdautils.py b/src/python/lambdautils.py index 76a59e9..314ea04 100755 --- a/src/python/lambdautils.py +++ b/src/python/lambdautils.py @@ -16,8 +16,10 @@ import botocore import os +SSM_PATH = '/biglambda/' + class LambdaManager(object): - def __init__ (self, l, s3, region, codepath, job_id, fname, handler, lmem=1536): + def __init__ (self, l, s3, region, codepath, job_id, fname, handler, role, lmem=1536): self.awslambda = l; self.region = "us-east-1" if region is None else region self.s3 = s3 @@ -25,7 +27,7 @@ def __init__ (self, l, s3, region, codepath, job_id, fname, handler, lmem=1536): self.job_id = job_id self.function_name = fname self.handler = handler - self.role = os.environ.get('serverless_mapreduce_role') + self.role = role self.memory = lmem self.timeout = 300 self.function_arn = None # set after creation @@ -45,7 +47,7 @@ def create_lambda_function(self): Description = self.function_name, MemorySize = self.memory, Timeout = self.timeout, - TracingConfig={'Mode':'Active'} + TracingConfig={'Mode':'PassThrough'} ) self.function_arn = response['FunctionArn'] print response @@ -127,7 +129,7 @@ def cleanup_logs(cls, func_name): response = log_client.delete_log_group(logGroupName='/aws/lambda/' + func_name) return response -def compute_batch_size(keys, lambda_memory, gzip=False): +def compute_batch_size(keys, lambda_memory, concurrent_lambdas): max_mem_for_data = 0.6 * lambda_memory * 1000 * 1000; size = 0.0 for key in keys: @@ -137,8 +139,11 @@ def compute_batch_size(keys, lambda_memory, gzip=False): size += key.size avg_object_size = size/len(keys) print "Dataset size: %s, nKeys: %s, avg: %s" %(size, len(keys), avg_object_size) - b_size = int(round(max_mem_for_data/avg_object_size)) - return b_size + if avg_object_size < max_mem_for_data and len(keys) < concurrent_lambdas: + b_size = 1 + else: + b_size = int(round(max_mem_for_data/avg_object_size)) + return b_size def batch_creator(all_keys, batch_size): ''' @@ -156,3 +161,35 @@ def batch_creator(all_keys, batch_size): if len(batch): batches.append(batch) return batches + +def load_config(): + ssm_client = boto3.client('ssm') + 
config_dict={} + + # based on https://gist.github.com/sonodar/b3c80c8b9e60f4e6dcda9108c46a6089 + def read_params(NextToken = None): + params = { + 'Path': SSM_PATH, + 'Recursive': False, + 'WithDecryption': False + } + if NextToken is not None: + params['NextToken'] = NextToken + return ssm_client.get_parameters_by_path(**params) + def parameters(): + NextToken = None + while True: + response = read_params(NextToken) + parameters = response['Parameters'] + if len(parameters) == 0: + break + for parameter in parameters: + yield parameter + if 'NextToken' not in response: + break + NextToken = response['NextToken'] + + config_dict['ssmPath']=SSM_PATH + for parameter in parameters(): + config_dict[parameter.get('Name').replace(SSM_PATH,'')]= parameter.get('Value') + return config_dict diff --git a/src/python/mapper.py b/src/python/mapper.py index b707986..ede6576 100644 --- a/src/python/mapper.py +++ b/src/python/mapper.py @@ -35,6 +35,7 @@ def write_to_s3(bucket, key, data, metadata): def lambda_handler(event, context): start_time = time.time() + print event job_bucket = event['jobBucket'] src_bucket = event['bucket'] @@ -64,12 +65,8 @@ def lambda_handler(event, context): output[srcIp] += float(data[3]) except Exception, e: print e - #err += '%s' % e time_in_secs = (time.time() - start_time) - #timeTaken = time_in_secs * 1000000000 # in 10^9 - #s3DownloadTime = 0 - #totalProcessingTime = 0 pret = [len(src_keys), line_count, time_in_secs, err] mapper_fname = "%s/%s%s" % (job_id, TASK_MAPPER_PREFIX, mapper_id) metadata = { diff --git a/src/python/reducer.py b/src/python/reducer.py index 2e9dcbf..5b15908 100644 --- a/src/python/reducer.py +++ b/src/python/reducer.py @@ -24,7 +24,7 @@ import urllib2 import time -# create an S3 & Dynamo session +# create an S3 session s3 = boto3.resource('s3') s3_client = boto3.client('s3') @@ -39,6 +39,7 @@ def write_to_s3(bucket, key, data, metadata): def lambda_handler(event, context): start_time = time.time() + print event job_bucket = event['jobBucket'] bucket = event['bucket'] @@ -69,9 +70,6 @@ def lambda_handler(event, context): print e time_in_secs = (time.time() - start_time) - #timeTaken = time_in_secs * 1000000000 # in 10^9 - #s3DownloadTime = 0 - #totalProcessingTime = 0 pret = [len(reducer_keys), line_count, time_in_secs] print "Reducer ouputput", pret diff --git a/src/python/reducerCoordinator.py b/src/python/reducerCoordinator.py index 9723cbc..5153466 100644 --- a/src/python/reducerCoordinator.py +++ b/src/python/reducerCoordinator.py @@ -23,6 +23,8 @@ import StringIO import time import urllib +import os + DEFAULT_REGION = "us-east-1"; @@ -37,6 +39,14 @@ s3_client = boto3.client('s3') lambda_client = boto3.client('lambda') +config = lambdautils.load_config() + +job_id = config["jobId"] +map_count = int(config["mapCount"]) +r_function_name = config["reducerFunction"] +lambda_memory = int(config["lambdaMemory"]) +concurrent_lambdas = int(config["concurrentLambdas"]) + # Write to S3 Bucket def write_to_s3(bucket, key, data, metadata): s3.Bucket(bucket).put_object(Key=key, Body=data, Metadata=metadata) @@ -59,8 +69,7 @@ def get_mapper_files(files): return ret def get_reducer_batch_size(keys): - #TODO: Paramertize memory size - batch_size = lambdautils.compute_batch_size(keys, 1536) + batch_size = lambdautils.compute_batch_size(keys, lambda_memory, concurrent_lambdas) return max(batch_size, 2) # At least 2 in a batch - Condition for termination def check_job_done(files): @@ -70,7 +79,7 @@ def check_job_done(files): return True return False -def 
get_reducer_state_info(files, job_id, job_bucket): +def get_reducer_state_info(files, job_bucket): reducers = []; max_index = 0; @@ -122,19 +131,12 @@ def lambda_handler(event, context): # Job Bucket. We just got a notification from this bucket bucket = event['Records'][0]['s3']['bucket']['name'] - #key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8')) - - config = json.loads(open('./jobinfo.json', "r").read()) - - job_id = config["jobId"] - map_count = config["mapCount"] - r_function_name = config["reducerFunction"] - r_handler = config["reducerHandler"] - ### Get Mapper Finished Count ### # Get job files files = s3_client.list_objects(Bucket=bucket, Prefix=job_id)["Contents"] + print "Listed files:" + print files if check_job_done(files) == True: print "Job done!!! Check the result file" @@ -148,7 +150,7 @@ def lambda_handler(event, context): if map_count == len(mapper_keys): # All the mappers have finished, time to schedule the reducers - stepInfo = get_reducer_state_info(files, job_id, bucket) + stepInfo = get_reducer_state_info(files, bucket) print "stepInfo", stepInfo From f15ed83faa243b62cd515f72392796b71eca56da Mon Sep 17 00:00:00 2001 From: Giedrius Praspaliauskas Date: Tue, 8 May 2018 10:19:17 -0700 Subject: [PATCH 2/8] Updated README.md to explain CloudFormation changes --- README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 915c41b..f433845 100755 --- a/README.md +++ b/README.md @@ -33,7 +33,10 @@ Check cf_template.yaml that you can extend as needed. To run the example, you must have the AWS CLI set up. Your credentials must have access to create and invoke Lambda and access to list, read, and write to a S3 bucket. -1. Start CLoudFormation console and create new stack using cf_template.yaml. CloudFormation will create S3 bucket for the results, biglambda_role IAM role for AWS Lambda execution and appropriate inline policy, SSM Parameter Store parameters used by the Lambda functions. +1. Start CloudFormation console and create new stack using cf_template.yaml. CloudFormation will create: +* S3 bucket for the results, +* biglambda_role IAM role for AWS Lambda execution with appropriate inline policy, +* SSM Parameter Store parameters used by the Lambda functions. 2. [Run AWS X-Ray Daemon locally](https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-local.html), otherwise you will not be able to see traces from the local driver in AWS X-Ray console. However, traces from Reducer Coordinator Lambda functions will be present. @@ -43,7 +46,7 @@ To run the example, you must have the AWS CLI set up. Your credentials must have ### Modifying the Job -If needed you can modify cf_template.yaml and update CloudFormation stack +You can modify cf_template.yaml and update CloudFormation stack. 
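+
+As a sketch only (the stack name and bucket name below are placeholders, not values defined by this repository), a stack update from the AWS CLI could look like:
+
+    $ aws cloudformation update-stack \
+        --stack-name my-biglambda-stack \
+        --template-body file://cf_template.yaml \
+        --capabilities CAPABILITY_NAMED_IAM \
+        --parameters ParameterKey=JobBucket,ParameterValue=my-job-bucket \
+                     ParameterKey=Region,ParameterValue=us-west-2 \
+                     ParameterKey=SSMPrefix,ParameterValue=/biglambda
+
+`CAPABILITY_NAMED_IAM` is required because the template creates the named IAM role `biglambda_role`.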
### Outputs From 39050e77f67cb34997467937fd9eb174d48b867a Mon Sep 17 00:00:00 2001 From: praspaliauskas Date: Tue, 8 May 2018 13:34:33 -0700 Subject: [PATCH 3/8] Updates for SSM support --- src/python/driver.py | 24 +++++++++++++++++++++--- src/python/mapper.py | 5 ++++- src/python/reducer.py | 6 ++++-- src/python/reducerCoordinator.py | 31 +++++++++++++++---------------- 4 files changed, 44 insertions(+), 22 deletions(-) diff --git a/src/python/driver.py b/src/python/driver.py index c16b368..543efc6 100755 --- a/src/python/driver.py +++ b/src/python/driver.py @@ -49,16 +49,35 @@ s3_client = boto3.client('s3') ssm_client = boto3.client('ssm') +JOB_INFO = 'jobinfo.json' + ### UTILS #### @xray_recorder.capture('zipLambda') def zipLambda(fname, zipname): # faster to zip with shell exec - subprocess.call(['zip', zipname] + glob.glob(fname) + glob.glob("lambdautils.py")) + subprocess.call(['zip', zipname] + glob.glob(fname) + glob.glob(JOB_INFO) + + glob.glob("lambdautils.py")) @xray_recorder.capture('write_to_s3') def write_to_s3(bucket, key, data, metadata): s3.Bucket(bucket).put_object(Key=key, Body=data, Metadata=metadata) +@xray_recorder.capture('write_job_config') +def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler, lambdaMemory, concurrent_lambdas): + fname = "jobinfo.json"; + with open(fname, 'w') as f: + data = json.dumps({ + "jobId": job_id, + "jobBucket" : job_bucket, + "mapCount": n_mappers, + "reducerFunction": r_func, + "reducerHandler": r_handler, + "lambdaMemory": lambdaMemory, + "concurrentLambdas": concurrent_lambdas + }, indent=4); + f.write(data) + + ######### MAIN ############# # Config @@ -110,8 +129,7 @@ def write_to_s3(bucket, key, data, metadata): rc_lambda_name = L_PREFIX + "-rc-" + job_id; # write job config -ssm_client.put_parameter(Name=ssm_path+'mapCount',Value=str(n_mappers),Type='String',Overwrite=True) -ssm_client.put_parameter(Name=ssm_path+'reducerFunction',Value=reducer_lambda_name,Type='String',Overwrite=True) +write_job_config(job_id, job_bucket, n_mappers, reducer_lambda_name, reducer_config["handler"], lambda_memory, concurrent_lambdas) zipLambda(mapper_config["name"], mapper_config["zip"]) zipLambda(reducer_config["name"], reducer_config["zip"]) diff --git a/src/python/mapper.py b/src/python/mapper.py index ede6576..b707986 100644 --- a/src/python/mapper.py +++ b/src/python/mapper.py @@ -35,7 +35,6 @@ def write_to_s3(bucket, key, data, metadata): def lambda_handler(event, context): start_time = time.time() - print event job_bucket = event['jobBucket'] src_bucket = event['bucket'] @@ -65,8 +64,12 @@ def lambda_handler(event, context): output[srcIp] += float(data[3]) except Exception, e: print e + #err += '%s' % e time_in_secs = (time.time() - start_time) + #timeTaken = time_in_secs * 1000000000 # in 10^9 + #s3DownloadTime = 0 + #totalProcessingTime = 0 pret = [len(src_keys), line_count, time_in_secs, err] mapper_fname = "%s/%s%s" % (job_id, TASK_MAPPER_PREFIX, mapper_id) metadata = { diff --git a/src/python/reducer.py b/src/python/reducer.py index 5b15908..2e9dcbf 100644 --- a/src/python/reducer.py +++ b/src/python/reducer.py @@ -24,7 +24,7 @@ import urllib2 import time -# create an S3 session +# create an S3 & Dynamo session s3 = boto3.resource('s3') s3_client = boto3.client('s3') @@ -39,7 +39,6 @@ def write_to_s3(bucket, key, data, metadata): def lambda_handler(event, context): start_time = time.time() - print event job_bucket = event['jobBucket'] bucket = event['bucket'] @@ -70,6 +69,9 @@ def lambda_handler(event, context): print e 
time_in_secs = (time.time() - start_time) + #timeTaken = time_in_secs * 1000000000 # in 10^9 + #s3DownloadTime = 0 + #totalProcessingTime = 0 pret = [len(reducer_keys), line_count, time_in_secs] print "Reducer ouputput", pret diff --git a/src/python/reducerCoordinator.py b/src/python/reducerCoordinator.py index 5153466..5e0b66e 100644 --- a/src/python/reducerCoordinator.py +++ b/src/python/reducerCoordinator.py @@ -23,8 +23,6 @@ import StringIO import time import urllib -import os - DEFAULT_REGION = "us-east-1"; @@ -39,14 +37,6 @@ s3_client = boto3.client('s3') lambda_client = boto3.client('lambda') -config = lambdautils.load_config() - -job_id = config["jobId"] -map_count = int(config["mapCount"]) -r_function_name = config["reducerFunction"] -lambda_memory = int(config["lambdaMemory"]) -concurrent_lambdas = int(config["concurrentLambdas"]) - # Write to S3 Bucket def write_to_s3(bucket, key, data, metadata): s3.Bucket(bucket).put_object(Key=key, Body=data, Metadata=metadata) @@ -68,7 +58,7 @@ def get_mapper_files(files): ret.append(mf) return ret -def get_reducer_batch_size(keys): +def get_reducer_batch_size(keys, lambda_memory, concurrent_lambdas): batch_size = lambdautils.compute_batch_size(keys, lambda_memory, concurrent_lambdas) return max(batch_size, 2) # At least 2 in a batch - Condition for termination @@ -79,7 +69,7 @@ def check_job_done(files): return True return False -def get_reducer_state_info(files, job_bucket): +def get_reducer_state_info(files, job_id, job_bucket): reducers = []; max_index = 0; @@ -131,12 +121,21 @@ def lambda_handler(event, context): # Job Bucket. We just got a notification from this bucket bucket = event['Records'][0]['s3']['bucket']['name'] + #key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8')) + + config = json.loads(open('./jobinfo.json', "r").read()) + + job_id = config["jobId"] + map_count = config["mapCount"] + r_function_name = config["reducerFunction"] + r_handler = config["reducerHandler"] + lambda_memory = config["lambdaMemory"] + concurrent_lambdas = config["concurrentLambdas"] + ### Get Mapper Finished Count ### # Get job files files = s3_client.list_objects(Bucket=bucket, Prefix=job_id)["Contents"] - print "Listed files:" - print files if check_job_done(files) == True: print "Job done!!! 
Check the result file" @@ -150,7 +149,7 @@ def lambda_handler(event, context): if map_count == len(mapper_keys): # All the mappers have finished, time to schedule the reducers - stepInfo = get_reducer_state_info(files, bucket) + stepInfo = get_reducer_state_info(files, job_id, bucket) print "stepInfo", stepInfo @@ -162,7 +161,7 @@ def lambda_handler(event, context): return # Compute this based on metadata of files - r_batch_size = get_reducer_batch_size(reducer_keys); + r_batch_size = get_reducer_batch_size(reducer_keys, lambda_memory, concurrent_lambdas); print "Starting the the reducer step", step_number print "Batch Size", r_batch_size From 4b806423f506d3bc8880454598acee19da981dde Mon Sep 17 00:00:00 2001 From: Giedrius Praspaliauskas Date: Tue, 8 May 2018 13:35:53 -0700 Subject: [PATCH 4/8] Updates for SSM support --- src/python/driver.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/python/driver.py b/src/python/driver.py index c16b368..543efc6 100755 --- a/src/python/driver.py +++ b/src/python/driver.py @@ -49,16 +49,35 @@ s3_client = boto3.client('s3') ssm_client = boto3.client('ssm') +JOB_INFO = 'jobinfo.json' + ### UTILS #### @xray_recorder.capture('zipLambda') def zipLambda(fname, zipname): # faster to zip with shell exec - subprocess.call(['zip', zipname] + glob.glob(fname) + glob.glob("lambdautils.py")) + subprocess.call(['zip', zipname] + glob.glob(fname) + glob.glob(JOB_INFO) + + glob.glob("lambdautils.py")) @xray_recorder.capture('write_to_s3') def write_to_s3(bucket, key, data, metadata): s3.Bucket(bucket).put_object(Key=key, Body=data, Metadata=metadata) +@xray_recorder.capture('write_job_config') +def write_job_config(job_id, job_bucket, n_mappers, r_func, r_handler, lambdaMemory, concurrent_lambdas): + fname = "jobinfo.json"; + with open(fname, 'w') as f: + data = json.dumps({ + "jobId": job_id, + "jobBucket" : job_bucket, + "mapCount": n_mappers, + "reducerFunction": r_func, + "reducerHandler": r_handler, + "lambdaMemory": lambdaMemory, + "concurrentLambdas": concurrent_lambdas + }, indent=4); + f.write(data) + + ######### MAIN ############# # Config @@ -110,8 +129,7 @@ def write_to_s3(bucket, key, data, metadata): rc_lambda_name = L_PREFIX + "-rc-" + job_id; # write job config -ssm_client.put_parameter(Name=ssm_path+'mapCount',Value=str(n_mappers),Type='String',Overwrite=True) -ssm_client.put_parameter(Name=ssm_path+'reducerFunction',Value=reducer_lambda_name,Type='String',Overwrite=True) +write_job_config(job_id, job_bucket, n_mappers, reducer_lambda_name, reducer_config["handler"], lambda_memory, concurrent_lambdas) zipLambda(mapper_config["name"], mapper_config["zip"]) zipLambda(reducer_config["name"], reducer_config["zip"]) From 00e862a12600f4dd00b81d900b27b0b08fea3ec4 Mon Sep 17 00:00:00 2001 From: Giedrius Praspaliauskas Date: Tue, 8 May 2018 13:37:18 -0700 Subject: [PATCH 5/8] Updates for SSM support --- src/python/mapper.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/python/mapper.py b/src/python/mapper.py index ede6576..b707986 100644 --- a/src/python/mapper.py +++ b/src/python/mapper.py @@ -35,7 +35,6 @@ def write_to_s3(bucket, key, data, metadata): def lambda_handler(event, context): start_time = time.time() - print event job_bucket = event['jobBucket'] src_bucket = event['bucket'] @@ -65,8 +64,12 @@ def lambda_handler(event, context): output[srcIp] += float(data[3]) except Exception, e: print e + #err += '%s' % e time_in_secs = (time.time() - start_time) + #timeTaken = 
time_in_secs * 1000000000 # in 10^9 + #s3DownloadTime = 0 + #totalProcessingTime = 0 pret = [len(src_keys), line_count, time_in_secs, err] mapper_fname = "%s/%s%s" % (job_id, TASK_MAPPER_PREFIX, mapper_id) metadata = { From 4aa439b4a1f5e1822669f256650fe31edae964bb Mon Sep 17 00:00:00 2001 From: Giedrius Praspaliauskas Date: Tue, 8 May 2018 13:37:45 -0700 Subject: [PATCH 6/8] Updates for SSM support --- src/python/reducer.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/python/reducer.py b/src/python/reducer.py index 5b15908..2e9dcbf 100644 --- a/src/python/reducer.py +++ b/src/python/reducer.py @@ -24,7 +24,7 @@ import urllib2 import time -# create an S3 session +# create an S3 & Dynamo session s3 = boto3.resource('s3') s3_client = boto3.client('s3') @@ -39,7 +39,6 @@ def write_to_s3(bucket, key, data, metadata): def lambda_handler(event, context): start_time = time.time() - print event job_bucket = event['jobBucket'] bucket = event['bucket'] @@ -70,6 +69,9 @@ def lambda_handler(event, context): print e time_in_secs = (time.time() - start_time) + #timeTaken = time_in_secs * 1000000000 # in 10^9 + #s3DownloadTime = 0 + #totalProcessingTime = 0 pret = [len(reducer_keys), line_count, time_in_secs] print "Reducer ouputput", pret From 2389120e2975cbe497fd5832bf3e521172466fce Mon Sep 17 00:00:00 2001 From: Giedrius Praspaliauskas Date: Tue, 8 May 2018 13:38:24 -0700 Subject: [PATCH 7/8] Updates for SSM support --- src/python/reducerCoordinator.py | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/src/python/reducerCoordinator.py b/src/python/reducerCoordinator.py index 5153466..5e0b66e 100644 --- a/src/python/reducerCoordinator.py +++ b/src/python/reducerCoordinator.py @@ -23,8 +23,6 @@ import StringIO import time import urllib -import os - DEFAULT_REGION = "us-east-1"; @@ -39,14 +37,6 @@ s3_client = boto3.client('s3') lambda_client = boto3.client('lambda') -config = lambdautils.load_config() - -job_id = config["jobId"] -map_count = int(config["mapCount"]) -r_function_name = config["reducerFunction"] -lambda_memory = int(config["lambdaMemory"]) -concurrent_lambdas = int(config["concurrentLambdas"]) - # Write to S3 Bucket def write_to_s3(bucket, key, data, metadata): s3.Bucket(bucket).put_object(Key=key, Body=data, Metadata=metadata) @@ -68,7 +58,7 @@ def get_mapper_files(files): ret.append(mf) return ret -def get_reducer_batch_size(keys): +def get_reducer_batch_size(keys, lambda_memory, concurrent_lambdas): batch_size = lambdautils.compute_batch_size(keys, lambda_memory, concurrent_lambdas) return max(batch_size, 2) # At least 2 in a batch - Condition for termination @@ -79,7 +69,7 @@ def check_job_done(files): return True return False -def get_reducer_state_info(files, job_bucket): +def get_reducer_state_info(files, job_id, job_bucket): reducers = []; max_index = 0; @@ -131,12 +121,21 @@ def lambda_handler(event, context): # Job Bucket. 
We just got a notification from this bucket bucket = event['Records'][0]['s3']['bucket']['name'] + #key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'].encode('utf8')) + + config = json.loads(open('./jobinfo.json', "r").read()) + + job_id = config["jobId"] + map_count = config["mapCount"] + r_function_name = config["reducerFunction"] + r_handler = config["reducerHandler"] + lambda_memory = config["lambdaMemory"] + concurrent_lambdas = config["concurrentLambdas"] + ### Get Mapper Finished Count ### # Get job files files = s3_client.list_objects(Bucket=bucket, Prefix=job_id)["Contents"] - print "Listed files:" - print files if check_job_done(files) == True: print "Job done!!! Check the result file" @@ -150,7 +149,7 @@ def lambda_handler(event, context): if map_count == len(mapper_keys): # All the mappers have finished, time to schedule the reducers - stepInfo = get_reducer_state_info(files, bucket) + stepInfo = get_reducer_state_info(files, job_id, bucket) print "stepInfo", stepInfo @@ -162,7 +161,7 @@ def lambda_handler(event, context): return # Compute this based on metadata of files - r_batch_size = get_reducer_batch_size(reducer_keys); + r_batch_size = get_reducer_batch_size(reducer_keys, lambda_memory, concurrent_lambdas); print "Starting the the reducer step", step_number print "Batch Size", r_batch_size From 3bdd4095a9fca7c67f1b76ce24f97b97d087983d Mon Sep 17 00:00:00 2001 From: praspaliauskas Date: Tue, 8 May 2018 16:31:15 -0700 Subject: [PATCH 8/8] Added Cloud9 IDE support --- README.md | 19 +++++++++++++++++++ cf_template.yaml | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/README.md b/README.md index f433845..d1a87a8 100755 --- a/README.md +++ b/README.md @@ -37,6 +37,7 @@ To run the example, you must have the AWS CLI set up. Your credentials must have * S3 bucket for the results, * biglambda_role IAM role for AWS Lambda execution with appropriate inline policy, * SSM Parameter Store parameters used by the Lambda functions. +* (Optionally) AWS Cloud9 IDE environment 2. [Run AWS X-Ray Daemon locally](https://docs.aws.amazon.com/xray/latest/devguide/xray-daemon-local.html), otherwise you will not be able to see traces from the local driver in AWS X-Ray console. However, traces from Reducer Coordinator Lambda functions will be present. @@ -44,6 +45,24 @@ To run the example, you must have the AWS CLI set up. Your credentials must have $ python driver.py +### AWS Cloud9 IDE +You can select AWS Cloud9 IDE instance type while creating CloudFormation stack. By default it is set to "None" (does not create IDE). After CloudFormation stack with instance type selected is created check Outputs section of the stack description for Cloud9 IDE URL. Code from this Git repository will be pulled to that instance already. You will need to install Boto3 and X-Ray Python SDK by running folowing commands in the IDE Bash tab: + + $ sudo python -m pip install boto3 + $ sudo python -m pip install aws-xray-sdk + +Navigate to the code location + + $ cd lambda-refarch-mapreduce/src/python + +Run the driver + + $ python driver.py + +If you'd like to run code from IDE directly make sure to update current working directory (CWD) in the default Runner or create new [Runner](https://docs.aws.amazon.com/cloud9/latest/user-guide/build-run-debug.html) + +Note that deleting CloudFormation stack will also delete Cloud9 IDE created as part of it. + ### Modifying the Job You can modify cf_template.yaml and update CloudFormation stack. 
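+
+Whether or not you changed the defaults, you can confirm the parameters the stack created before running the driver. Assuming the default `/biglambda` prefix and the `us-west-2` region:
+
+    $ aws ssm get-parameters-by-path --path /biglambda --region us-west-2
+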
diff --git a/cf_template.yaml b/cf_template.yaml index 67b308f..f301a00 100644 --- a/cf_template.yaml +++ b/cf_template.yaml @@ -238,7 +238,31 @@ Resources: Type: 'AWS::S3::Bucket' Properties: BucketName: !Ref JobBucket + Cloud9IDE: + Condition: CreateCloud9IDE + Type: AWS::Cloud9::EnvironmentEC2 + Properties: + Repositories: + - RepositoryUrl: https://github.com/giedri/lambda-refarch-mapreduce.git + PathComponent: lambda-refarch-mapreduce + Description: Lmabda MapReduce Cloud9 IDE + InstanceType: t2.medium + AutomaticStopTimeMinutes: 30 + Name: + Ref: AWS::StackName +Conditions: + CreateCloud9IDE: !Not [!Equals [ !Ref Cloud9InstanceType, None ]] Parameters: + Cloud9InstanceType: + Description: Select Cloud9 IDE instance type or None if you do not want IDE to be created + Default: None + Type: String + AllowedValues: + - None + - t2.micro + - t2.small + - t2.large + ConstraintDescription: Must specify instance type or select None SSMPrefix: Type: String Default: /biglambda @@ -251,3 +275,15 @@ Parameters: Type: String Default: us-west-2 Description: AWS Region to use for AWS Lambda +Outputs: + Cloud9URL: + Condition: CreateCloud9IDE + Value: + Fn::Join: + - '' + - - https:// + - Ref: Region + - .console.aws.amazon.com/cloud9/home/environments/ + - Ref: Cloud9IDE + Description: Lambda Map/Reduce Cloud9 environment +
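
Cleanup note (a sketch only; the stack and bucket names below are placeholders): the "Cleaning up" steps in the README can also be performed from the CLI by emptying the job bucket first and then deleting the stack, since CloudFormation cannot delete a non-empty bucket:

    $ aws s3 rm s3://my-job-bucket --recursive
    $ aws cloudformation delete-stack --stack-name my-biglambda-stack

The CloudWatch log groups for the generated Lambda functions still need to be removed separately, as described in the README.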