diff --git a/environment/environment.yml b/environment/environment.yml index 3de64b3..a1bd8a7 100644 --- a/environment/environment.yml +++ b/environment/environment.yml @@ -2,34 +2,31 @@ name: pyciam channels: - conda-forge dependencies: - - bottleneck=1.3 - - bokeh=2.4.3 # for use of dask dashboard - - cartopy=0.21.1 - - cloudpathlib=0.13 - - distributed=2023.3.1 - - flox=0.6.8 - - fsspec=2023.3.0 - - geopandas=0.12.2 - - gitpython=3.1.31 # unmarked dependency of rhg_compute_tools - - jupyterlab=3.6.1 - - matplotlib=3.7.1 - - numpy=1.24.2 - - numpy_groupies=0.9.20 - - oct2py=5.6.0 - - octave=7.3.0 - - openpyxl=3.1.1 - - pandas=1.5.3 - - papermill=2.3.4 - - pint-xarray=0.3 - - pip=23.0.1 - - python=3.11.0 - - requests=2.28.2 - - scikit-learn=1.2.2 - - scipy=1.10.1 - - tqdm=4.65.0 - - xarray=2023.2.0 - - zarr=2.14.2 + - bottleneck + - bokeh # for use of dask dashboard + - cartopy + - cloudpathlib + - distributed + - flox + - fsspec + - geopandas + - gitpython # unmarked dependency of rhg_compute_tools + - jupyterlab + - matplotlib + - numpy + - openpyxl + - pandas + - papermill + - pint-xarray + - pip + - python=3.14 + - requests + - scikit-learn + - scipy + - tqdm + - xarray + - zarr - pip: - - python-ciam==1.1 - - rhg_compute_tools==1.3 - - parameterize_jobs==0.1 \ No newline at end of file + - python-ciam + - rhg_compute_tools + - parameterize_jobs \ No newline at end of file diff --git a/pyCIAM/io.py b/pyCIAM/io.py index bd211ac..b556ab3 100644 --- a/pyCIAM/io.py +++ b/pyCIAM/io.py @@ -22,7 +22,7 @@ from fsspec import FSTimeoutError from fsspec.implementations.zip import ZipFileSystem -from pyCIAM.utils import copy +from pyCIAM.utils import _get_exp_year, copy from pyCIAM.utils import spherical_nearest_neighbor as snn from .utils import _s2d @@ -36,7 +36,7 @@ def prep_sliiders( selectors={}, calc_popdens_with_wetland_area=True, expand_exposure=True, - storage_options={}, + storage_options=None, ): """Import the SLIIDERS dataset (or a different dataset formatted analogously). 
@@ -102,7 +102,10 @@ def prep_sliiders( """ inputs_all = xr.open_zarr( str(input_store), chunks=None, storage_options=storage_options - ).sel(selectors, drop=True) + ) + inputs_all = inputs_all.sel( + {k: v for k, v in selectors.items() if k in inputs_all.dims}, drop=True + ) inputs = inputs_all.sel({seg_var: seg_vals}) inputs = _s2d(inputs).assign(constants) @@ -124,19 +127,16 @@ def prep_sliiders( * (inputs.ypcc / ref_income) ** inputs.vsl_inc_elast ) - if expand_exposure and "pop" not in inputs.data_vars: - exp_year = [ - v for v in inputs.data_vars if v.startswith("pop_") and "scale" not in v - ] - assert len(exp_year) == 1, exp_year - exp_year = exp_year[0].split("_")[1] - pop_var = "pop_" + exp_year - inputs["pop"] = inputs[pop_var] * inputs.pop_scale - inputs = inputs.drop(pop_var) - if expand_exposure and "K" not in inputs.data_vars: - K_var = "K_" + exp_year - inputs["K"] = inputs[K_var] * inputs.K_scale - inputs = inputs.drop(K_var) + if expand_exposure: + exp_year = _get_exp_year(inputs) + if "pop" not in inputs.data_vars: + pop_var = f"pop_{exp_year}" + inputs["pop"] = inputs[pop_var] * inputs.pop_scale + inputs = inputs.drop(pop_var) + if "K" not in inputs.data_vars: + K_var = f"K_{exp_year}" + inputs["K"] = inputs[K_var] * inputs.K_scale + inputs = inputs.drop(K_var) if "dfact" not in inputs.data_vars and "npv_start" in inputs.data_vars: inputs["dfact"] = (1 / (1 + inputs.dr)) ** (inputs.year - inputs.npv_start) @@ -196,30 +196,28 @@ def _load_scenario_mc( include_cc=True, quantiles=None, ncc_name="ncc", - storage_options={}, + storage_options=None, ): scen_mc_filter = xr.open_zarr( str(slr_store), chunks=None, storage_options=storage_options )[["scenario", mc_dim]] if quantiles is not None: if mc_dim == "quantile": - scen_mc_filter = scen_mc_filter.sel(quantile=quantiles) + scen_mc_filter = scen_mc_filter.sel(quantile=quantiles).sortby( + ["scenario", "quantile"] + ) else: - scen_mc_filter = scen_mc_filter.quantile(quantiles, dim=mc_dim) - - scen_mc_filter = ( - scen_mc_filter.to_dataframe().sort_values(["scenario", mc_dim]).index - ) + scen_mc_filter = scen_mc_filter.scenario.sortby("scenario") + scen_mc_filter = scen_mc_filter.to_dataframe().index if include_ncc: - scen_mc_filter = scen_mc_filter.append( - pd.MultiIndex.from_product( - ( - [ncc_name], - scen_mc_filter.get_level_values(mc_dim).unique().sort_values(), - ), - names=["scenario", mc_dim], - ) + scen_mc_filter = scen_mc_filter.union( + pd.DataFrame(index=scen_mc_filter) + .reset_index() + .assign(scenario=ncc_name) + .drop_duplicates() + .set_index(scen_mc_filter.names) + .index ) if not include_cc: @@ -239,9 +237,10 @@ def _load_lslr_for_ciam( mc_dim="mc_sample_id", lsl_var="lsl_msl05", lsl_ncc_var="lsl_ncc_msl05", + site_id_dim="site_id", ncc_name="ncc", slr_0_year=2005, - storage_options={}, + storage_options=None, quantiles=None, ): if scen_mc_filter is None: @@ -256,7 +255,12 @@ def _load_lslr_for_ciam( ) wcc = scen_mc_filter.get_level_values("scenario") != ncc_name - scen_mc_ncc = scen_mc_filter[~wcc].droplevel("scenario").values + + # it's possible that the scen_mc_filter only filters scenarios not monte carlos + if mc_dim in scen_mc_filter.names: + scen_mc_ncc = scen_mc_filter[~wcc].droplevel("scenario").values + else: + scen_mc_ncc = None scen_mc_xr_wcc = ( scen_mc_filter[wcc] .to_frame() @@ -269,45 +273,56 @@ def _load_lslr_for_ciam( # select the nearest SLR locations to the passed locations slr = _s2d( - slr.sel(site_id=get_nearest_slrs(slr, lonlats).to_xarray()).drop("site_id") + 
slr.sel({site_id_dim: get_nearest_slrs(slr, lonlats).to_xarray()}).drop( + site_id_dim + ) ).drop(["lat", "lon"], errors="ignore") # select only the scenarios we wish to model if len(scen_mc_xr_wcc.scen_mc): slr_out = ( slr[lsl_var] - .sel({"scenario": scen_mc_xr_wcc.scenario, mc_dim: scen_mc_xr_wcc[mc_dim]}) - .set_index(scen_mc=["scenario", mc_dim]) + .sel({k: scen_mc_xr_wcc[k] for k in scen_mc_xr_wcc.data_vars}) + .set_index(scen_mc=list(scen_mc_xr_wcc.data_vars.keys())) ) else: slr_out = xr.DataArray( [], dims=("scen_mc",), coords={ - "scen_mc": pd.MultiIndex.from_tuples([], names=["scenario", mc_dim]) + "scen_mc": pd.MultiIndex.from_tuples( + [], names=list(scen_mc_xr_wcc.data_vars.keys()) + ) + if mc_dim in scen_mc_xr_wcc.data_vars + else pd.Index([], name="scen_mc") }, ) - if len(scen_mc_ncc): - slr_ncc = ( - slr[lsl_ncc_var] - .sel({mc_dim: scen_mc_ncc}) - .expand_dims(scenario=[ncc_name]) - .stack(scen_mc=["scenario", mc_dim]) - ) - slr_out = xr.concat((slr_out, slr_ncc), dim="scen_mc").sel( - scen_mc=scen_mc_filter + if include_ncc: + slr_ncc = slr[lsl_ncc_var] + stack_dims = ["scenario"] + if scen_mc_ncc is not None: + slr_ncc = slr_ncc.sel({mc_dim: scen_mc_ncc}) + stack_dims.append(mc_dim) + + slr_ncc = slr_ncc.expand_dims(scenario=[ncc_name]) + if len(stack_dims) > 1: + slr_ncc = slr_ncc.stack(scen_mc=stack_dims) + else: + slr_ncc = slr_ncc.rename({stack_dims[0]: "scen_mc"}) + slr_out = xr.concat( + (slr_out, slr_ncc), dim="scen_mc", combine_attrs="no_conflicts" + ).sel(scen_mc=scen_mc_filter) + + if quantiles is not None and mc_dim != "quantile": + slr_out = slr_out.quantile(quantiles, dim=mc_dim) + slr_out = slr_out.rename(scen_mc="scenario").stack( + scen_mc=["scenario", "quantile"] ) + # convert to meters if "units" in slr_out.attrs: - ix_names = slr_out.indexes["scen_mc"].names - # hack to avoid pint destroying multi-indexed coords - slr_out = ( - slr_out.pint.quantify() - .pint.to("meters") - .pint.dequantify() - .set_index(scen_mc=ix_names) - ) + slr_out = slr_out.pint.quantify().pint.to("meters").pint.dequantify() # add on base year where slr is 0 slr_out = slr_out.reindex( @@ -350,14 +365,19 @@ def create_template_dataarray(dims, coords, chunks, dtype="float32", name=None): An empty dask-backed DataArray. """ lens = {k: len(v) for k, v in coords.items()} - return xr.DataArray( - da.empty( - [lens[k] for k in dims], chunks=[chunks[k] for k in dims], dtype=dtype - ), + out = xr.DataArray( + da.empty([lens[k] for k in dims], chunks=[-1] * len(dims), dtype=dtype), dims=dims, coords={k: v for k, v in coords.items() if k in dims}, name=name, ) + out.encoding["chunks"] = [chunks[k] for k in dims] + if np.issubdtype(np.dtype(dtype), np.integer): + fill_value = np.iinfo(dtype).max + else: + fill_value = "NaN" + out.encoding["fill_value"] = fill_value + return out def create_template_dataset(var_dims, coords, chunks, dtypes): @@ -403,7 +423,7 @@ def check_finished_zarr_workflow( varname=None, final_selector={}, mask=None, - storage_options={}, + storage_options=None, ): """Check if a workflow that writes to a particular region of a zarr store has already run. This is useful when running pyCIAM in "probabilistic" mode across a @@ -487,80 +507,6 @@ def check_finished_zarr_workflow( return finished -def save_to_zarr_region(ds_in, store, already_aligned=False, storage_options={}): - """Wrapper around :py:method:`xarray.Dataset.to_zarr` when specifying the `region` - kwarg. 
This function allows you to avoid boilerplate to figure out the integer slice - objects needed to pass as `region` when calling `:py:meth:xarray.Dataset.to_zarr`. - - Parameters - ---------- - ds_in : :py:class:`xarray.Dataset` or :py:class:`xarray.DataArray` - Dataset or DataArray to save to a specific region of a Zarr store - store : Path-like - Path to Zarr store - already_aligned : bool, default False - If True, assume that the coordinates of `ds_in` are already ordered the same - way as those of `store`. May save some computation, but will miss-attribute - values to coordinates if set to True when coords are not aligned. - storage_options : dict, optional - Passed to :py:function:`xarray.open_zarr` - - Returns - ------- - None : - No return value but `ds_in` is saved to the appropriate region of `store`. - - Raises - ------ - ValueError - If `ds_in` is an unnamed DataArray and `store` has more than one variable. - AssertionError - If any coordinate values of `ds_in` are not contiguous within `store`. - """ - ds_out = xr.open_zarr(str(store), chunks=None, storage_options=storage_options) - - # convert dataarray to dataset if needed - if isinstance(ds_in, xr.DataArray): - if ds_in.name is not None: - ds_in = ds_in.to_dataset() - else: - if len(ds_out.data_vars) != 1: - raise ValueError( - "``ds_in`` is an unnamed DataArray and ``store`` has more than one " - "variable." - ) - ds_in = ds_in.to_dataset(name=list(ds_out.data_vars)[0]) - - # align - for v in ds_in.data_vars: - ds_in[v] = ds_in[v].transpose(*ds_out[v].dims).astype(ds_out[v].dtype) - - # find appropriate regions - alignment_dims = {} - regions = {} - for r in ds_in.dims: - if len(ds_in[r]) == len(ds_out[r]): - alignment_dims[r] = ds_out[r].values - continue - alignment_dims[r] = [v for v in ds_out[r].values if v in ds_in[r].values] - valid_ixs = np.arange(len(ds_out[r]))[ds_out[r].isin(alignment_dims[r]).values] - n_valid = len(valid_ixs) - st = valid_ixs[0] - end = valid_ixs[-1] - assert end - st == n_valid - 1, ( - f"Indices are not continuous along dimension {r}" - ) - regions[r] = slice(st, end + 1) - - # align coords - if not already_aligned: - ds_in = ds_in.sel(alignment_dims) - - ds_in.drop_vars(ds_in.coords).to_zarr( - str(store), region=regions, storage_options=storage_options - ) - - def get_nearest_slrs(slr_ds, lonlats, x1="seg_lon", y1="seg_lat"): unique_lonlats = lonlats[[x1, y1]].drop_duplicates() slr_lonlat = slr_ds[["lon", "lat"]].to_dataframe() @@ -585,9 +531,11 @@ def load_ciam_inputs( input_store, slr_store, params, - seg_vals, + selectors={}, slr_names=None, seg_var="seg", + lsl_var="lsl_msl05", + slr_site_id_dim="site_id", surge_lookup_store=None, ssp=None, iam=None, @@ -596,7 +544,7 @@ def load_ciam_inputs( include_cc=True, mc_dim="mc_sample_id", quantiles=None, - storage_options={}, + storage_options=None, ): """Load, process, and format all inputs needed to run pyCIAM. @@ -611,9 +559,8 @@ def load_ciam_inputs( params : dict Dictionary of model parameters, typically loaded from a JSON file. See :file:`../params.json` for an example of the required parameters. - seg_vals : list of str - Defines the subset of regions (along dimension `seg_var`) that the function - will prep. Subsets are used to run CIAM in parallel. + selectors : list of str + Defines the subset of regions (along dimension `seg_var`) and/or scenario that the function will prep. Subsets are used to run CIAM in parallel. 
slr_names : list of str, optional If `slr_store` is a list of multiple SLR datasets, this must be a list of the same length providing names for each SLR dataset. This is used as a suffix for @@ -623,6 +570,10 @@ def load_ciam_inputs( seg_var : str, default "seg_var" The name of the dimension in `input_store` along which the function will subset using `seg_vals` + lsl_var : str, default "lsl_msl05" + The name of the variable in ``slr_store`` containing local SLR values + slr_site_id_dim : str, default "site_id" + The name of the location dimension in ``slr_store``. surge_lookup_store : Path-like, optional If not None, will also load and process data from an ESL impacts lookup table (see `lookup.create_surge_lookup`). If included in a call to @@ -681,14 +632,14 @@ def load_ciam_inputs( If `ssp` or `iam` is specified and the corresponding variables are not present in the Zarr store located at `input_store`. """ - selectors = {"year": slice(params.model_start, None)} + selectors = {"year": slice(params.model_start, None), **selectors} if ssp is not None: selectors["ssp"] = ssp if iam is not None: selectors["iam"] = iam inputs = prep_sliiders( input_store, - seg_vals, + selectors[seg_var], # dropping the "refA_scenario_selectors" b/c this doesn't need to be added to # the input dataset object constants=params[params.map(type) != dict].to_dict(), # noqa: E721 @@ -707,7 +658,7 @@ def load_ciam_inputs( xr.open_zarr( str(surge_lookup_store), chunks=None, storage_options=storage_options ) - .sel({seg_var: seg_vals}) + .sel({seg_var: selectors["seg"]}) .load() ) if seg_var != "seg": @@ -733,20 +684,31 @@ def load_ciam_inputs( include_ncc=include_ncc, include_cc=include_cc, ncc_name=ncc_names[sx], + lsl_var=lsl_var, mc_dim=mc_dim, quantiles=quantiles, + site_id_dim=slr_site_id_dim, storage_options=storage_options, ) for sx, s in enumerate(slr_store) ], dim="scen_mc", ) + if scen_mc_filter is None: + slr = slr.unstack("scen_mc") + + slr = slr.sel({k: v for k, v in selectors.items() if k in slr.dims}) return inputs, slr, surge def load_diaz_inputs( - input_store, seg_vals, params, include_ncc=True, include_cc=True, storage_options={} + input_store, + seg_vals, + params, + include_ncc=True, + include_cc=True, + storage_options=None, ): """Load the original inputs used in Diaz 2016. 
diff --git a/pyCIAM/run.py b/pyCIAM/run.py index 861d438..4dd18c2 100644 --- a/pyCIAM/run.py +++ b/pyCIAM/run.py @@ -15,8 +15,9 @@ import pandas as pd import xarray as xr from cloudpathlib import AnyPath, CloudPath -from distributed import Client -from rhg_compute_tools.xarray import dataarray_from_delayed +from distributed import Client, as_completed +from numpy.dtypes import StringDType +from rhg_compute_tools.xarray import dataset_from_delayed from pyCIAM.constants import CASE_DICT, CASES, COSTTYPES, PLIST, RLIST, SOLVCASES from pyCIAM.io import ( @@ -24,7 +25,6 @@ create_template_dataarray, load_ciam_inputs, load_diaz_inputs, - save_to_zarr_region, ) from pyCIAM.surge import damage_funcs, lookup from pyCIAM.surge._calc import ( @@ -783,7 +783,8 @@ def caseify(ds, prefix): "protection": xr.zeros_like(abandonment), } ), - ) + ), + compat="no_conflicts", ).to_array("costtype") del abandonment, relocation_r, wetland_r_noadapt @@ -800,7 +801,8 @@ def caseify(ds, prefix): "protection": protection, } ), - ) + ), + compat="no_conflicts", ).to_array("costtype") del protection, wetland_p @@ -854,7 +856,7 @@ def select_optimal_case( seg_regions, eps=1, region_var="seg_adm", - storage_options={}, + storage_options=None, ): """Calculate the least-cost adaptation path for a given `region`, which is nested within a given coastal segment. All regions within a segment must take the same @@ -936,17 +938,23 @@ def execute_pyciam( tmp_output_path=AnyPath("pyciam_tmp_results.zarr"), remove_tmpfile=True, overwrite=False, + no_surge_check=False, mc_dim="quantile", + slr_site_id_dim="site_id", seg_var="seg_adm", seg_var_subset=None, adm_var="adm1", + lsl_var="lsl_msl05", + scen_mc_filter=None, quantiles=[0.5], + refA_quantile=0.5, extra_attrs={}, econ_input_seg_chunksize=100, surge_batchsize=700, surge_seg_chunksize=5, refA_seg_chunksize=500, pyciam_seg_chunksize=3, + other_chunksizes={}, diaz_inputs=False, diaz_config=False, dask_client_func=Client, @@ -1019,11 +1027,16 @@ def execute_pyciam( want to examine seg-adm level results. ovewrwrite : bool, default False If True, overwrite all intermediate output files + no_surge_check : bool, default False + If True, assume surge lookup tables are complete and do not load to check. mc_dim : str, default "quantile" - The dimension of the sea level rise datasets specified at `slr_input_paths` that - indexes different simulations within the same scenario. This could reflect Monte - Carlo simulations *or* different quantiles of SLR. Ignored if + The dimension of the sea level rise datasets specified at ``slr_input_paths`` + that indexes different simulations within the same scenario. This could reflect + Monte Carlo simulations *or* different quantiles of SLR. Ignored if `diaz_inputs=True`. + slr_site_id_dim : str, default "site_id" + The dimension of the SLR datasets specified at ``slr_input_paths`` that indexes + location of the estimate. seg_var : str, default "seg_adm" The coordinate of the socioeconomic input data specified at `econ_input_path` that indexes the intersection of coastal segments and administrative units. @@ -1035,6 +1048,12 @@ def execute_pyciam( The coordinate of the socioeconomic input data specified at `econ_input_path` that specifies the administrative unit associated with each admin/seg intersection (`seg_var`). 
Ignored if `seg_var=="seg"`
+    lsl_var : str, default "lsl_msl05"
+        The name of the variable in ``slr_input_paths`` containing local SLR values
+    scen_mc_filter : :py:class:`pandas.MultiIndex`, optional
+        An index of paired ``scenario`` (str) and ``mc_sample_id`` (int) values that
+        specify a subset of the individual SLR trajectories contained in the zarr stores
+        at ``slr_input_paths``. If None, run all scenario X MC sample combinations.
     quantiles : Optional[Iterable[float]], default [0.5]
         The quantiles of the sea level rise datasets specified at `slr_input_paths`
         that will be used to estimate costs within this function. If `mc_dim=="quantile"`, it
@@ -1042,6 +1061,9 @@
         values of the "quantile" coordinate in each SLR dataset. If not, then
         quantiles over the simulations indexed by `mc_dim` will be calculated
         on-the-fly. If None, all simulations indexed by `mc_dim` will be used.
+    refA_quantile : float, default 0.5
+        Quantile of SLR for "no-climate-change" scenario to use for defining refA (the
+        initial adaptation height).
     extra_attrs : dict[str, str], optional
         Additional attributes to write to the output dataset.
     econ_input_seg_chunksize : int, default 100
@@ -1079,12 +1101,12 @@
         some environment variables in order for the :py:mod:`cloudpathlib` objects to
         function correctly. For example, if your data exists on Google Cloud Storage
         and requires authentication, you would need to set the
-        `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the same path as
-        reflected in `storage_options["token"]`. Other cloud storage providers will have
+        ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable to the same path as
+        reflected in ``storage_options["token"]``. Other cloud storage providers will have
         different authentication methods and have not yet been tested with this
         function.
     params_override : dict, default {}
-        Used to override params specified in `params_path`
+        Used to override params specified in ``params_path``.
     **model_kwargs
         Passed directly to :py:func:`pyCIAM.calc_costs`
     """
@@ -1160,6 +1182,7 @@
         econ_input_path_seg = econ_input_path
     else:
         if overwrite or not econ_input_path_seg.is_dir():
+            print("Creating seg-level econ input dataset...")
             collapse_econ_inputs_to_seg(
                 econ_input_path,
                 econ_input_path_seg,
@@ -1172,18 +1195,20 @@
     ########################################
     # create surge lookup table if necessary
     ########################################
-    surge_futs = {}
+
+    surge_futs = []
+    print("Creating ESL impacts lookup table if it does not exist...")
     for var, path in surge_input_paths.items():
         if path is None:
             continue
-        if overwrite or not path.is_dir():
+        if not no_surge_check:
             if var == seg_var:
                 this_econ_input = econ_input_path
             elif var == "seg":
                 this_econ_input = econ_input_path_seg
             else:
                 raise ValueError(var)
-            surge_futs[var] = lookup.create_surge_lookup(
+            surge_futs += lookup.create_surge_lookup(
                 this_econ_input,
                 slr_input_paths,
                 path,
@@ -1199,13 +1224,18 @@
                 slr_0_years=params.slr_0_year,
                 client=client,
                 client_kwargs={"batch_size": surge_batchsize},
-                force_overwrite=True,
+                force_overwrite=overwrite,
                 seg_chunksize=surge_seg_chunksize,
                 mc_dim=mc_dim,
+                slr_site_id_dim=slr_site_id_dim,
+                lsl_var=lsl_var,
                 storage_options=storage_options,
             )
     # block on this calculation
-    client.gather(surge_futs)
+    for f in as_completed(surge_futs, raise_errors=False):
+        if f.status != "finished":
+            f.result()
+        surge_futs.remove(f)  # release completed futures from the list

     ###############################
     # define temporary output store
@@ -1228,96 +1258,119 @@
         econ_input_path,
         slr_input_paths,
         params,
-        [this_seg],
+        selectors={seg_var: [this_seg]},
         slr_names=slr_names,
         seg_var=seg_var,
+        lsl_var=lsl_var,
         surge_lookup_store=None,
         mc_dim=mc_dim,
+        slr_site_id_dim=slr_site_id_dim,
+        scen_mc_filter=scen_mc_filter,
         quantiles=quantiles,
         storage_options=storage_options,
     )
-    slr = test_inputs[1].unstack("scen_mc")
-    test_inputs = test_inputs[0]
+    test_inputs, slr, _ = test_inputs
+    if scen_mc_filter is None:
+        coords = {"scenario": slr.scenario.values}
+        if quantiles is not None:
+            coords["quantile"] = quantiles
+        else:
+            coords[mc_dim] = slr[mc_dim]
+    else:
+        if quantiles is not None:
+            raise ValueError(
+                "Cannot specify both ``scen_mc_filter`` and ``quantiles``."
+ ) + coords = {"scen_mc": slr["scen_mc"]} + + coords = OrderedDict( + { + "case": np.array(CASES, dtype=StringDType), + "costtype": np.array(COSTTYPES, dtype=StringDType), + seg_var: ciam_in[seg_var].values, + "year": np.arange(params.model_start, ciam_in.year.max().item() + 1), + **coords, + **{ + dim: ciam_in[dim].values + for dim in ["ssp", "iam"] + if dim in ciam_in.dims + }, + } + ) - if output_path is not None: - coords = OrderedDict( - { - "case": CASES, - "costtype": COSTTYPES, - seg_var: ciam_in[seg_var].values, - "scenario": slr.scenario.astype("unicode"), - "quantile": quantiles, - "year": np.arange(params.model_start, ciam_in.year.max().item() + 1), - **{ - dim: ciam_in[dim].values - for dim in ["ssp", "iam"] - if dim in ciam_in.dims - }, - } - ) + chunks = {seg_var: 1, "case": len(coords["case"]) - 1, **other_chunksizes} + chunks = {k: len(coords[k]) if k not in chunks else chunks[k] for k in coords} - chunks = {seg_var: 1, "case": len(coords["case"]) - 1} - chunks = {k: -1 if k not in chunks else chunks[k] for k in coords} + # create arrays + cost_dims = coords.keys() + costs = create_template_dataarray(cost_dims, coords, chunks) - # create arrays - cost_dims = coords.keys() + nonpvdims = ["year", "costtype"] + nooptcasedims = nonpvdims + ["case"] - out_ds = create_template_dataarray(cost_dims, coords, chunks).to_dataset( - name="costs" - ) - out_ds["npv"] = out_ds.costs.isel(year=0, costtype=0, drop=True).astype( - "float64" + def _create_smaller_array(nodims): + return create_template_dataarray( + [k for k in cost_dims if k not in nodims], + coords, + chunks, ) - out_ds["optimal_case"] = out_ds.npv.isel(case=0, drop=True).astype("uint8") - # add attrs - out_ds.attrs.update(attr_dict) - out_ds = add_attrs_to_result(out_ds) + npv = _create_smaller_array(nonpvdims) + optcase = _create_smaller_array(nooptcasedims) - if overwrite or not tmp_output_path.is_dir(): - out_ds.to_zarr( - str(tmp_output_path), - compute=False, - mode="w", - storage_options=storage_options, - encoding={ - "costs": {"fill_value": "NaN"}, - "optimal_case": {"fill_value": 255}, - }, - ) + out_ds = xr.Dataset({"costs": costs, "npv": npv, "optimal_case": optcase}) + + # add attrs + out_ds.attrs.update(attr_dict) + out_ds = add_attrs_to_result(out_ds, seg_var, mc_dim=mc_dim) + + if output_path is not None and (overwrite or not tmp_output_path.is_dir()): + print("Creating output skeleton...") + out_ds.to_zarr( + str(tmp_output_path), + compute=False, + mode="w", + storage_options=storage_options, + safe_chunks=False, + ) #################################################### # Create initial adaptaion heights dataset if needed #################################################### if overwrite or not refA_path.is_dir(): + print("Estimating baseline adaptation RPs...") segs = np.unique(ciam_in.seg) - seg_grps = [ - segs[i : i + refA_seg_chunksize] + seg_groups = [ + slice(segs[i], segs[min(i + refA_seg_chunksize - 1, len(segs) - 1)]) for i in range(0, len(segs), refA_seg_chunksize) ] - ( - dataarray_from_delayed( - client.map( - get_refA, - seg_grps, - econ_input_path=econ_input_path_seg, - slr_input_path=slr_input_paths[0], - params=params, - surge_input_path=surge_input_paths["seg"], - mc_dim=mc_dim, - storage_options=storage_options, - quantiles=quantiles, - diaz_inputs=diaz_inputs, - eps=eps, - **model_kwargs, - ), - dim="seg", - ) - .to_dataset(name="refA") - .chunk({"seg": -1}) - .to_zarr(str(refA_path), storage_options=storage_options, mode="w") + refA_futs = client.map( + get_refA, + seg_groups, + 
econ_input_path=econ_input_path_seg, + slr_input_path=slr_input_paths[0], + params=params, + surge_input_path=surge_input_paths["seg"], + mc_dim=mc_dim, + slr_site_id_dim=slr_site_id_dim, + lsl_var=lsl_var, + storage_options=storage_options, + quantile=refA_quantile, + scen_mc_filter=scen_mc_filter, + diaz_inputs=diaz_inputs, + eps=eps, + **model_kwargs, ) + refA_ds = dataset_from_delayed(refA_futs, dim="seg") + for v in refA_ds.data_vars: + refA_ds[v].attrs = {} + refA_ds = add_attrs_to_result( + refA_ds.rename(case="optimal_case"), "seg", mc_dim=mc_dim + ).rename(optimal_case="case") + refA_ds.to_zarr(str(refA_path), mode="w") + else: + print("Baseline adaptation RPs already calculated...") ############################### # get groups for running pyCIAM @@ -1340,7 +1393,10 @@ def execute_pyciam( i += most_segadm else: this_group = this_group.isel( - {seg_var: this_group.seg != this_group.seg.isel(seg_adm=-1, drop=True)} + { + seg_var: this_group.seg + != this_group.seg.isel({seg_var: -1}, drop=True) + } ) i += len(this_group[seg_var]) @@ -1371,6 +1427,9 @@ def execute_pyciam( surge_input_path=surge_input_paths[seg_var], seg_var=seg_var, mc_dim=mc_dim, + slr_site_id_dim=slr_site_id_dim, + lsl_var=lsl_var, + scen_mc_filter=scen_mc_filter, quantiles=quantiles, storage_options=storage_options, diaz_inputs=diaz_inputs, @@ -1391,7 +1450,9 @@ def execute_pyciam( ) ), dim="seg", - ) + ), + seg_var, + mc_dim=mc_dim, ) out.attrs.update(attr_dict) return out @@ -1491,12 +1552,24 @@ def get_refA( params, surge_input_path=None, mc_dim="quantile", - storage_options={}, - quantiles=[0.5], + slr_site_id_dim="site_id", + lsl_var="lsl_msl05", + storage_options=None, + quantile=0.5, eps=1, + scen_mc_filter=None, diaz_inputs=False, + output_path=None, **model_kwargs, ): + if check_finished_zarr_workflow( + finalstore=output_path, + varname="refA", + storage_options=storage_options, + final_selector={"seg": segs}, + ): + return None + if diaz_inputs: inputs, slr = load_diaz_inputs( econ_input_path, @@ -1512,16 +1585,18 @@ def get_refA( econ_input_path, slr_input_path, params, - segs, + selectors={"seg": segs}, surge_lookup_store=surge_input_path, mc_dim=mc_dim, + lsl_var=lsl_var, + slr_site_id_dim=slr_site_id_dim, include_cc=False, include_ncc=True, + scen_mc_filter=scen_mc_filter, storage_options=storage_options, - quantiles=quantiles, + quantiles=[quantile], **params.refA_scenario_selectors, ) - slr = slr.unstack("scen_mc") slr = slr.squeeze( [d for d in slr.dims if len(slr[d]) == 1 and d != "seg"], drop=True ) @@ -1552,13 +1627,17 @@ def get_refA( lowest = npv.argmin("case").astype("uint8") refA = refA.isel(case=lowest) + refA = refA.to_dataset(name="refA").reset_coords(drop=True) refA["case"] = lowest + if output_path is not None: + refA.to_zarr(str(output_path), storage_options=storage_options, region="auto") + return None return refA def calc_all_cases( - seg_adms, + selectors, params, econ_input_path, slr_input_paths, @@ -1568,8 +1647,11 @@ def calc_all_cases( surge_input_path=None, seg_var="seg_adm", mc_dim="quantile", + slr_site_id_dim="site_id", + lsl_var="lsl_msl05", quantiles=[0.5], - storage_options={}, + scen_mc_filter=None, + storage_options=None, check=True, diaz_inputs=False, **model_kwargs, @@ -1577,12 +1659,12 @@ def calc_all_cases( if check_finished_zarr_workflow( finalstore=output_path if check else None, varname="costs", - final_selector={seg_var: seg_adms, "case": CASES[:-1]}, + final_selector={"case": CASES[:-1], **selectors}, storage_options=storage_options, ): return None - segs = 
["_".join(seg_adm.split("_")[:2]) for seg_adm in seg_adms] + segs = ["_".join(seg_adm.split("_")[:2]) for seg_adm in selectors[seg_var]] if diaz_inputs: inputs, slr = load_diaz_inputs( @@ -1594,11 +1676,14 @@ def calc_all_cases( econ_input_path, slr_input_paths, params, - seg_adms, + selectors=selectors, slr_names=slr_names, seg_var=seg_var, surge_lookup_store=surge_input_path, mc_dim=mc_dim, + lsl_var=lsl_var, + slr_site_id_dim=slr_site_id_dim, + scen_mc_filter=scen_mc_filter, quantiles=quantiles, storage_options=storage_options, ) @@ -1613,7 +1698,7 @@ def calc_all_cases( .refA.sel(seg=segs) .drop_vars("case") ) - refA["seg"] = seg_adms + refA["seg"] = selectors[seg_var] if "movefactor" in refA.dims: refA = refA.sel(movefactor=params.movefactor, drop=True) @@ -1634,7 +1719,7 @@ def calc_all_cases( .sum("year") ) if output_path is not None: - save_to_zarr_region(out, output_path, storage_options=storage_options) + out.to_zarr(str(output_path), storage_options=storage_options, region="auto") return None return out @@ -1647,7 +1732,7 @@ def optimize_case( seg_var="seg_adm", check=True, eps=1, - storage_options={}, + storage_options=None, ): # use last fpath to check if this task has already been run if check and check_finished_zarr_workflow( @@ -1666,18 +1751,14 @@ def optimize_case( this_seg_adms = all_segs[seg_var].isel({seg_var: all_segs.seg == seg}).values - save_to_zarr_region( - select_optimal_case( - output_path, - seg_adm, - this_seg_adms, - eps=eps, - region_var=seg_var, - storage_options=storage_options, - ), + select_optimal_case( output_path, + seg_adm, + this_seg_adms, + eps=eps, + region_var=seg_var, storage_options=storage_options, - ) + ).to_zarr(str(output_path), storage_options=storage_options, region="auto") return None diff --git a/pyCIAM/surge/lookup.py b/pyCIAM/surge/lookup.py index 935e626..809785a 100644 --- a/pyCIAM/surge/lookup.py +++ b/pyCIAM/surge/lookup.py @@ -23,8 +23,12 @@ import numpy as np import pandas as pd import xarray as xr +from numpy.dtypes import StringDType -from pyCIAM.io import _load_lslr_for_ciam, save_to_zarr_region +from pyCIAM.io import ( + _load_lslr_for_ciam, + check_finished_zarr_workflow, +) from pyCIAM.surge._calc import ( _calc_storm_damages_no_resilience, _get_surge_heights_probs, @@ -61,7 +65,9 @@ def _get_lslr_rhdiff_range( include_ncc=True, slr_0_years=2005, mc_dim="mc_sample_id", - storage_options={}, + site_id_dim="site_id", + lsl_var="lsl_msl05", + storage_options=None, ): """Get range of lslr and rhdiff that we need to model to cover the full range. 
@@ -78,7 +84,7 @@ def _get_lslr_rhdiff_range( pc_in = _s2d( xr.open_zarr( str(sliiders_store), chunks=None, storage_options=storage_options - ).sel({seg_var: seg_vals}) + ).sel({seg_var: seg_vals, "year": slice(min(slr_0_years), None)}) ) if interp_years is None: @@ -100,9 +106,11 @@ def _get_lslr_rhdiff_range( include_cc=include_cc, include_ncc=include_ncc, mc_dim=mc_dim, + lsl_var=lsl_var, slr_0_year=slr_0_years[sx], storage_options=storage_options, - quantiles=quantiles, + site_id_dim=site_id_dim, + quantiles=[0, 1] if mc_dim != "quantile" else quantiles, ) # get the max LSLR experienced @@ -175,7 +183,7 @@ def _create_surge_lookup_skeleton_store( seg_var="seg", seg_var_subset=None, force_overwrite=True, - storage_options={}, + storage_options=None, ): pc_in = subset_econ_inputs( xr.open_zarr(str(sliiders_store), storage_options=storage_options), @@ -186,15 +194,17 @@ def _create_surge_lookup_skeleton_store( to_save = xr.DataArray( da.empty( (len(pc_in[seg_var]), n_interp_pts_lslr, n_interp_pts_rhdiff, 2, 2), - chunks=(seg_chunksize, -1, -1, -1, -1), + chunks=(seg_chunksize, -1, -1, -1, -1), dtype="float64" ), dims=[seg_var, "lslr", "rh_diff", "costtype", "adapttype"], coords={ seg_var: pc_in[seg_var].values, "lslr": np.arange(n_interp_pts_lslr), "rh_diff": np.arange(n_interp_pts_rhdiff), - "adapttype": ["retreat", "protect"], - "costtype": ["stormCapital", "stormPopulation"], + "adapttype": np.array(["retreat", "protect"], dtype=StringDType), + "costtype": np.array( + ["stormCapital", "stormPopulation"], dtype=StringDType + ), }, ).to_dataset(name="frac_losses") to_save["rh_diff_by_seg"] = ( @@ -228,6 +238,7 @@ def _create_surge_lookup_skeleton_store( def _save_storm_dam( seg_vals, seg_var="seg", + lsl_var="lsl_msl05", sliiders_store=None, slr_stores=None, surge_lookup_store=None, @@ -239,11 +250,21 @@ def _save_storm_dam( quantiles=None, scen_mc_filter=None, mc_dim="mc_sample_id", + slr_site_id_dim="site_id", start_year=None, slr_0_years=2005, - storage_options={}, + overwrite=False, + storage_options=None, ): """Map over each chunk to run through damage calcs.""" + + if not overwrite and check_finished_zarr_workflow( + surge_lookup_store, + varname="frac_losses", + final_selector={seg_var: seg_vals}, + storage_options=storage_options, + ): + return None diff_ranges = _get_lslr_rhdiff_range( sliiders_store, slr_stores, @@ -255,6 +276,8 @@ def _save_storm_dam( quantiles=quantiles, scen_mc_filter=scen_mc_filter, mc_dim=mc_dim, + lsl_var=lsl_var, + site_id_dim=slr_site_id_dim, slr_0_years=slr_0_years, storage_options=storage_options, ) @@ -268,18 +291,26 @@ def _save_storm_dam( # these must be unique otherwise interp function will raise error template["lslr_by_seg"] = ( (seg_var, "lslr"), - np.tile(np.arange(len(template.lslr))[np.newaxis, :], (len(seg_vals), 1)), + np.tile( + np.arange(len(template.lslr), dtype=template.lslr_by_seg.dtype)[ + np.newaxis, : + ], + (len(seg_vals), 1), + ), ) template["rh_diff_by_seg"] = ( (seg_var, "rh_diff"), np.tile( - np.arange(len(template.rh_diff))[np.newaxis, :], (len(seg_vals), 1) + np.arange(len(template.rh_diff), dtype=template.rh_diff_by_seg.dtype)[ + np.newaxis, : + ], + (len(seg_vals), 1), ), ) if surge_lookup_store is None: return template - save_to_zarr_region( - template, surge_lookup_store, storage_options=storage_options + template.to_zarr( + str(surge_lookup_store), storage_options=storage_options, region="auto" ) return None @@ -354,7 +385,7 @@ def _save_storm_dam( return res # identify which index to save to in template zarr - 
save_to_zarr_region(res, surge_lookup_store, storage_options=storage_options) + res.to_zarr(str(surge_lookup_store), storage_options=storage_options, region="auto") def create_surge_lookup( @@ -374,10 +405,12 @@ def create_surge_lookup( scen_mc_filter=None, quantiles=None, mc_dim="mc_sample_id", + slr_site_id_dim="site_id", + lsl_var="lsl_msl05", force_overwrite=False, client=None, client_kwargs={}, - storage_options={}, + storage_options=None, ): """Create storm surge lookup table. @@ -405,6 +438,10 @@ def create_surge_lookup( equivalent to segments. The reason you may wish to have nested regions is to be able to aggregate impacts to a different regions than those that are defined by the segments. + lsl_var : str, default "lsl_msl05" + The name of the variable in ``slr_stores`` containing local SLR values + slr_site_id_dim : str, default "site_id" + The name of the location dimension in ``slr_stores``. at_start : list of int A list specifying the starting years of each adpatation period. In pyCIAM, each segment chooses a new retreat or protection height at the start of each of these @@ -492,6 +529,8 @@ def create_surge_lookup( n_interp_pts_lslr=n_interp_pts_lslr, n_interp_pts_rhdiff=n_interp_pts_rhdiff, mc_dim=mc_dim, + lsl_var=lsl_var, + slr_site_id_dim=slr_site_id_dim, ddf_i=ddf_i, dmf_i=dmf_i, quantiles=quantiles, @@ -499,6 +538,7 @@ def create_surge_lookup( start_year=start_year, slr_0_years=slr_0_years, storage_options=storage_options, + overwrite=force_overwrite, **client_kwargs, ) ) diff --git a/pyCIAM/utils.py b/pyCIAM/utils.py index fd635b4..16b8aac 100644 --- a/pyCIAM/utils.py +++ b/pyCIAM/utils.py @@ -116,7 +116,7 @@ def spherical_nearest_neighbor(df1, df2, x1="lon", y1="lat", x2="lon", y2="lat") return pd.Series(df2.index[ixs[:, 0]], index=df1.index) -def add_attrs_to_result(ds): +def add_attrs_to_result(ds, seg_var, mc_dim=None): attr_dict = { "case": { "long_name": "Adaptation Strategy", @@ -180,11 +180,22 @@ def add_attrs_to_result(ds): "long_name": "Shared Socioeconomic Pathway", "description": "Socioeconomic growth model used", }, + "refA": { + "long_name": "Initial adaptation height", + "description": ( + "Initial retreat height assumed in model. Determined by choosing " + "optimal adaptation pathway under a 'no-climate-change' scenario and " + "selecting the initial height. Retreat is assumed regardless of " + "whether optimal path is retreat or protect." 
+ ), + }, } + if mc_dim is not None: + attr_dict[mc_dim] = {"long_name": "Monte carlo sample index"} extra_vars = [ v for v in ds.variables - if v not in ["year", "seg_adm", "npv"] + list(attr_dict.keys()) + if v not in ["year", seg_var, "npv"] + list(attr_dict.keys()) ] assert not len(extra_vars), f"Unexpected variables: {extra_vars}" for v in ds.variables: @@ -193,13 +204,19 @@ def add_attrs_to_result(ds): return ds +def _get_exp_year(da): + exp_year = [v for v in da.data_vars if v.startswith("pop_") and "scale" not in v] + assert len(exp_year) == 1, exp_year + return int(exp_year[0].split("_")[1]) + + def collapse_econ_inputs_to_seg( econ_input_path, output_path, seg_var_subset=None, output_chunksize=100, seg_var="seg_adm", - storage_options={}, + storage_options=None, ): sliiders = subset_econ_inputs( xr.open_zarr( @@ -222,10 +239,15 @@ def collapse_econ_inputs_to_seg( sliiders.ypcc.sel(country="USA", drop=True).load().reset_coords(drop=True) ) + # allow for different base years in K and pop spatial variables + exp_year = _get_exp_year(sliiders) + pop_var = f"pop_{exp_year}" + k_var = f"K_{exp_year}" + out = ( - sliiders[["K_2019", "pop_2019", "landarea", "length", "wetland"]] + sliiders[[k_var, pop_var, "landarea", "length", "wetland"]] .groupby(grouper) - .sum("seg_adm") + .sum(seg_var) ) out[["surge_height", "gumbel_params", "seg_lon", "seg_lat"]] = ( @@ -246,16 +268,18 @@ def weighted_avg(varname, wts_in): for v, w in [ ( "mobcapfrac", - sliiders.K_2019.sum("elev"), + sliiders[k_var].sum("elev"), ), - ("pop_scale", sliiders.pop_2019.sum("elev")), - ("K_scale", sliiders.K_2019.sum("elev")), + ("pop_scale", sliiders[pop_var].sum("elev")), + ("K_scale", sliiders[k_var].sum("elev")), ("interior", sliiders.landarea.sum("elev")), ("pc", sliiders.length), - ("ypcc", sliiders.pop_2019.sum("elev")), + ("ypcc", sliiders[pop_var].sum("elev")), ("wetlandservice", sliiders.wetland.sum("elev")), + ("vsl", sliiders[pop_var].sum("elev")), ]: - weighted_avg(v, w) + if v in sliiders.data_vars: + weighted_avg(v, w) out["rho"] = out.ypcc / (out.ypcc + usa_ypcc_ref.sel(year=2000, drop=True)) diff --git a/pyproject.toml b/pyproject.toml index 7e08fc8..e7bcfa8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,9 +20,10 @@ maintainers = [{ name = "Ian Bolliger", email = "ian@reask.earth"}] dependencies = [ "cloudpathlib", "gitpython", - "rhg_compute_tools", + "numpy >= 2", "parameterize_jobs", "pint-xarray", + "rhg_compute_tools", "scikit-learn", "xarray[accel,parallel]", "zarr"