diff --git a/README.md b/README.md
index 2a86c5e..58d3a88 100644
--- a/README.md
+++ b/README.md
@@ -11,7 +11,7 @@ pip install -r requirements.txt
 
 ## CLI Usage
 
-Available modes, depending on publisher input: `OBP` (Open Book Publishers), `punctum` (punctum books), `AM` (African Minds), `UWP` (University of Westminster Press), `WHP` (The White Horse Press), `EDITUS` (Editus), `EDUEPB` (EDUEPB), `EDUFBA` (EDUFBA), `Rosario` (Editorial Universidad del Rosario), `Leuven` (Leuven University Press), `LHarmattan` (L'Harmattan), `MayFly` (MayFly Books), `PlayStory` (Play Story Press)
+Available modes, depending on publisher input: `OBP` (Open Book Publishers), `punctum` (punctum books), `AM` (African Minds), `UWP` (University of Westminster Press), `WHP` (The White Horse Press), `EDITUS` (Editus), `EDUEPB` (EDUEPB), `EDUFBA` (EDUFBA), `Rosario` (Editorial Universidad del Rosario), `Leuven` (Leuven University Press), `LHarmattan` (L'Harmattan), `MayFly` (MayFly Books), `PlayStory` (Play Story Press), `CSV` (Thoth CSV template)
 
 ### Live Thoth API
 ```
diff --git a/bookloader.py b/bookloader.py
index 4623735..76186b6 100644
--- a/bookloader.py
+++ b/bookloader.py
@@ -58,7 +58,8 @@ class BookLoader:
         "edited_volume": "EDITED_BOOK",
         "Journal Issue": "JOURNAL_ISSUE",
         "Journal": "JOURNAL_ISSUE",
-        "textbook": "TEXTBOOK"
+        "textbook": "TEXTBOOK",
+        "Textbook": "TEXTBOOK",
     }
     work_statuses = {
         "Active": "ACTIVE",
@@ -103,9 +104,11 @@ class BookLoader:
         "Foreword": "FOREWORD_BY",
         "A24": "INTRODUCTION_BY",
         "Introduction": "INTRODUCTION_BY",
+        "Introduction By": "INTRODUCTION_BY",
         "writer of introduction": "INTRODUCTION_BY",
         "A15": "PREFACE_BY",
         "Preface": "PREFACE_BY",
+        "Preface By": "PREFACE_BY",
         "Music editor": "MUSIC_EDITOR",
         "Research By": "RESEARCH_BY",
         "Contributions By": "CONTRIBUTIONS_BY",
diff --git a/csvloader.py b/csvloader.py
new file mode 100644
index 0000000..25048f6
--- /dev/null
+++ b/csvloader.py
@@ -0,0 +1,513 @@
+#!/usr/bin/env python
+"""Load book metadata from official Thoth CSV template into Thoth"""
+
+import logging
+import sys
+import re
+import pandas as pd
+from bookloader import BookLoader
+from thothlibrary import ThothError
+
+
+class CSVLoader(BookLoader):
+    """Thoth CSV template specific logic to ingest metadata from CSV into Thoth"""
+    single_imprint = True
+    cache_contributors = True
+    cache_institutions = True
+    cache_issues = True
+    cache_series = True
+    publisher_name = "Edizioni Ca’ Foscari – Venice University Press"
+    publisher_shortname = "ECF"
+    publisher_url = "https://edizionicafoscari.unive.it/"
+
+    def run(self):
+        """Process CSV and call Thoth to insert its data"""
+
+        # find number of contributor columns. The template has columns for 5 contributors
+        # by default, but publishers may add additional sets of columns for additional contributors.
+
+        contributions_index = self.get_highest_contributor_index(self.data.columns)
+
+        for index, row in self.data.iterrows():
+            logging.info("\n\n\n\n**********")
+            # Data starts in row 2, so start counting in logging from there
+            logging.info(f"processing book from row {index + 2}: {row['title']}")
+            work = self.get_work(row)
+            # try to find the work in Thoth
+            try:
+                work_id = self.thoth.work_by_doi(work['doi']).workId
+                existing_work = self.thoth.work_by_id(work_id)
+                # if work is found, try to update it with the new data
+                if existing_work:
+                    try:
+                        existing_work.update((k, v) for k, v in work.items() if v is not None)
+                        self.thoth.update_work(existing_work)
+                        logging.info(f"workId for updated work: {work_id}")
+                    # if update fails, log the error and exit the import
+                    except ThothError as t:
+                        logging.error(f"Failed to update work with id {work_id}, exception: {t}")
+                        sys.exit(1)
+            # if work isn't found, create it
+            except (IndexError, AttributeError, ThothError):
+                work_id = self.thoth.create_work(work)
+                logging.info(f"created work with workId: {work_id}")
+            work = self.thoth.work_by_id(work_id)
+            self.create_contributors(row, work, contributions_index)
+            self.create_languages(row, work)
+            self.create_subjects(row, work)
+            self.create_publications(row, work)
+            self.create_series(row, work)
+
+    def get_work(self, row):
+        """Returns a dictionary with all attributes of a 'work'
+
+        row: current CSV row
+        """
+
+        work_type = row["work_type"]
+        work_status = row["work_status"]
+
+        # Exit with error if any of the required fields for Work are not present
+        if pd.isna(work_type) or pd.isna(work_status) or pd.isna(row["title"]):
+            logging.error("Work missing a required field: title, workType, or workStatus")
+            sys.exit(1)
+
+        subtitle = None
+        if pd.notna(row["subtitle"]):
+            subtitle = row["subtitle"]
+        title = self.sanitise_title(row["title"], subtitle)
+
+        doi = None
+        if pd.notna(row["doi"]):
+            doi = f"https://doi.org/{row['doi']}"
+
+        work = {
+            "workType": self.work_types[work_type],
+            "workStatus": self.work_statuses[work_status],
+            "fullTitle": title["fullTitle"],
+            "title": title["title"],
+            "subtitle": title["subtitle"],
+            "reference": None,
+            "edition": row["edition"],
+            "imprintId": self.imprint_id,
+            "doi": doi,
+            "publicationDate": row["publication_date"],
+            "withdrawnDate": row["withdrawn_date"],
+            "place": row["place_of_publication"],
+            "width": None,
+            "height": None,
+            "pageCount": int(row["page_count"]) if pd.notna(row["page_count"]) else None,
+            "pageBreakdown": row["page_breakdown"],
+            "imageCount": row["image_count"],
+            "tableCount": row["table_count"],
+            "audioCount": row["audio_count"],
+            "videoCount": row["video_count"],
+            "license": row["license"],
+            "copyrightHolder": row["copyright_holder"],
+            "landingPage": row["landing_page"],
+            "lccn": None,
+            "oclc": None,
+            "shortAbstract": row["short_abstract"],
+            "longAbstract": row["long_abstract"],
+            "generalNote": None,
+            "toc": None,
+            "coverUrl": row["cover_url"],
+            "coverCaption": None,
+            "firstPage": None,
+            "lastPage": None,
+            "pageInterval": None,
+        }
+        # Convert NaN to None for all fields
+        work = self.convert_nan_to_none(work)
+        return work
+
+    def create_contributors(self, row, work, contributions_index):
+        """Creates/updates all contributors associated with the current work and their contributions
+
+        row: current CSV row
+
+        work: Work from Thoth
+
+        contributions_index: Number of sets of contribution columns in the CSV, determined by publisher
+        """
+
+        highest_contribution_ordinal = max((c.contributionOrdinal for
+                                            c in work.contributions), default=0)
+
+        for index in range(1, contributions_index + 1):
+            given_name = row[f"contribution_{index}_given_name"]
+            family_name = row[f"contribution_{index}_family_name"]
+
+            # family_name is required to create a Contributor. If it is NaN, continue
+            if pd.isna(family_name):
+                continue
+            given_name = given_name.strip()
+            family_name = family_name.strip()
+            full_name = f"{given_name} {family_name}"
+            orcid = None
+            orcid_value = row[f"contribution_{index}_orcid"]
+            # only assign a value to the orcid variable if a value is present in the row
+            if pd.notna(orcid_value):
+                orcid = f"https://orcid.org/{orcid_value}"
+            website = row[f"contribution_{index}_website"]
+            contributor = {
+                "firstName": given_name,
+                "lastName": family_name,
+                "fullName": full_name,
+                "orcid": orcid,
+                "website": website
+            }
+            # Convert NaN to None for all fields
+            contributor = self.convert_nan_to_none(contributor)
+
+            if orcid and orcid in self.all_contributors:
+                contributor_id = self.all_contributors[orcid]
+                logging.info(f"existing contributor with ORCID: {full_name}, {orcid}")
+            elif full_name in self.all_contributors:
+                contributor_id = self.all_contributors[full_name]
+                logging.info(f"existing contributor (name): {full_name}")
+            else:
+                # Create new contributor
+                contributor_id = self.thoth.create_contributor(contributor)
+                logging.info(f"created contributor: {full_name}, {contributor_id}")
+
+                # Cache by both name and ORCID if available
+                self.all_contributors[full_name] = contributor_id
+                if orcid:
+                    self.all_contributors[orcid] = contributor_id
+
+            existing_contribution = next(
+                (c for c in work.contributions if c.contributor.contributorId == contributor_id),
+                None)
+            if not existing_contribution:
+                if pd.notna(row[f"contribution_{index}_role"]):
+                    contribution_type = self.contribution_types[row[f"contribution_{index}_role"]]
+                else:
+                    logging.error("no contributionType for contribution, cannot create")
+                    continue
+                contribution = {
+                    "workId": work.workId,
+                    "contributorId": contributor_id,
+                    "contributionType": contribution_type,
+                    "mainContribution": "true",
+                    "contributionOrdinal": highest_contribution_ordinal + 1,
+                    "biography": row[f"contribution_{index}_biography"],
+                    "firstName": given_name,
+                    "lastName": family_name,
+                    "fullName": full_name
+                }
+                # Convert NaN to None for all fields
+                contribution = self.convert_nan_to_none(contribution)
+                contribution_id = self.thoth.create_contribution(contribution)
+                logging.info(f"created contribution for {full_name}, type: {contribution_type}")
+                highest_contribution_ordinal += 1
+            else:
+                logging.info(f"existing contribution for {full_name}, type: {existing_contribution.contributionType}, skipping")
+                contribution_id = existing_contribution.contributionId
+
+            # find out if an institution name is present; if not, no institution can be found or created
+            institution_name = row[f"contribution_{index}_affiliation_institution_name"]
+            if pd.isna(institution_name):
+                logging.info("no institution name, skipping creating Institution and Affiliation")
+                continue
+
+            # retrieve institution or create it if it doesn't exist
+            if institution_name in self.all_institutions:
+                institution_id = self.all_institutions[institution_name]
+                logging.info(f"existing institution {institution_name} found in cached institutions")
+            else:
+                ror = None
+                if pd.notna(row[f"contribution_{index}_affiliation_institution_ror"]):
+                    ror = f"https://ror.org/{row[f'contribution_{index}_affiliation_institution_ror'].strip()}"
+                institution = {
+                    "institutionName": institution_name,
+                    "institutionDoi": None,
+                    "ror": ror,
+                    "countryCode": None,
+                }
+                institution_id = self.thoth.create_institution(institution)
+                # cache new institution
+                self.all_institutions[institution_name] = institution_id
+                logging.info(f"created and cached new institution {institution_name}")
+
+            existing_affiliations = next(
+                (c.affiliations for c in work.contributions if c.contributionId == contribution_id), [])
+            if any(a.institution.institutionId == institution_id for a in existing_affiliations):
+                logging.info(f"contribution for {full_name} already has affiliation, skipping")
+                continue
+            else:
+                # create affiliation
+                position = None
+                if pd.notna(row[f"contribution_{index}_affiliation_position"]):
+                    position = row[f"contribution_{index}_affiliation_position"]
+                # each contributor only has 1 affiliation in the CSV, so affiliationOrdinal is
+                # hardcoded as 1
+                affiliation = {
+                    "contributionId": contribution_id,
+                    "institutionId": institution_id,
+                    "position": position,
+                    "affiliationOrdinal": 1
+                }
+                self.thoth.create_affiliation(affiliation)
+
+    def get_highest_contributor_index(self, columns):
+        """Return the highest contributor index found among the contribution_<n>_given_name columns"""
+        max_index = 0
+        pattern = re.compile(r"contribution_(\d+)_given_name")
+        for column in columns:
+            match = pattern.match(column)
+            if match:
+                index = int(match.group(1))
+                if index > max_index:
+                    max_index = index
+        return max_index
+
+    def create_languages(self, row, work):
+        """Creates languages associated with the current work
+
+        row: current CSV record
+
+        work: Work from Thoth
+        """
+        original_language_codes = row["original_language"]
+        translated_from_language_codes = row["translated_from_language"]
+        translated_into_language_codes = row["translated_into_language"]
+
+        all_languages = [
+            [original_language_codes, "ORIGINAL"],
+            [translated_from_language_codes, "TRANSLATED_FROM"],
+            [translated_into_language_codes, "TRANSLATED_INTO"],
+        ]
+
+        for languages, language_relation in all_languages:
+            if pd.notna(languages):
+                # language codes are separated by ;
+                languages_array = languages.split(";")
+                for language_code in languages_array:
+                    if any(work_language.languageCode == language_code for work_language in work.languages):
+                        logging.info(f"existing language {language_code}")
+                        continue
+                    language = {
+                        "workId": work.workId,
+                        "languageCode": language_code,
+                        "languageRelation": language_relation,
+                        "mainLanguage": "true"
+                    }
+                    self.thoth.create_language(language)
+                    logging.info(f"created language {language_code}")
+            else:
+                logging.info(f"no languages for {language_relation}")
+
+    def create_subjects(self, row, work):
+        """Creates all subjects associated with the current work
+
+        row: current row in CSV
+
+        work: Work from Thoth
+        """
+
+        thema_subjects = ([s.strip() for s in row["thema_subjects"].split(";")]
+                          if pd.notna(row["thema_subjects"]) else None)
+        bic_subjects = ([s.strip() for s in row["bic_subjects"].split(";")]
+                        if pd.notna(row["bic_subjects"]) else None)
+        bisac_subjects = ([s.strip() for s in row["bisac_subjects"].split(";")]
+                          if pd.notna(row["bisac_subjects"]) else None)
+        keyword_subjects = ([s.strip() for s in row["keywords"].split(";")]
+                            if pd.notna(row["keywords"]) else None)
+
+        def create_subject(subject_type, subject_code, subject_ordinal):
+            subject = {
+                "workId": work.workId,
+                "subjectType": subject_type,
+                "subjectCode": subject_code,
+                "subjectOrdinal": subject_ordinal
+            }
+            self.thoth.create_subject(subject)
+
+        def prepare_subject(subjects_array, subject_type):
+            if subjects_array:
+                for subject_ordinal, subject_code in enumerate(subjects_array, start=1):
+                    # check if the work already has a subject with the same subject type/subject code combination
+                    if not any(
+                        subject.subjectCode == subject_code and subject.subjectType == subject_type
+                        for subject in work.subjects
+                    ):
+                        create_subject(subject_type, subject_code, subject_ordinal)
+                        logging.info(f"New {subject_type} {subject_code} added as Subject")
+                    else:
+                        logging.info(f"Existing {subject_type} {subject_code} already associated with Work")
+            else:
+                logging.info(f"no subjects for {subject_type}")
+
+        prepare_subject(thema_subjects, "THEMA")
+        prepare_subject(bic_subjects, "BIC")
+        prepare_subject(bisac_subjects, "BISAC")
+        prepare_subject(keyword_subjects, "KEYWORD")
+
+    def create_publications(self, row, work):
+        """Creates print and digital publications associated with the current work
+
+        row: current CSV record
+
+        work: Work from Thoth
+        """
+
+        publication_types = ["paperback", "hardback", "pdf", "epub"]
+
+        for publication_type in publication_types:
+            isbn = None
+            if pd.notna(row[f"publication_{publication_type}_isbn"]):
+                isbn = self.sanitise_isbn(str(row[f"publication_{publication_type}_isbn"]).strip())
+
+            publication = {
+                "workId": work.workId,
+                "publicationType": publication_type.upper(),
+                "isbn": isbn,
+                "widthMm": None,
+                "widthIn": None,
+                "heightMm": None,
+                "heightIn": None,
+                "depthMm": None,
+                "depthIn": None,
+                "weightG": None,
+                "weightOz": None,
+            }
+
+            # Check if publication already exists
+            existing_pub = next((p for p in work.publications if p.publicationType == publication_type.upper()), None)
+            if existing_pub:
+                publication_id = existing_pub.publicationId
+                logging.info(f"existing {publication_type} publication: {publication_id}")
+                continue
+
+            # Create new publication if it doesn't already exist
+            publication_id = self.thoth.create_publication(publication)
+            logging.info(f"created {publication_type} publication: {publication_id}")
+
+            # Handle separate logic for print and digital
+            if publication_type in ["paperback", "hardback"]:
+                self.create_price(row, publication_type, publication_id)
+            elif publication_type in ["pdf", "epub"]:
+                self.create_location(row, publication_type, publication_id)
+
+    def create_price(self, row, publication_type, publication_id):
+        """Create prices for paperback and hardback publications"""
+        # the template provides two sets of price columns per print publication
+        for index in range(1, 3):
+            unit_price = None
+            if pd.notna(row[f"publication_{publication_type}_price_{index}_unit_price"]):
+                unit_price = row[f"publication_{publication_type}_price_{index}_unit_price"]
+
+            currency = None
+            if pd.notna(row[f"publication_{publication_type}_price_{index}_currency_code"]):
+                currency = row[f"publication_{publication_type}_price_{index}_currency_code"]
+
+            if unit_price and currency:
+                price = {
+                    "publicationId": publication_id,
+                    "currencyCode": currency,
+                    "unitPrice": unit_price
+                }
+                self.thoth.create_price(price)
+                logging.info(f"created price in {currency} for publication {publication_id}")
+            else:
+                logging.info(f"missing unit price and/or currency for {publication_type} price {index}, skipping")
+
+    def create_location(self, row, publication_type, publication_id):
+        """Create location for PDF and EPUB publications"""
+
+        landing_page = row[f"publication_{publication_type}_location_landing_page"]
+        full_text = row[f"publication_{publication_type}_location_full_text_url"]
+        location_platform = row[f"publication_{publication_type}_location_platform"]
+
+        if pd.isna(location_platform):
+            logging.info(f"no location platform for {publication_type} publication, skipping creating location")
+            return
+
+        # Transform e.g. "Publisher Website" to "PUBLISHER_WEBSITE"
+        location_platform = location_platform.upper().replace(" ", "_")
+
+        location_platforms = ["PROJECT_MUSE", "OAPEN", "DOAB", "JSTOR", "EBSCO_HOST",
+                              "OCLC_KB", "PROQUEST_KB", "PROQUEST_EXLIBRIS", "EBSCO_KB",
+                              "JISC_KB", "GOOGLE_BOOKS", "INTERNET_ARCHIVE", "SCIENCE_OPEN",
+                              "ZENODO", "PUBLISHER_WEBSITE", "THOTH", "OTHER"]
+
+        if location_platform not in location_platforms:
+            logging.error(f"Location Platform {location_platform} is not supported by Thoth")
+            sys.exit(1)
+
+        location = {
+            "publicationId": publication_id,
+            "landingPage": landing_page,
+            "fullTextUrl": full_text,
+            "locationPlatform": location_platform,
+            "canonical": "true",
+        }
+
+        # Convert NaN to None for all fields
+        location = self.convert_nan_to_none(location)
+        self.thoth.create_location(location)
+        logging.info(f"created location for {location_platform} with publicationId {publication_id}")
+
+    def create_series(self, row, work):
+        """Creates series associated with the current work
+
+        row: current CSV row
+
+        work: current work
+        """
+
+        series_name = row["series_name"]
+        if pd.isna(series_name):
+            logging.info(f"{work.fullTitle} missing series name; skipping create_series")
+            return
+        if series_name not in self.all_series:
+            issn = None
+            if pd.notna(row["series_issn"]):
+                try:
+                    issn = self.sanitise_issn(row["series_issn"])
+                except ValueError as e:
+                    logging.error(f"{e} ({work.workId})")
+            series = {
+                "seriesType": "BOOK_SERIES",
+                "seriesName": series_name,
+                "issnDigital": issn,
+                "issnPrint": issn,
+                "seriesUrl": None,
+                "seriesDescription": None,
+                "seriesCfpUrl": None,
+                "imprintId": self.imprint_id
+            }
+            series_id = self.thoth.create_series(series)
+            logging.info(f"new series created: {series['seriesName']}")
+            self.all_series[series_name] = series_id
+        else:
+            logging.info(f"existing series {series_name}")
+            series_id = self.all_series[series_name]
+
+        # find all existing issues in the Series
+        current_series = self.thoth.series(series_id)
+        issue_work_ids = []
+
+        for issue in current_series.issues:
+            issue_work_ids.append(issue.work.workId)
+
+        # find out if the current work already has an issue. If not, create a new one.
+        if work.workId not in issue_work_ids:
+            # count existing issues to determine the next ordinal
+            number_of_issues = len(current_series.issues)
+
+            # if series_issue_number is present in the CSV, use it
+            if pd.notna(row["series_issue_number"]):
+                issue_ordinal = int(row["series_issue_number"])
+            # otherwise assign the next highest issueOrdinal
+            else:
+                issue_ordinal = number_of_issues + 1
+
+            issue = {
+                "seriesId": series_id,
+                "workId": work.workId,
+                "issueOrdinal": issue_ordinal,
+            }
+            self.thoth.create_issue(issue)
+            logging.info(f"Created new issue of {current_series.seriesName} for work")
+        else:
+            logging.info(f"Series Issue already exists for work; skipping creating issue of {current_series.seriesName}")
+
+    def convert_nan_to_none(self, data_dict):
+        """Convert NaN values to None for all fields in a dictionary"""
+        return {k: (None if pd.isna(v) else v) for k, v in data_dict.items()}
diff --git a/loader.py b/loader.py
index 9dbf20b..abcab11 100755
--- a/loader.py
+++ b/loader.py
@@ -29,6 +29,7 @@
 from leuvenloader import LeuvenLoader
 from lharmattanloader import LHarmattanLoader
 from ubiquityapiloader import UbiquityAPILoader
+from csvloader import CSVLoader
 from mayflyloader import MayflyLoader
 from playstoryloader import PlayStoryLoader
 
@@ -56,6 +57,7 @@
     "UOL": UOLLoader,
     "Leuven": LeuvenLoader,
     "LHarmattan": LHarmattanLoader,
+    "CSV": CSVLoader,
     "MayFly": MayflyLoader,
     "PlayStory": PlayStoryLoader,
 }