From 3a7608007a18b4ef191d70033fbd2d6bcdabff30 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 20 Mar 2023 21:28:57 -0500 Subject: [PATCH 01/19] Add matplotlib arxiv workflow --- cluster_kwargs.yaml | 6 ++ tests/conftest.py | 1 + .../workflows/test_embarrassingly_parallel.py | 89 +++++++++++++++++++ 3 files changed, 96 insertions(+) create mode 100644 tests/workflows/test_embarrassingly_parallel.py diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index 1b97fa00a3..f09cb691b9 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -31,6 +31,12 @@ parquet_cluster: n_workers: 15 worker_vm_types: [m5.xlarge] # 4 CPU, 16 GiB +# For tests/workflows/test_embarrassingly_parallel.py +embarrassingly_parallel_cluster: + n_workers: 100 + backend_options: + region: "us-east-1" # Same region as dataset + # For test_spill.py spill_cluster: n_workers: 5 diff --git a/tests/conftest.py b/tests/conftest.py index cef605a440..6cd685d230 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -530,6 +530,7 @@ def s3(): return s3fs.S3FileSystem( key=os.environ.get("AWS_ACCESS_KEY_ID"), secret=os.environ.get("AWS_SECRET_ACCESS_KEY"), + requester_pays=True, ) diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py new file mode 100644 index 0000000000..11ed8dfd95 --- /dev/null +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -0,0 +1,89 @@ +import io +import tarfile +import uuid + +import coiled +import pandas as pd +import pytest +from dask.distributed import Client, wait + + +@pytest.fixture(scope="module") +def embarrassingly_parallel_cluster( + dask_env_variables, + cluster_kwargs, + gitlab_cluster_tags, +): + with coiled.Cluster( + f"embarrassingly-parallel-{uuid.uuid4().hex[:8]}", + environ=dask_env_variables, + tags=gitlab_cluster_tags, + **cluster_kwargs["embarrassingly_parallel_cluster"], + ) as cluster: + yield cluster + + +@pytest.fixture +def embarrassingly_parallel_client( + embarrassingly_parallel_cluster, + cluster_kwargs, + upload_cluster_dump, + benchmark_all, +): + n_workers = cluster_kwargs["embarrassingly_parallel_cluster"]["n_workers"] + with Client(embarrassingly_parallel_cluster) as client: + print(f"{client.dashboard_link = }") + embarrassingly_parallel_cluster.scale(n_workers) + client.wait_for_workers(n_workers) + client.restart() + with upload_cluster_dump(client), benchmark_all(client): + yield client + + +def test_embarassingly_parallel(embarrassingly_parallel_client, s3): + # How popular is matplotlib? + directories = s3.ls("s3://arxiv/pdf") + + def extract(filename: str, fs): + """Extract and process one directory of arXiv data + + Returns + ------- + filename: str + contains_matplotlib: boolean + """ + out = [] + with fs.open(filename) as f: + bytes_ = f.read() + with io.BytesIO() as bio: + bio.write(bytes_) + bio.seek(0) + with tarfile.TarFile(fileobj=bio) as tf: + for member in tf.getmembers(): + if member.isfile() and member.name.endswith(".pdf"): + data = tf.extractfile(member).read() + out.append((member.name, b"matplotlib" in data.lower())) + return out + + futures = embarrassingly_parallel_client.map(extract, directories, fs=s3) + wait(futures) + # We had one error in one file. Let's just ignore and move on. 
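+    # Keep only futures that finished successfully; gathering an errored
+    # future (e.g. one hitting a corrupt tar member) would re-raise its exception.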
+ good = [future for future in futures if future.status == "finished"] + data = embarrassingly_parallel_client.gather(good) + + # Convert to Pandas + dfs = [pd.DataFrame(d, columns=["filename", "has_matplotlib"]) for d in data] + df = pd.concat(dfs) + + def filename_to_date(filename): + year = int(filename.split("/")[0][:2]) + month = int(filename.split("/")[0][2:4]) + if year > 80: + year = 1900 + year + else: + year = 2000 + year + + return pd.Timestamp(year=year, month=month, day=1) + + df["date"] = df.filename.map(filename_to_date) + df.groupby("date").has_matplotlib.mean() From 5aedaca7c47bcb23f7618a8b966484fb82091281 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 20 Mar 2023 22:21:42 -0500 Subject: [PATCH 02/19] Remove stray print --- tests/workflows/test_embarrassingly_parallel.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py index 11ed8dfd95..de2f263c01 100644 --- a/tests/workflows/test_embarrassingly_parallel.py +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -32,7 +32,6 @@ def embarrassingly_parallel_client( ): n_workers = cluster_kwargs["embarrassingly_parallel_cluster"]["n_workers"] with Client(embarrassingly_parallel_cluster) as client: - print(f"{client.dashboard_link = }") embarrassingly_parallel_cluster.scale(n_workers) client.wait_for_workers(n_workers) client.restart() From 95585d42656525abf65e0a9f048806cef3268621 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 20 Mar 2023 22:22:59 -0500 Subject: [PATCH 03/19] Update fixture name --- tests/workflows/test_embarrassingly_parallel.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py index de2f263c01..be19de4595 100644 --- a/tests/workflows/test_embarrassingly_parallel.py +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -12,12 +12,12 @@ def embarrassingly_parallel_cluster( dask_env_variables, cluster_kwargs, - gitlab_cluster_tags, + github_cluster_tags, ): with coiled.Cluster( f"embarrassingly-parallel-{uuid.uuid4().hex[:8]}", environ=dask_env_variables, - tags=gitlab_cluster_tags, + tags=github_cluster_tags, **cluster_kwargs["embarrassingly_parallel_cluster"], ) as cluster: yield cluster From 10a529b634e9d1f1989d68373a295931795084bb Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Tue, 21 Mar 2023 12:43:11 -0500 Subject: [PATCH 04/19] Only use requester_pays for test_embarassingly_parallel --- tests/conftest.py | 17 +++++++++++------ tests/workflows/test_embarrassingly_parallel.py | 3 ++- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 7259548068..c9686b46a7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -526,12 +526,17 @@ def s3_storage_options(): @pytest.fixture(scope="session") -def s3(): - return s3fs.S3FileSystem( - key=os.environ.get("AWS_ACCESS_KEY_ID"), - secret=os.environ.get("AWS_SECRET_ACCESS_KEY"), - requester_pays=True, - ) +def s3(s3_storage_options): + return s3fs.S3FileSystem(**s3_storage_options) + + +@pytest.fixture +def s3_factory(s3_storage_options): + def _(**exta_options): + kwargs = {**s3_storage_options, **exta_options} + return s3fs.S3FileSystem(**kwargs) + + return _ @pytest.fixture(scope="session") diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py index be19de4595..82a6a09cdf 100644 --- 
a/tests/workflows/test_embarrassingly_parallel.py +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -39,8 +39,9 @@ def embarrassingly_parallel_client( yield client -def test_embarassingly_parallel(embarrassingly_parallel_client, s3): +def test_embarassingly_parallel(embarrassingly_parallel_client, s3_factory): # How popular is matplotlib? + s3 = s3_factory(requester_pays=True) directories = s3.ls("s3://arxiv/pdf") def extract(filename: str, fs): From 4504020bd8523dcd74df4f3b53e091fe868a2e7d Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Wed, 22 Mar 2023 16:00:09 -0500 Subject: [PATCH 05/19] Rerun CI From 96e66f5282a78b5b1463057abeec590ad1def9f0 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 23 Mar 2023 13:23:29 -0500 Subject: [PATCH 06/19] Update instance type --- cluster_kwargs.yaml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index f09cb691b9..f4a5d1a3dc 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -34,6 +34,9 @@ parquet_cluster: # For tests/workflows/test_embarrassingly_parallel.py embarrassingly_parallel_cluster: n_workers: 100 + # TODO: Remove the `m6i.xlarge` worker specification below + # once it's the default worker instance type + worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB backend_options: region: "us-east-1" # Same region as dataset From 0779b48c9b30043e743fa0862300b0ab5bc813d4 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 23 Mar 2023 14:25:23 -0500 Subject: [PATCH 07/19] Run workflows on demand and during nightly cron job --- .github/workflows/tests.yml | 13 ++++++++++++- tests/conftest.py | 11 +++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7f4704a18e..973f7f8ab3 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -90,6 +90,17 @@ jobs: python ci/create_runtime_meta.py source ci/scripts/install_coiled_runtime.sh coiled_software_environment.yaml + - name: Determine if workflows should be run + # Run workflows on PRs with `workflows` label and nightly cron job + if: | + github.event_name == 'schedule' + || (github.event_name == 'pull_request' && contains(github.event.pull_request.labels.*.name, 'workflows')) + run: | + # Put EXTRA_OPTIONS into $GITHUB_ENV so it can be used in subsequent workflow steps + export EXTRA_OPTIONS="--run-workflows" + echo $EXTRA_OPTIONS + echo EXTRA_OPTIONS=$EXTRA_OPTIONS >> $GITHUB_ENV + - name: Run Coiled Runtime Tests id: test env: @@ -100,7 +111,7 @@ jobs: DB_NAME: ${{ matrix.os }}-${{ matrix.runtime-version }}-py${{ matrix.python-version }}.db BENCHMARK: true CLUSTER_DUMP: always - run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ matrix.pytest_args }} + run: bash ci/scripts/run_tests.sh -n 4 --dist loadscope ${{ env.EXTRA_OPTIONS }} ${{ matrix.pytest_args }} - name: Dump coiled.Cluster kwargs run: cat cluster_kwargs.merged.yaml diff --git a/tests/conftest.py b/tests/conftest.py index c9686b46a7..f2ccad68ec 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,16 +57,19 @@ def pytest_addoption(parser): parser.addoption( "--benchmark", action="store_true", help="Collect benchmarking data for tests" ) + parser.addoption("--run-workflows", action="store_true", help="Run workflow tests") def pytest_collection_modifyitems(config, items): - if config.getoption("--run-latest"): - # --run-latest given in cli: do not skip latest coiled-runtime tests - return skip_latest = pytest.mark.skip(reason="need --run-latest option to 
run") + skip_workflows = pytest.mark.skip(reason="need --run-workflows option to run") for item in items: - if "latest_runtime" in item.keywords: + if not config.getoption("--run-latest") and "latest_runtime" in item.keywords: item.add_marker(skip_latest) + if not config.getoption("--run-workflows") and ( + (TEST_DIR / "workflows") in item.path.parents + ): + item.add_marker(skip_workflows) def get_coiled_runtime_version(): From 7fb67925a2b68849422b1edfcdc79aad5833aaa8 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 27 Mar 2023 12:29:07 -0500 Subject: [PATCH 08/19] Use specific range of years --- tests/workflows/test_embarrassingly_parallel.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py index 82a6a09cdf..565ab9cc19 100644 --- a/tests/workflows/test_embarrassingly_parallel.py +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -44,6 +44,15 @@ def test_embarassingly_parallel(embarrassingly_parallel_client, s3_factory): s3 = s3_factory(requester_pays=True) directories = s3.ls("s3://arxiv/pdf") + # We only analyze files from 1991-2022 here in order to have a consistent data volume. + # This is benchmarking purposes only, as this dataset is updated monthly. + years = list(range(91, 100)) + list(range(23)) + directories = [ + d + for d in directories + if d.endswith(".tar") and int(d.split("_")[2][:2]) in years + ] + def extract(filename: str, fs): """Extract and process one directory of arXiv data From e4851df7402efe455cbd3c234602937dd20e2918 Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Mon, 27 Mar 2023 14:54:47 -0500 Subject: [PATCH 09/19] Light asserts --- tests/workflows/test_embarrassingly_parallel.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/workflows/test_embarrassingly_parallel.py b/tests/workflows/test_embarrassingly_parallel.py index 565ab9cc19..444f412b1e 100644 --- a/tests/workflows/test_embarrassingly_parallel.py +++ b/tests/workflows/test_embarrassingly_parallel.py @@ -95,4 +95,9 @@ def filename_to_date(filename): return pd.Timestamp(year=year, month=month, day=1) df["date"] = df.filename.map(filename_to_date) - df.groupby("date").has_matplotlib.mean() + result = df.groupby("date").has_matplotlib.mean() + # Some light validation to ensure results are consistent. + # This is only for benchmarking. 
+ assert result.idxmin() == pd.Timestamp("1991-07-01") # Earliest timestamp + assert result.idxmax() == pd.Timestamp("2022-10-01") # Row with maximum value + assert result.ne(0).idxmax() == pd.Timestamp("2005-06-01") # First non-zero row From 26578859a200556816d7fa8b5f5d49061fece367 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 28 Mar 2023 11:19:59 -0500 Subject: [PATCH 10/19] add workflow --- cluster_kwargs.yaml | 9 ++ tests/workflows/test_from_csv_to_parquet.py | 127 ++++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 tests/workflows/test_from_csv_to_parquet.py diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index f4a5d1a3dc..5668aa64e4 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -58,3 +58,12 @@ test_work_stealing_on_straggling_worker: test_repeated_merge_spill: n_workers: 20 worker_vm_types: [m6i.large] + +# For tests/workflows/test_from_csv_to_parquet.py +from_csv_to_parquet_cluster: + n_workers: 5 + # TODO: Remove the `m6i.xlarge` worker specification below + # once it's the default worker instance type + worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB + backend_options: + region: "us-east-1" # Same region as dataset diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py new file mode 100644 index 0000000000..1b787ffb93 --- /dev/null +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -0,0 +1,127 @@ +import os +import uuid + +import coiled +import dask.dataframe as dd +import pytest +from distributed import Client, LocalCluster, wait # noqa + +LOCAL_RUN = os.environ.get("LOCAL_WORKFLOW_RUN") + + +@pytest.fixture(scope="module") +def from_csv_to_parquet_cluster( + dask_env_variables, + cluster_kwargs, + github_cluster_tags, +): + if LOCAL_RUN is not None: + with LocalCluster() as cluster: + yield cluster + else: + with coiled.Cluster( + f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}", + environ=dask_env_variables, + tags=github_cluster_tags, + **cluster_kwargs["from_csv_to_parquet_cluster"], + ) as cluster: + yield cluster + + +@pytest.fixture +def from_csv_to_parquet_client( + from_csv_to_parquet_cluster, + cluster_kwargs, + upload_cluster_dump, + benchmark_all, +): + if LOCAL_RUN is not None: + with Client(from_csv_to_parquet_cluster) as client: + yield client + else: + n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"] + with Client(from_csv_to_parquet_cluster) as client: + from_csv_to_parquet_cluster.scale(n_workers) + client.wait_for_workers(n_workers) + client.restart() + with upload_cluster_dump(client), benchmark_all(client): + yield client + + +COLUMNSV1 = { + "GlobalEventID": "Int64", + "Day": "Int64", + "MonthYear": "Int64", + "Year": "Int64", + "FractionDate": "Float64", + "Actor1Code": "string[pyarrow]", + "Actor1Name": "string[pyarrow]", + "Actor1CountryCode": "string[pyarrow]", + "Actor1KnownGroupCode": "string[pyarrow]", + "Actor1EthnicCode": "string[pyarrow]", + "Actor1Religion1Code": "string[pyarrow]", + "Actor1Religion2Code": "string[pyarrow]", + "Actor1Type1Code": "string[pyarrow]", + "Actor1Type2Code": "string[pyarrow]", + "Actor1Type3Code": "string[pyarrow]", + "Actor2Code": "string[pyarrow]", + "Actor2Name": "string[pyarrow]", + "Actor2CountryCode": "string[pyarrow]", + "Actor2KnownGroupCode": "string[pyarrow]", + "Actor2EthnicCode": "string[pyarrow]", + "Actor2Religion1Code": "string[pyarrow]", + "Actor2Religion2Code": "string[pyarrow]", + "Actor2Type1Code": "string[pyarrow]", + "Actor2Type2Code": "string[pyarrow]", + "Actor2Type3Code": 
"string[pyarrow]", + "IsRootEvent": "Int64", + "EventCode": "string[pyarrow]", + "EventBaseCode": "string[pyarrow]", + "EventRootCode": "string[pyarrow]", + "QuadClass": "Int64", + "GoldsteinScale": "Float64", + "NumMentions": "Int64", + "NumSources": "Int64", + "NumArticles": "Int64", + "AvgTone": "Float64", + "Actor1Geo_Type": "Int64", + "Actor1Geo_Fullname": "string[pyarrow]", + "Actor1Geo_CountryCode": "string[pyarrow]", + "Actor1Geo_ADM1Code": "string[pyarrow]", + "Actor1Geo_Lat": "Float64", + "Actor1Geo_Long": "Float64", + "Actor1Geo_FeatureID": "string[pyarrow]", + "Actor2Geo_Type": "Int64", + "Actor2Geo_Fullname": "string[pyarrow]", + "Actor2Geo_CountryCode": "string[pyarrow]", + "Actor2Geo_ADM1Code": "string[pyarrow]", + "Actor2Geo_Lat": "Float64", + "Actor2Geo_Long": "Float64", + "Actor2Geo_FeatureID": "string[pyarrow]", + "ActionGeo_Type": "Int64", + "ActionGeo_Fullname": "string[pyarrow]", + "ActionGeo_CountryCode": "string[pyarrow]", + "ActionGeo_ADM1Code": "string[pyarrow]", + "ActionGeo_Lat": "Float64", + "ActionGeo_Long": "Float64", + "ActionGeo_FeatureID": "string[pyarrow]", + "DATEADDED": "Int64", + "SOURCEURL": "string[pyarrow]", +} + + +def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory): + s3 = s3_factory(anon=True) + df = dd.read_csv( + "s3://gdelt-open-data/events/*.csv", + names=COLUMNSV1.keys(), + sep="\t", + dtype=COLUMNSV1, + storage_options=s3.storage_options, + ) + + df = df.partitions[-10:] + + result = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) # noqa + print(result) + assert df.GlobalEventID.dtype == "Int64" From 6eb04d6608a67555ec54e26fa58201092ddc5498 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 28 Mar 2023 11:51:56 -0500 Subject: [PATCH 11/19] show something with use of pytest -s --- cluster_kwargs.yaml | 2 +- tests/workflows/test_from_csv_to_parquet.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index 5668aa64e4..b1aee6d1cc 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -64,6 +64,6 @@ from_csv_to_parquet_cluster: n_workers: 5 # TODO: Remove the `m6i.xlarge` worker specification below # once it's the default worker instance type - worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB + worker_vm_types: [t3.medium] # 2CPU, 4GiB backend_options: region: "us-east-1" # Same region as dataset diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 1b787ffb93..629aa805d0 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -122,6 +122,8 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory): df = df.partitions[-10:] - result = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) # noqa - print(result) + future = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) # noqa + wait(future) + print(future.result()) + assert df.GlobalEventID.dtype == "Int64" From 222c6957c7c884f871269c30f9dca59d98516887 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 28 Mar 2023 11:55:20 -0500 Subject: [PATCH 12/19] rm unnecessary noqa comments --- tests/workflows/test_from_csv_to_parquet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 629aa805d0..8c9ab0d1f2 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -4,7 +4,7 @@ import coiled import 
dask.dataframe as dd import pytest -from distributed import Client, LocalCluster, wait # noqa +from distributed import Client, LocalCluster, wait LOCAL_RUN = os.environ.get("LOCAL_WORKFLOW_RUN") @@ -122,7 +122,7 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory): df = df.partitions[-10:] - future = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) # noqa + future = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) wait(future) print(future.result()) From fc4068758bfa429f3c1bacb94e5f5b739cafe392 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 28 Mar 2023 11:59:11 -0500 Subject: [PATCH 13/19] var name --- tests/workflows/test_from_csv_to_parquet.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 8c9ab0d1f2..76d8e309b0 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -6,7 +6,7 @@ import pytest from distributed import Client, LocalCluster, wait -LOCAL_RUN = os.environ.get("LOCAL_WORKFLOW_RUN") +LOCAL_WORKFLOW_RUN = os.environ.get("LOCAL_WORKFLOW_RUN") @pytest.fixture(scope="module") @@ -15,7 +15,7 @@ def from_csv_to_parquet_cluster( cluster_kwargs, github_cluster_tags, ): - if LOCAL_RUN is not None: + if LOCAL_WORKFLOW_RUN is not None: with LocalCluster() as cluster: yield cluster else: @@ -35,7 +35,7 @@ def from_csv_to_parquet_client( upload_cluster_dump, benchmark_all, ): - if LOCAL_RUN is not None: + if LOCAL_WORKFLOW_RUN is not None: with Client(from_csv_to_parquet_cluster) as client: yield client else: From 670e3cc54f0c4de0ff3cc694f8462f865087d3d2 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Tue, 28 Mar 2023 13:00:24 -0500 Subject: [PATCH 14/19] adjust tests.yml based on James' suggestion --- .github/workflows/tests.yml | 75 +++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 37 deletions(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 973f7f8ab3..6d7adfdef1 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -31,44 +31,45 @@ jobs: matrix: os: [ubuntu-latest] python-version: ["3.9"] - pytest_args: [tests] + # pytest_args: [tests] + pytest_args: [tests/workflows/test_from_csv_to_parquet.py] runtime-version: [upstream, latest, "0.2.1"] - include: - # Run stability tests on Python 3.8 - - pytest_args: tests/stability - python-version: "3.8" - runtime-version: upstream - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.8" - runtime-version: latest - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.8" - runtime-version: "0.2.1" - os: ubuntu-latest - # Run stability tests on Python 3.10 - - pytest_args: tests/stability - python-version: "3.10" - runtime-version: upstream - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.10" - runtime-version: latest - os: ubuntu-latest - - pytest_args: tests/stability - python-version: "3.10" - runtime-version: "0.2.1" - os: ubuntu-latest - # Run stability tests on Python Windows and MacOS (latest py39 only) - - pytest_args: tests/stability - python-version: "3.9" - runtime-version: latest - os: windows-latest - - pytest_args: tests/stability - python-version: "3.9" - runtime-version: latest - os: macos-latest + # include: + # # Run stability tests on Python 3.8 + # - pytest_args: tests/stability + # python-version: "3.8" + # runtime-version: upstream + # os: ubuntu-latest + # - 
pytest_args: tests/stability + # python-version: "3.8" + # runtime-version: latest + # os: ubuntu-latest + # - pytest_args: tests/stability + # python-version: "3.8" + # runtime-version: "0.2.1" + # os: ubuntu-latest + # # Run stability tests on Python 3.10 + # - pytest_args: tests/stability + # python-version: "3.10" + # runtime-version: upstream + # os: ubuntu-latest + # - pytest_args: tests/stability + # python-version: "3.10" + # runtime-version: latest + # os: ubuntu-latest + # - pytest_args: tests/stability + # python-version: "3.10" + # runtime-version: "0.2.1" + # os: ubuntu-latest + # # Run stability tests on Python Windows and MacOS (latest py39 only) + # - pytest_args: tests/stability + # python-version: "3.9" + # runtime-version: latest + # os: windows-latest + # - pytest_args: tests/stability + # python-version: "3.9" + # runtime-version: latest + # os: macos-latest steps: - name: Checkout From 16b0277573741da90b47584ba13b0a1f772234b4 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Wed, 29 Mar 2023 18:00:43 -0500 Subject: [PATCH 15/19] write some parquet to s3 --- tests/workflows/test_from_csv_to_parquet.py | 25 ++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 76d8e309b0..9e408dd1ac 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -110,7 +110,11 @@ def from_csv_to_parquet_client( } -def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory): +def drop_dupe_per_partition(df): + return df.drop_duplicates(subset=["SOURCEURL"], keep="first") + + +def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): s3 = s3_factory(anon=True) df = dd.read_csv( "s3://gdelt-open-data/events/*.csv", @@ -121,9 +125,20 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory): ) df = df.partitions[-10:] + df = df.map_partitions(drop_dupe_per_partition) + national_paper = df.SOURCEURL.str.contains("washingtonpost|nytimes", regex=True) + df["national_paper"] = national_paper + df = df[df["national_paper"]] - future = from_csv_to_parquet_client.compute(df.GoldsteinScale.mean()) - wait(future) - print(future.result()) + if LOCAL_WORKFLOW_RUN: + output = "test-output" + else: + output = s3_url + "/from-csv-to-parquet/" - assert df.GlobalEventID.dtype == "Int64" + + to_pq = df.to_parquet(output, compute=False) + + future = from_csv_to_parquet_client.compute(to_pq) + wait(future) + newdf = dd.read_parquet(output) + assert "DATEADDED" in list(newdf.columns) From b557bc541ed588a6374532137dcb64dc6e0c76ec Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 6 Apr 2023 12:14:58 -0500 Subject: [PATCH 16/19] this version actually passes --- tests/workflows/test_from_csv_to_parquet.py | 59 +++++++-------------- 1 file changed, 20 insertions(+), 39 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 9e408dd1ac..5590eb64ce 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -15,17 +15,13 @@ def from_csv_to_parquet_cluster( cluster_kwargs, github_cluster_tags, ): - if LOCAL_WORKFLOW_RUN is not None: - with LocalCluster() as cluster: - yield cluster - else: - with coiled.Cluster( - f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}", - environ=dask_env_variables, - tags=github_cluster_tags, - **cluster_kwargs["from_csv_to_parquet_cluster"], - ) as cluster: - yield 
cluster + with coiled.Cluster( + f"from-csv-to-parquet-{uuid.uuid4().hex[:8]}", + environ=dask_env_variables, + tags=github_cluster_tags, + **cluster_kwargs["from_csv_to_parquet_cluster"], + ) as cluster: + yield cluster @pytest.fixture @@ -35,17 +31,13 @@ def from_csv_to_parquet_client( upload_cluster_dump, benchmark_all, ): - if LOCAL_WORKFLOW_RUN is not None: - with Client(from_csv_to_parquet_cluster) as client: + n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"] + with Client(from_csv_to_parquet_cluster) as client: + from_csv_to_parquet_cluster.scale(n_workers) + client.wait_for_workers(n_workers) + client.restart() + with upload_cluster_dump(client), benchmark_all(client): yield client - else: - n_workers = cluster_kwargs["from_csv_to_parquet_cluster"]["n_workers"] - with Client(from_csv_to_parquet_cluster) as client: - from_csv_to_parquet_cluster.scale(n_workers) - client.wait_for_workers(n_workers) - client.restart() - with upload_cluster_dump(client), benchmark_all(client): - yield client COLUMNSV1 = { @@ -110,10 +102,6 @@ def from_csv_to_parquet_client( } -def drop_dupe_per_partition(df): - return df.drop_duplicates(subset=["SOURCEURL"], keep="first") - - def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): s3 = s3_factory(anon=True) df = dd.read_csv( @@ -122,23 +110,16 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): sep="\t", dtype=COLUMNSV1, storage_options=s3.storage_options, + on_bad_lines="skip", ) df = df.partitions[-10:] - df = df.map_partitions(drop_dupe_per_partition) - national_paper = df.SOURCEURL.str.contains("washingtonpost|nytimes", regex=True) - df["national_paper"] = national_paper + df = df.map_partitions( + lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first") + ) + df["national_paper"] = df.SOURCEURL.str.contains("washingtonpost|nytimes", regex=True) df = df[df["national_paper"]] - if LOCAL_WORKFLOW_RUN: - output = "test-output" - else: - output = s3_url + "/from-csv-to-parquet/" - - - to_pq = df.to_parquet(output, compute=False) + output = s3_url + "/from-csv-to-parquet/" - future = from_csv_to_parquet_client.compute(to_pq) - wait(future) - newdf = dd.read_parquet(output) - assert "DATEADDED" in list(newdf.columns) + df.to_parquet(output) From ccedaf8b5aa91e1ff09b6a29559bf1c482aee145 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Thu, 6 Apr 2023 12:22:37 -0500 Subject: [PATCH 17/19] check if read works --- tests/workflows/test_from_csv_to_parquet.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index 5590eb64ce..fb4e235bad 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -123,3 +123,5 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): output = s3_url + "/from-csv-to-parquet/" df.to_parquet(output) + df = dd.read_parquet(output) + df.compute() From 37120c344042a317c539358d138b08464bb42a69 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Mon, 10 Apr 2023 20:13:54 -0500 Subject: [PATCH 18/19] works with some excluded files --- cluster_kwargs.yaml | 2 +- tests/workflows/test_from_csv_to_parquet.py | 27 +++++++++++++++++---- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index b1aee6d1cc..98c7b2fe21 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -61,7 +61,7 @@ test_repeated_merge_spill: # For 
tests/workflows/test_from_csv_to_parquet.py from_csv_to_parquet_cluster: - n_workers: 5 + n_workers: 30 # TODO: Remove the `m6i.xlarge` worker specification below # once it's the default worker instance type worker_vm_types: [t3.medium] # 2CPU, 4GiB diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index fb4e235bad..fea804be57 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -104,8 +104,26 @@ def from_csv_to_parquet_client( def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): s3 = s3_factory(anon=True) + + bad_files = [ + "gdelt-open-data/events/20161004.export.csv", + "gdelt-open-data/events/20170106.export.csv", + "gdelt-open-data/events/20170422.export.csv", + "gdelt-open-data/events/20170802.export.csv", + "gdelt-open-data/events/20170920.export.csv", + "gdelt-open-data/events/20171021.export.csv", + "gdelt-open-data/events/20180415.export.csv", + "gdelt-open-data/events/20180416.export.csv", + "gdelt-open-data/events/20180613.export.csv", + "gdelt-open-data/events/20180806.export.csv", + "gdelt-open-data/events/20190217.export.csv", + "gdelt-open-data/events/20190613.export.csv", + ] + files = s3.ls("s3://gdelt-open-data/events/")[120:] + files = [f"s3://{f}" for f in files if f not in bad_files] + df = dd.read_csv( - "s3://gdelt-open-data/events/*.csv", + files, names=COLUMNSV1.keys(), sep="\t", dtype=COLUMNSV1, @@ -113,15 +131,14 @@ def test_from_csv_to_parquet(from_csv_to_parquet_client, s3_factory, s3_url): on_bad_lines="skip", ) - df = df.partitions[-10:] df = df.map_partitions( lambda xdf: xdf.drop_duplicates(subset=["SOURCEURL"], keep="first") ) - df["national_paper"] = df.SOURCEURL.str.contains("washingtonpost|nytimes", regex=True) + df["national_paper"] = df.SOURCEURL.str.contains( + "washingtonpost|nytimes", regex=True + ) df = df[df["national_paper"]] output = s3_url + "/from-csv-to-parquet/" df.to_parquet(output) - df = dd.read_parquet(output) - df.compute() From b3cfbaa136b9a215bf46a9b32f086a8c1e69a198 Mon Sep 17 00:00:00 2001 From: Doug Davis Date: Mon, 10 Apr 2023 20:30:04 -0500 Subject: [PATCH 19/19] rm unnecessary line --- tests/workflows/test_from_csv_to_parquet.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/workflows/test_from_csv_to_parquet.py b/tests/workflows/test_from_csv_to_parquet.py index fea804be57..b2c8e861a1 100644 --- a/tests/workflows/test_from_csv_to_parquet.py +++ b/tests/workflows/test_from_csv_to_parquet.py @@ -6,8 +6,6 @@ import pytest from distributed import Client, LocalCluster, wait -LOCAL_WORKFLOW_RUN = os.environ.get("LOCAL_WORKFLOW_RUN") - @pytest.fixture(scope="module") def from_csv_to_parquet_cluster(