import functions_framework
import hashlib
import json
import threading

import requests
from google.cloud import storage


def hash_word(word, num_files):
    # Hash the word and get an integer value
    hash_value = int(hashlib.sha256(word.encode('utf-8')).hexdigest(), 16)
    # Use modulo to map the hash value to a file index
    file_index = hash_value % num_files
    return file_index


def create_json_files(mapperid):
    # Call the per-mapper cloud function that converts this mapper's CSV output into a JSON file
    url = f"https://us-central1-piyush-chaudhari-fall2023.cloudfunctions.net/create_json_file{mapperid}"
    parameters = {"mapperid": mapperid}
    r = requests.post(url, json=parameters)
    return r.content.decode()


@functions_framework.http
def groupby(request):
    request_json = request.get_json(silent=True)
    client = storage.Client.from_service_account_json('piyush-chaudhari-fall2023-9ae1ed20a7f3.json')

    mapper_bucket_name = "mapper_bucket"
    groupby_bucket_name = "groupby_bucket"
    mapper_bucket = client.get_bucket(mapper_bucket_name)
    groupby_bucket = client.get_bucket(groupby_bucket_name)

    number_of_mappers = request_json['number_of_mappers']  # mapper ids start from 0 (e.g. mapper0, mapper1, ...)

    print("*************GROUPBY: First Step Started*************")
    # First step: create the individual json files, one per mapper

    threads = []
    for mapperid in range(number_of_mappers):
        folder_name = f"mapper{mapperid}"    # mapper folder name
        file_name = f"mapper{mapperid}.csv"  # individual mapper csv file
        file_path = f"{folder_name}/{file_name}"
        blob = mapper_bucket.blob(file_path)
        # Skip mappers that never produced an output file
        if not blob.exists():
            continue
        thread = threading.Thread(target=create_json_files, args=(mapperid,))
        threads.append(thread)

    # Start all threads
    for thread in threads:
        thread.start()

    # Wait for all threads to finish
    for thread in threads:
        thread.join()

    print("*************GROUPBY: First Step Done*************")
    # Second step: merge the individual mapper json files into a single dictionary
    groupby = {}
    for mapperid in range(number_of_mappers):
        folder_name = f"mapper{mapperid}"     # mapper folder name
        file_name = f"mapper{mapperid}.json"  # individual mapper json file
        file_path = f"{folder_name}/{file_name}"
        blob = mapper_bucket.blob(file_path)
        if not blob.exists():
            continue
        # Download the content of the file as text and parse it
        content_text = blob.download_as_text()
        json_object = json.loads(content_text)

        # Merge this mapper's counts into the combined dictionary
        # (the first mapper simply populates the empty dictionary)
        for ipkey in json_object:
            if ipkey in groupby:
                # word already present, so merge the per-file counts
                for ipfilename in json_object[ipkey]:
                    if ipfilename in groupby[ipkey]:
                        groupby[ipkey][ipfilename] += json_object[ipkey][ipfilename]
                    else:
                        groupby[ipkey][ipfilename] = json_object[ipkey][ipfilename]
            else:
                groupby[ipkey] = json_object[ipkey]

    # Create a blob (file) in the specified folder
    groupbyblob = groupby_bucket.blob("groupby/groupby.json")
    # Convert the JSON data to a string
    json_string = json.dumps(groupby, indent=4)
    # Upload the JSON data to the specified file in Google Cloud Storage
    groupbyblob.upload_from_string(json_string, content_type="application/json")

    print("*************GROUPBY: Second Step Done*************")
    # Third step: allocate keys to reducers by building a dictionary <reducer id: list of keys>
    keys_to_reducer = {}
    number_of_reducers = request_json['number_of_reducers']

    # Create an empty key list for every reducer
    for reducerid in range(number_of_reducers):
        keys_to_reducer[f"reducer{reducerid}"] = []

    # Assign each word to a reducer by hashing it
    for key in groupby:
        generatedid = hash_word(key, number_of_reducers)
        reducerid = f"reducer{generatedid}"
        keys_to_reducer[reducerid].append(key)

    # Create a blob (file) in the specified folder
    keys_to_reducerblob = groupby_bucket.blob("groupby/keys_to_reducer.json")
    # Convert the JSON data to a string
    json_string = json.dumps(keys_to_reducer, indent=4)
    # Upload the JSON data to the specified file in Google Cloud Storage
    keys_to_reducerblob.upload_from_string(json_string, content_type="application/json")
    print("groupby OK")

    return "groupby OK"
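

# --- Usage sketch (not part of the original function) ----------------------
# A minimal example of how the deployed HTTP function might be invoked; this
# is an illustration only, and the trigger URL below is a placeholder, not the
# real deployment URL. The request body mirrors the two fields the function
# reads: 'number_of_mappers' and 'number_of_reducers'.
#
#   import requests
#
#   resp = requests.post(
#       "https://REGION-PROJECT.cloudfunctions.net/groupby",  # hypothetical URL
#       json={"number_of_mappers": 4, "number_of_reducers": 2},
#   )
#   print(resp.text)  # prints "groupby OK" on success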