4 changes: 2 additions & 2 deletions data/validate.py
@@ -6,7 +6,7 @@

LOG_FORMAT = "%(asctime)s:%(levelname)s:%(name)s: %(message)s"

with open("primary_keys.json", "r") as file:
with open("primary_keys.json") as file:
"""validate pk database"""
logging.basicConfig(stream=sys.stderr, level=20, format=LOG_FORMAT)
log = logging.getLogger(__name__)
@@ -21,7 +21,7 @@
if column not in [c["column_name"].lower() for c in schema]:
raise ValueError(f"Column {column} not found in {table}")
log.info(
"Validation successful - columns listed in primary_keys.json are present in listed tables"
"Validation successful - columns listed in primary_keys.json are present in listed tables",
)
else:
invalid_keys = list(pk_db_tables - bcdata_tables)
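Note on the validate.py hunk: `"r"` is already `open()`'s default mode, so dropping it is purely cosmetic. A minimal sketch of the equivalence (the file name is taken from the diff; running this assumes the file exists):

```python
# "r" (text-mode read) is the default for open(), so both forms behave identically
with open("primary_keys.json", "r") as f:
    explicit = f.read()

with open("primary_keys.json") as f:
    implicit = f.read()

assert explicit == implicit
```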
6 changes: 3 additions & 3 deletions src/bcdata/bc2pg.py
@@ -74,7 +74,7 @@ def bc2pg( # noqa: C901

# define requests
urls = WFS.define_requests(
-dataset, query=query, bounds=bounds, bounds_crs=bounds_crs, count=count, sortby=sortby
+dataset, query=query, bounds=bounds, bounds_crs=bounds_crs, count=count, sortby=sortby,
)

df = None # just for tracking if first download is done by geometry type check
@@ -112,7 +112,7 @@ def bc2pg( # noqa: C901
)
geometry_type = df_temp.geom_type.unique()[0] # keep only the first type
if numpy.any(
-df_temp.has_z.unique()[0]
+df_temp.has_z.unique()[0],
): # geopandas does not include Z in geom_type string
geometry_type = geometry_type + "Z"
# drop the last request dataframe to free up memory
@@ -132,7 +132,7 @@ def bc2pg( # noqa: C901
c["column_name"].upper() for c in table_definition["schema"]
]:
raise ValueError(
"Column {primary_key} specified as primary_key does not exist in source"
"Column {primary_key} specified as primary_key does not exist in source",
)

# build the table definition and create table
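One thing the last hunk does not touch: the message is a plain string, not an f-string, so `{primary_key}` is emitted literally rather than interpolated; the commit only adds the trailing comma. A small runnable sketch of the difference (the variable's value is hypothetical):

```python
primary_key = "my_pk_column"  # hypothetical value

# As committed, the braces are literal text in the message:
msg_literal = "Column {primary_key} specified as primary_key does not exist in source"

# Presumed intent, an f-string that interpolates the variable:
msg_interpolated = f"Column {primary_key} specified as primary_key does not exist in source"

print(msg_literal)       # Column {primary_key} specified as ...
print(msg_interpolated)  # Column my_pk_column specified as ...
```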
7 changes: 3 additions & 4 deletions src/bcdata/bcdc.py
@@ -62,15 +62,14 @@ def get_table_name(package):
if len(layer_names) > 1:
raise ValueError(
"Package {} includes more than one WFS resource, specify one of the following: \n{}".format(
package, "\n".join(layer_names)
)
package, "\n".join(layer_names),
),
)
return layer_names[0]


def get_table_definition(table_name):
"""
Given a table/object name, search BCDC for the first package/resource with a matching "object_name",
"""Given a table/object name, search BCDC for the first package/resource with a matching "object_name",
returns dict: {"comments": <>, "notes": <>, "schema": {<schema dict>} }
"""
# only allow searching for tables present in WFS list
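The docstring edits here (and in wfs.py below) match the common pydocstyle convention that a multi-line docstring's summary starts on the same line as the opening quotes (rule D212 in pydocstyle/ruff, if that is the tool in play). Illustration:

```python
# Before: summary begins on the second line
def get_table_definition(table_name):
    """
    Given a table/object name, search BCDC for a matching "object_name".
    """

# After: summary begins on the line with the opening quotes (D212 style)
def get_table_definition(table_name):
    """Given a table/object name, search BCDC for a matching "object_name"."""
```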
16 changes: 8 additions & 8 deletions src/bcdata/cli.py
@@ -29,11 +29,11 @@ def complete_dataset_names(ctx, param, incomplete):

def from_like_context(ctx, param, value):
"""Return the value for an option from the context if the option
-or `--all` is given, else return None."""
+or `--all` is given, else return None.
+"""
if ctx.obj and ctx.obj.get("like") and (value == "like" or ctx.obj.get("all_like")):
return ctx.obj["like"][param.name]
-else:
-return None
+return None


def bounds_handler(ctx, param, value):
@@ -47,7 +47,7 @@ def bounds_handler(ctx, param, value):
return retval
except Exception:
raise click.BadParameter(
"{0!r} is not a valid bounding box representation".format(value)
f"{value!r} is not a valid bounding box representation",
)
else: # pragma: no cover
return retval
@@ -67,7 +67,7 @@ def bounds_handler(ctx, param, value):


lowercase_opt = click.option(
"--lowercase", "-l", is_flag=True, help="Write column/properties names as lowercase"
"--lowercase", "-l", is_flag=True, help="Write column/properties names as lowercase",
)


@@ -161,7 +161,7 @@ def info(dataset, indent, meta_member, verbose, quiet):
@verbose_opt
@quiet_opt
def dump(
-dataset, query, count, bounds, bounds_crs, sortby, lowercase, promote_to_multi, verbose, quiet
+dataset, query, count, bounds, bounds_crs, sortby, lowercase, promote_to_multi, verbose, quiet,
):
"""Write DataBC features to stdout as GeoJSON feature collection.

@@ -364,7 +364,7 @@ def bc2pg(
raise ValueError("Options append and refresh are not compatible")
if refresh and (schema == "bcdata"):
raise ValueError("Refreshing tables in bcdata schema is not supported, use another schema")
-elif refresh and schema:
+if refresh and schema:
schema_target = schema
elif refresh and not schema:
schema_target, table = bcdata.validate_name(dataset).lower().split(".")
@@ -404,4 +404,4 @@ def bc2pg(

# do not notify of data load completion when no data load has occured
if not schema_only:
log.info("Load of {} to {} in {} complete".format(dataset, out_table, db_url))
log.info(f"Load of {dataset} to {out_table} in {db_url} complete")
14 changes: 7 additions & 7 deletions src/bcdata/database.py
@@ -9,7 +9,7 @@
log = logging.getLogger(__name__)


-class Database(object):
+class Database:
"""Wrapper around sqlalchemy"""

def __init__(self, url=os.environ.get("DATABASE_URL")):
@@ -103,13 +103,13 @@ def refresh(self, schema, table):
)
self.execute(dbq)
columns = list(
-set(self.get_columns("bcdata", table)).intersection(self.get_columns(schema, table))
+set(self.get_columns("bcdata", table)).intersection(self.get_columns(schema, table)),
)
identifiers = [sql.Identifier(c) for c in columns]
dbq = sql.SQL(
"""INSERT INTO {schema}.{table}
({columns})
-SELECT {columns} FROM bcdata.{table}"""
+SELECT {columns} FROM bcdata.{table}""",
).format(
schema=sql.Identifier(schema),
table=sql.Identifier(table),
@@ -130,7 +130,7 @@ def define_table(
table_comments=None,
primary_key=None,
):
"""build sqlalchemy table definition from bcdc provided json definitions"""
"""Build sqlalchemy table definition from bcdc provided json definitions"""
# remove columns of unsupported types, redundant columns
table_details = [c for c in table_details if c["data_type"] in self.supported_types.keys()]
table_details = [
@@ -159,15 +159,15 @@ def define_table(
column_type,
primary_key=True,
comment=column_comments,
-)
+),
)
else:
columns.append(
Column(
column_name,
column_type,
comment=column_comments,
-)
+),
)

# make everything multipart
@@ -212,7 +212,7 @@ def log(self, schema_name, table_name):
table_name text PRIMARY KEY,
latest_download timestamp WITH TIME ZONE
);
"""
""",
)
self.execute(
"""INSERT INTO bcdata.log (table_name, latest_download)
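The `refresh` hunk builds its INSERT with composed SQL objects rather than string interpolation, which keeps identifiers safely quoted. A standalone sketch of that pattern, assuming the `sql` module here is psycopg2's (consistent with the `sql.SQL`/`sql.Identifier` calls in the hunk); the schema, table, and column names below are hypothetical:

```python
from psycopg2 import sql

columns = ["id", "name", "geom"]  # hypothetical shared columns
query = sql.SQL(
    """INSERT INTO {schema}.{table}
    ({columns})
    SELECT {columns} FROM bcdata.{table}"""
).format(
    schema=sql.Identifier("target_schema"),  # hypothetical target schema
    table=sql.Identifier("parks"),           # hypothetical table name
    columns=sql.SQL(", ").join([sql.Identifier(c) for c in columns]),
)
# query.as_string(connection) renders the safely quoted statement
```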
77 changes: 35 additions & 42 deletions src/bcdata/wfs.py
@@ -51,7 +51,7 @@ class ServiceException(Exception):
pass


-class BCWFS(object):
+class BCWFS:
"""Wrapper around web feature service"""

def __init__(self, refresh=False):
@@ -71,7 +71,7 @@ def __init__(self, refresh=False):
# if the file is named something else, prompt user to delete it
else:
raise RuntimeError(
f"Cache file exists, delete before using bcdata: {self.cache_path}"
f"Cache file exists, delete before using bcdata: {self.cache_path}",
)
# create cache folder if it does not exist
p.mkdir(parents=True, exist_ok=True)
@@ -80,10 +80,10 @@ def __init__(self, refresh=False):
self.capabilities = self.get_capabilities()
# get pagesize from xml using the xpath from https://github.com/bcgov/bcdata/
countdefault = ET.fromstring(self.capabilities).findall(
-".//{http://www.opengis.net/ows/1.1}Constraint[@name='CountDefault']"
+".//{http://www.opengis.net/ows/1.1}Constraint[@name='CountDefault']",
)[0]
self.pagesize = int(
-countdefault.find("ows:DefaultValue", {"ows": "http://www.opengis.net/ows/1.1"}).text
+countdefault.find("ows:DefaultValue", {"ows": "http://www.opengis.net/ows/1.1"}).text,
)

self.request_headers = {"User-Agent": "bcdata.py ({bcdata.__version__})"}
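A context line worth a second look while in this file: the `User-Agent` value `"bcdata.py ({bcdata.__version__})"` has no `f` prefix, so the literal text `{bcdata.__version__}` is sent in the header. A sketch of the difference (assuming the package exposes `__version__`, as the string implies):

```python
import bcdata

# As written, the placeholder text is sent verbatim:
headers_literal = {"User-Agent": "bcdata.py ({bcdata.__version__})"}

# Presumed intent, interpolating the installed version:
headers_interpolated = {"User-Agent": f"bcdata.py ({bcdata.__version__})"}

print(headers_literal["User-Agent"])       # bcdata.py ({bcdata.__version__})
print(headers_interpolated["User-Agent"])  # e.g. bcdata.py (0.x.y)
```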
@@ -93,16 +93,14 @@ def check_cached_file(self, cache_file):
cache_file = os.path.join(self.cache_path, cache_file)
if not os.path.exists(os.path.join(cache_file)):
return True
-else:
-mod_date = datetime.fromtimestamp(os.path.getmtime(cache_file))
-# if file older than specified days or empty, return true
-if (
-mod_date < (datetime.now() - timedelta(days=self.cache_refresh_days))
-or os.stat(cache_file).st_size == 0
-):
-return True
-else:
-return False
+mod_date = datetime.fromtimestamp(os.path.getmtime(cache_file))
+# if file older than specified days or empty, return true
+if (
+mod_date < (datetime.now() - timedelta(days=self.cache_refresh_days))
+or os.stat(cache_file).st_size == 0
+):
+return True
+return False

@stamina.retry(on=requests.HTTPError, timeout=60)
def _request_schema(self, table):
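The `@stamina.retry` decorator visible above wraps each request helper so transient failures are retried with backoff; the handlers in the following hunks appear to raise `requests.HTTPError` on 5xx responses so the decorator fires, while 4xx responses raise `ServiceException` and fail fast. A minimal standalone sketch of the same pattern (the URL and function are illustrative):

```python
import requests
import stamina

@stamina.retry(on=requests.HTTPError, timeout=60)  # retry with backoff, give up after ~60s
def fetch_json(url: str) -> dict:
    r = requests.get(url, timeout=30)
    r.raise_for_status()  # raises requests.HTTPError on 4xx/5xx status codes
    return r.json()
```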
@@ -146,7 +144,7 @@ def _request_count(self, table, query=None, bounds=None, bounds_crs=None, geom_c
log.error(f"Response headers: {r.headers}")
log.error(f"Response text: {r.text}")
raise ServiceException(r.text) # presumed request error
-elif r.status_code in [500, 502, 503, 504]: # presumed serivce error, retry
+if r.status_code in [500, 502, 503, 504]: # presumed serivce error, retry
log.warning(f"HTTP error: {r.status_code}, retrying")
log.warning(f"Response headers: {r.headers}")
log.warning(f"Response text: {r.text}")
@@ -166,7 +164,7 @@ def _request_features(self, url, silent=False):
log.error(f"Response headers: {r.headers}")
log.error(f"Response text: {r.text}")
raise ServiceException(r.text) # presumed request error
-elif r.status_code in [500, 502, 503, 504]: # presumed serivce error, retry
+if r.status_code in [500, 502, 503, 504]: # presumed serivce error, retry
log.warning(f"HTTP error: {r.status_code}")
log.warning(f"Response headers: {r.headers}")
log.warning(f"Response text: {r.text}")
@@ -186,7 +184,7 @@ def _request_featurecollection(self, url, silent=False):
log.error(f"Response headers: {r.headers}")
log.error(f"Response text: {r.text}")
raise ServiceException(r.text) # presumed request error
-elif r.status_code in [500, 502, 503, 504]: # presumed serivce error, retry
+if r.status_code in [500, 502, 503, 504]: # presumed serivce error, retry
log.warning(f"HTTP error: {r.status_code}")
log.warning(f"Response headers: {r.headers}")
log.warning(f"Response text: {r.text}")
@@ -214,8 +212,7 @@ def build_bounds_filter(self, query, bounds, bounds_crs, geom_column):
return cql_filter

def get_capabilities(self):
"""
Request server capabilities (layer definitions).
"""Request server capabilities (layer definitions).
Cache response as file daily, caching to one of:
- $BCDATA_CACHE environment variable
- default (~/.bcdata)
@@ -225,7 +222,7 @@ def get_capabilities(self):
with open(os.path.join(self.cache_path, "capabilities.xml"), "w") as f:
f.write(self._request_capabilities())
# load cached xml from file
with open(os.path.join(self.cache_path, "capabilities.xml"), "r") as f:
with open(os.path.join(self.cache_path, "capabilities.xml")) as f:
return f.read()

def get_count(self, dataset, query=None, bounds=None, bounds_crs="EPSG:3005", geom_column=None):
@@ -248,7 +245,7 @@ def get_schema(self, table):
schema = self._request_schema(table)
f.write(json.dumps(schema, indent=4))
# load cached schema
with open(os.path.join(self.cache_path, table), "r") as f:
with open(os.path.join(self.cache_path, table)) as f:
return json.loads(f.read())

def get_sortkey(self, table):
@@ -258,34 +255,32 @@ def get_sortkey(self, table):
if table.lower() in bcdata.primary_keys:
return bcdata.primary_keys[table.lower()].upper()
# if pk not known, use OBJECTID as default sort key when present
elif "OBJECTID" in columns:
if "OBJECTID" in columns:
return "OBJECTID"
# if OBJECTID is not present (several GSR tables), use SEQUENCE_ID
elif "SEQUENCE_ID" in columns:
if "SEQUENCE_ID" in columns:
return "SEQUENCE_ID"
# otherwise, presume first column is best value to sort by
# (in some cases this will be incorrect)
-else:
-log.warning(
-f"Reliable sort key for {table} cannot be determined, defaulting to first column {columns[0]}"
-)
-return columns[0]
+log.warning(
+f"Reliable sort key for {table} cannot be determined, defaulting to first column {columns[0]}",
+)
+return columns[0]

def list_tables(self):
"""read and parse capabilities xml, which lists all tables available"""
"""Read and parse capabilities xml, which lists all tables available"""
return [
i.strip("pub:")
for i in list(
-WebFeatureService(self.ows_url, version="2.0.0", xml=self.capabilities).contents
+WebFeatureService(self.ows_url, version="2.0.0", xml=self.capabilities).contents,
)
]

def validate_name(self, dataset):
"""Check wfs/cache and the bcdc api to see if dataset name is valid"""
if dataset.upper() in self.list_tables():
return dataset.upper()
-else:
-return bcdata.get_table_name(dataset.upper())
+return bcdata.get_table_name(dataset.upper())

def define_requests(
self,
@@ -304,6 +299,7 @@ def define_requests(
- http://www.opengeospatial.org/standards/wfs
- http://docs.geoserver.org/stable/en/user/services/wfs/vendor.html
- http://docs.geoserver.org/latest/en/user/tutorials/cql/cql_tutorial.html
+
"""
# validate the table name
table = self.validate_name(dataset)
@@ -315,9 +311,9 @@ def define_requests(
# find out how many records are in the table
if not count and check_count is False:
raise ValueError(
"{count: Null, check_count=False} is invalid, either provide record count or let bcdata request it"
"{count: Null, check_count=False} is invalid, either provide record count or let bcdata request it",
)
-elif (
+if (
not count and check_count is True
): # if not provided a count, get one if not told otherwise
count = self.get_count(
@@ -337,8 +333,7 @@ def define_requests(
bounds_crs=bounds_crs,
geom_column=geom_column,
)
-if count > n:
-count = n
+count = min(count, n)

log.info(f"Total features requested: {count}")

@@ -406,8 +401,7 @@ def request_features(

if as_gdf:
return gdf
-else:
-return json.loads(gdf.to_json())
+return json.loads(gdf.to_json())


def get_data(
@@ -436,8 +430,8 @@
for url in urls:
results.append(
WFS.request_features(
-url, as_gdf=True, lowercase=lowercase, promote_to_multi=promote_to_multi
-)
+url, as_gdf=True, lowercase=lowercase, promote_to_multi=promote_to_multi,
+),
)
if len(results) > 1:
gdf = pd.concat(results)
@@ -447,8 +441,7 @@
gdf = gpd.GeoDataFrame()
if as_gdf:
return gdf
-else:
-return json.loads(gdf.to_json())
+return json.loads(gdf.to_json())


def get_count(dataset, query=None, bounds=None, bounds_crs="EPSG:3005"):
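Overall, the wfs.py changes are mechanical cleanups (implicit `object` base class removed, redundant `else` after `return` dropped, `min()` instead of compare-and-assign, trailing commas) with no behavior change intended. For reviewers wanting to sanity-check the module-level entry points touched here, a short usage sketch; the dataset name is illustrative and the calls hit the live BC WFS:

```python
import bcdata

dataset = "whse_imagery_and_base_maps.gsr_airports_svw"  # illustrative table name

n = bcdata.get_count(dataset)                # feature count via WFS
gdf = bcdata.get_data(dataset, as_gdf=True)  # geopandas GeoDataFrame
fc = bcdata.get_data(dataset, as_gdf=False)  # GeoJSON-style dict, via gdf.to_json()

print(n, len(gdf), len(fc["features"]))
```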