50 changes: 50 additions & 0 deletions de-projects/22_airflow_intro/dags/adzuna_etl/adzuna_etl_dag.py
@@ -0,0 +1,50 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta
from adzuna_etl.src.utils import get_adzuna_raw_data


"""
Home work

1. Split into modules and import them inside dag

2. Second approach (offload heavy job from airflow to outside)
create a new repo
inside repo, there should be docker container with repo code (ingestion / traformation logic)
save docker image into docker registry
inside dag, need to use docker executor to pull the image and start the container (and tasks are executed inside that container)

"""

# Shared folders mounted to all services to store raw and transformed data files
RAW_DATA_UNPROCESSED_FOLDER = "/opt/airflow/shared-data/raw_data/to_process/"

ADZUNA_APP_ID = Variable.get("ADZUNA_APP_ID")
ADZUNA_APP_KEY = Variable.get("ADZUNA_APP_KEY")


default_args = {
    "owner": "Maksim",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 30),
}


with DAG(
    dag_id="adzuna_ingest_raw_data",
    default_args=default_args,
    description="Download data from Adzuna API",
    schedule=None,  # e.g. timedelta(hours=1) to run hourly; timedelta(days=1) to run daily
) as dag:

    extract_raw_data_task = PythonOperator(
        task_id="extract_raw_data_task",
        python_callable=get_adzuna_raw_data,
        op_args=[ADZUNA_APP_ID, ADZUNA_APP_KEY, RAW_DATA_UNPROCESSED_FOLDER],
    )

    # Single task for now, so there are no dependencies to declare.
    extract_raw_data_task
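
    # The callable's return value (the raw-data file path) is pushed to XCom
    # automatically, so a downstream task could consume it. A hedged sketch;
    # the transform callable and task are hypothetical, not part of this PR:
    #
    # def transform_raw_data(ti):
    #     file_path = ti.xcom_pull(task_ids="extract_raw_data_task")
    #     ...  # parse the JSON file and continue the pipeline
    #
    # transform_raw_data_task = PythonOperator(
    #     task_id="transform_raw_data_task",
    #     python_callable=transform_raw_data,
    # )
    # extract_raw_data_task >> transform_raw_data_task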
72 changes: 72 additions & 0 deletions de-projects/22_airflow_intro/dags/adzuna_etl/src/utils.py
@@ -0,0 +1,72 @@
from datetime import datetime
import requests
import json
import math
import os


def get_adzuna_raw_data(ADZUNA_APP_ID, ADZUNA_APP_KEY, RAW_DATA_UNPROCESSED_FOLDER):

    # Define the API endpoint and base parameters
    url = "https://api.adzuna.com/v1/api/jobs/ca/search/"
    base_params = {
        'app_id': ADZUNA_APP_ID,
        'app_key': ADZUNA_APP_KEY,
        'results_per_page': 50,  # Maximum allowed results per page
        'what_phrase': "data engineer",
        'max_days_old': 2,
        'sort_by': "date"
    }
    print("Function triggered to extract raw JSON data from the Adzuna API.")

    # Initialize a list to store all job postings
    all_job_postings = []

    # Make the first request to determine the total number of pages
    print("Making the first request to determine the total number of pages")
    response = requests.get(f"{url}1", params=base_params)

    if response.status_code != 200:
        error_message = f"Error fetching page 1: {response.status_code}, {response.text}"
        print(error_message)
        raise RuntimeError(error_message)  # fail fast instead of parsing a bad response

    data = response.json()  # Parse the JSON response
    total_results = data.get('count', 0)
    results_per_page = base_params['results_per_page']

    # Calculate the total number of pages
    total_pages = math.ceil(total_results / results_per_page)
print(f"Total number of pages = {total_pages}")

# Store the results from the first page
all_job_postings.extend(data.get('results', []))

# Loop through the remaining pages and request data from each
print("Looping through the remaining pages to request data from each")
for page in range(2, total_pages + 1): # Start from page 2
response = requests.get(f"{url}{page}", params=base_params)
if response.status_code == 200:
page_data = response.json()
all_job_postings.extend(page_data.get('results', []))
else:
print(f"Error fetching page {page}: {response.status_code}, {response.text}")

print(f"Total jobs retrieved: {len(all_job_postings)}")

raw_json_data = json.dumps({"items": all_job_postings})
raw_json_bytes = raw_json_data.encode('utf-8')

    # Generate a filename with the current timestamp to store raw data
    # Create RAW_DATA_UNPROCESSED_FOLDER if it doesn't exist
    os.makedirs(RAW_DATA_UNPROCESSED_FOLDER, exist_ok=True)
    current_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_name = f"adzuna_raw_data_{current_timestamp}.json"
    file_path = os.path.join(RAW_DATA_UNPROCESSED_FOLDER, file_name)
    print(f"File name to store raw data: {file_path}")

    with open(file_path, "wb") as file:
        file.write(raw_json_bytes)
    print("Done")

    return file_path
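
# A hedged example of running the extractor locally, outside Airflow. Reading
# the credentials from environment variables is an assumption for testing,
# not part of this PR:
#
# if __name__ == "__main__":
#     get_adzuna_raw_data(
#         os.environ["ADZUNA_APP_ID"],
#         os.environ["ADZUNA_APP_KEY"],
#         "./raw_data/to_process/",
#     )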
1 change: 1 addition & 0 deletions de-projects/22_airflow_intro/docker-compose.yaml
@@ -20,6 +20,7 @@ x-airflow-common:
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
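# The host's ./raw_data folder is mounted at the path the DAG uses as
# RAW_DATA_UNPROCESSED_FOLDER, so raw files written by tasks persist on the host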
- ${AIRFLOW_PROJ_DIR:-.}/raw_data:/opt/airflow/shared-data/raw_data/to_process/
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on