From ecad4762f13a9c8fe056ebe49aec7679ea7e1467 Mon Sep 17 00:00:00 2001 From: Catting <5874051+mm12@users.noreply.github.com> Date: Sat, 15 Nov 2025 17:33:51 -0600 Subject: [PATCH 1/4] feat: Add a macro that allows users to convert paths into field values closes #1203 --- src/tagstudio/core/library/alchemy/library.py | 67 +++ src/tagstudio/qt/mixed/paths_to_fields.py | 458 ++++++++++++++++++ src/tagstudio/qt/ts_qt.py | 12 + src/tagstudio/qt/views/main_window.py | 5 + src/tagstudio/resources/translations/en.json | 16 + tests/macros/test_paths_to_fields.py | 86 ++++ 6 files changed, 644 insertions(+) create mode 100644 src/tagstudio/qt/mixed/paths_to_fields.py create mode 100644 tests/macros/test_paths_to_fields.py diff --git a/src/tagstudio/core/library/alchemy/library.py b/src/tagstudio/core/library/alchemy/library.py index a25231e95..d3beba6c8 100644 --- a/src/tagstudio/core/library/alchemy/library.py +++ b/src/tagstudio/core/library/alchemy/library.py @@ -1262,6 +1262,73 @@ def get_value_type(self, field_key: str) -> ValueType: session.expunge(field) return field + def add_value_type( + self, + key: str, + *, + name: str | None = None, + field_type: FieldTypeEnum = FieldTypeEnum.TEXT_LINE, + is_default: bool = False, + position: int | None = None, + ) -> ValueType: + """Create a new ValueType row and return it. + + - Preserves the provided `key` as-is. + - Derives a display `name` from key when not provided. + - Appends to the end of current field positions when `position` is not provided. + """ + display_name = name or key.replace("_", " ").title() + + with Session(self.engine) as session: + existing = session.scalar(select(ValueType).where(ValueType.key == key)) + if existing: + session.expunge(existing) + return existing + + if position is None: + max_pos = session.scalar(select(func.max(ValueType.position))) + position = (max_pos or 0) + 1 + + vt = ValueType( + key=key, + name=display_name, + type=field_type, + is_default=is_default, + position=position, + ) + try: + session.add(vt) + session.commit() + session.expunge(vt) + except IntegrityError: + session.rollback() + # Fetch the existing row to return a consistent object + vt = unwrap(session.scalar(select(ValueType).where(ValueType.key == key))) + session.expunge(vt) + return vt + + def ensure_value_type( + self, + key: str, + *, + name: str | None = None, + field_type: FieldTypeEnum = FieldTypeEnum.TEXT_LINE, + is_default: bool = False, + ) -> ValueType: + """Get or create a `ValueType` with the provided key. + + Returns the existing type when present; otherwise creates it. + """ + try: + return self.get_value_type(key) + except Exception: + return self.add_value_type( + key, + name=name, + field_type=field_type, + is_default=is_default, + ) + def add_field_to_entry( self, entry_id: int, diff --git a/src/tagstudio/qt/mixed/paths_to_fields.py b/src/tagstudio/qt/mixed/paths_to_fields.py new file mode 100644 index 000000000..128721a36 --- /dev/null +++ b/src/tagstudio/qt/mixed/paths_to_fields.py @@ -0,0 +1,458 @@ + +from __future__ import annotations + +import re +from collections.abc import Iterable +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +from PySide6.QtCore import Qt +from PySide6.QtWidgets import ( + QCheckBox, + QComboBox, + QFormLayout, + QFrame, + QHBoxLayout, + QLabel, + QLineEdit, + QMessageBox, + QPlainTextEdit, + QPushButton, + QSizePolicy, + QVBoxLayout, + QWidget, +) + +from tagstudio.core.library.alchemy.enums import FieldTypeEnum +from tagstudio.core.library.alchemy.fields import FieldID +from tagstudio.core.library.alchemy.library import Library +from tagstudio.core.library.alchemy.models import Entry +from tagstudio.core.utils.types import unwrap +from tagstudio.qt.translations import Translations + +if TYPE_CHECKING: + from tagstudio.qt.ts_qt import QtDriver + + +@dataclass +class PathFieldRule: + """Define how to extract data from a path and map to fields. + + pattern: Full regex applied to the entry path (string form). Supports + numbered groups ($1) and named groups ($name / ${name}). + fields: Mapping of field keys to value templates. Templates can contain + placeholders like "$1", "$name", or "${name}". + use_filename_only: If True, match only against the filename, else full path. + flags: Regex flags OR'd, e.g. re.IGNORECASE. + """ + + pattern: str + fields: dict[str, str] + use_filename_only: bool = False + flags: int = 0 + + def compile(self) -> re.Pattern[str]: + return re.compile(self.pattern, self.flags) + + +@dataclass +class EntryFieldUpdate: + entry_id: int + path: str + updates: dict[str, str] = field(default_factory=dict) + + +PLACEHOLDER_RE = re.compile( + r"\$(?:\{(?P[A-Za-z_][A-Za-z0-9_]*)\}|(?P[A-Za-z_][A-Za-z0-9_]*)|(?P\d+))(?P\+\+|--)?" +) + + +def _expand_template(template: str, match: re.Match[str]) -> str: + def repl(m: re.Match[str]) -> str: + original = "" + if (idx := m.group("i")) is not None: + try: + original = match.group(int(idx)) or "" + except IndexError: + original = "" + else: + name = m.group("n1") or m.group("n2") + if name: + original = match.groupdict().get(name, "") or "" + + op = m.group("op") + if not op: + return original + + # Apply simple numeric transforms with zero-fill preservation + if original.isdigit(): + width = len(original) + try: + num = int(original) + if op == "++": + num += 1 + elif op == "--": + num -= 1 + return str(num).zfill(width) + except ValueError: + return original + return original + + return PLACEHOLDER_RE.sub(repl, template) + + +def _iter_entries(library: Library) -> Iterable[Entry]: + # with_joins=True ensures we can inspect current fields when needed + yield from library.all_entries(with_joins=True) + +def preview_paths_to_fields( + library: Library, + rules: list[PathFieldRule], + only_unset: bool = True, +) -> list[EntryFieldUpdate]: + """Return a dry-run of field updates inferred from entry paths. + + - Respects existing non-empty field values when only_unset=True. + - Supports multiple rules; first matching rule contributes its mapped fields. + """ + compiled = [(r, r.compile()) for r in rules] + results: list[EntryFieldUpdate] = [] + + # Determine library root for relative matching + base_path = None + try: + folder_obj = getattr(library, "folder", None) + if folder_obj is not None: + base_path = getattr(folder_obj, "path", None) + except Exception: + base_path = None + + for entry in _iter_entries(library): + # Normalize path for cross-platform matching (use forward slashes), use relative if possible + try: + if base_path is not None: + rel = entry.path.relative_to(base_path) + full_path = rel.as_posix() + else: + full_path = ( + entry.path.as_posix() + if hasattr(entry.path, "as_posix") + else str(entry.path).replace("\\", "/") + ) # ** TODO: move to helper + except Exception: + full_path = ( + entry.path.as_posix() + if hasattr(entry.path, "as_posix") + else str(entry.path).replace("\\", "/") + ) + + pending: dict[str, str] = {} + + # DEBUG: minimal trace for first entries (temporarily enabled to diagnose matching) + # print(f"[preview] full_path={full_path}") + + for rule, cre in compiled: + target = entry.filename if rule.use_filename_only else full_path + m = cre.search(target) + if not m: + continue + + for key, tmpl in rule.fields.items(): + value = _expand_template(tmpl, m).strip() + if value == "": + continue + + if only_unset: + # check if field key exists and has a non-empty value + existing = next((f for f in entry.fields if ( + f.type_key == key and (f.value or "") != "")), None) + if existing: + continue + + pending[key] = value + + if pending: + results.append(EntryFieldUpdate(entry_id=entry.id, path=full_path, updates=pending)) + + return results + + +# ** TODO: document the optional 'field_types' parameter (maps field keys to FieldTypeEnum) +def apply_paths_to_fields( + library: Library, + updates: list[EntryFieldUpdate], + *, + create_missing_field_types: bool = True, + overwrite: bool = False, + field_types: dict[str, FieldTypeEnum] | None = None, +) -> int: + """Apply field updates to entries. + + - If a field key doesn't exist, optionally create a new ValueType. + - If the field already exists on an entry: + - Overwrite when overwrite=True + - Otherwise only fill when existing value is empty or None. + + Returns the count of individual field updates applied. + """ + applied = 0 + + for upd in updates: + entry = unwrap(library.get_entry_full(upd.entry_id)) + + for key, value in upd.updates.items(): # ** TODO: optimizeations can be made here + # ensure field type exists if requested + if create_missing_field_types: + # prefer library-provided helper if available, else attempt to create/get via available APIs + _ensure_fn = getattr(library, "ensure_value_type", None) + ftype = FieldTypeEnum.TEXT_LINE + if field_types and key in field_types: + ftype = field_types[key] + if callable(_ensure_fn): + _ensure_fn(key, name=None, field_type=ftype) + else: + try: + # try to access existing type + library.get_value_type(key) + except Exception: + # try common creation APIs if present + _create_fn = ( + getattr(library, "create_value_type", None) + or getattr(library, "add_value_type", None) + ) + if callable(_create_fn): + _create_fn(key, name=None, field_type=ftype) + else: + # fallback to calling get_value_type to raise a clear error + library.get_value_type(key) + else: + # will raise if missing; keep behavior explicit + library.get_value_type(key) + + existing = next((f for f in entry.fields if f.type_key == key), None) + if existing: + current = existing.value or "" + if overwrite or current == "": + library.update_entry_field(entry.id, existing, value) + applied += 1 + continue + + if library.add_field_to_entry(entry.id, field_id=key, value=value): + applied += 1 + + return applied + + +# ================= UI: Paths → Fields Modal ================ + + +class _MappingRow(QWidget): + def __init__(self, parent: QWidget | None = None) -> None: + super().__init__(parent) + layout = QHBoxLayout(self) + layout.setContentsMargins(0, 0, 0, 0) + # Field selector: choose from built-in FieldID + self.field_select = QComboBox() + for fid in FieldID: + self.field_select.addItem(fid.value.name, fid.name) + self.val_edit = QLineEdit() + self.val_edit.setPlaceholderText(Translations["paths_to_fields.template_placeholder"]) + self.remove_btn = QPushButton("-") + self.remove_btn.setFixedWidth(28) + layout.addWidget(self.field_select) + layout.addWidget(self.val_edit) + layout.addWidget(self.remove_btn) + + + def as_pair(self) -> tuple[str, str] | None: + v = self.val_edit.text().strip() + if not v: + return None + fid_name = self.field_select.currentData() + return (str(fid_name), v) + + + +class PathsToFieldsModal(QWidget): + def __init__(self, library: Library, driver: QtDriver) -> None: + super().__init__() + self.library = library + self.driver = driver + self.setWindowTitle(Translations["paths_to_fields.title"]) # fallback shows [key] + self.setWindowModality(Qt.WindowModality.ApplicationModal) + self.setMinimumSize(720, 640) + + root = QVBoxLayout(self) + root.setContentsMargins(8, 8, 8, 8) + + title = QLabel(Translations["paths_to_fields.title"]) # may show [paths_to_fields.title] + title.setAlignment(Qt.AlignmentFlag.AlignCenter) + title.setStyleSheet("font-weight:600;font-size:14px;padding:6px 0") + desc = QLabel( + Translations[ + "paths_to_fields.description" + ] + ) + desc.setWordWrap(True) + desc.setAlignment(Qt.AlignmentFlag.AlignCenter) + + # Pattern and options (use a FormLayout to tie label to input) + form = QWidget() + form_layout = QFormLayout(form) + form_layout.setContentsMargins(0, 0, 0, 0) + form_layout.setFormAlignment(Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignTop) + form_layout.setLabelAlignment(Qt.AlignmentFlag.AlignLeft) + form_layout.setFieldGrowthPolicy(QFormLayout.FieldGrowthPolicy.ExpandingFieldsGrow) + + pattern_label = QLabel(Translations["paths_to_fields.pattern_label"]) + self.pattern_edit = QPlainTextEdit() + self.pattern_edit.setPlaceholderText(r"^(?P[^/]+)/(?P[^_]+)_(?P\d+)\.[^.]+$") + self.pattern_edit.setFixedHeight(80) + self.pattern_edit.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) + pattern_label.setBuddy(self.pattern_edit) + + self.filename_only_cb = QCheckBox(Translations["paths_to_fields.use_filename_only"]) + + form_layout.addRow(pattern_label, self.pattern_edit) + form_layout.addRow(self.filename_only_cb) + + # Ensure the form block doesn't vertically stretch on resize + form.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed) + + # Mappings section + map_label = QLabel(Translations["paths_to_fields.mappings_label"]) + map_container = QWidget() + self.map_v = QVBoxLayout(map_container) + self.map_v.setContentsMargins(0, 0, 0, 0) + self.map_v.setSpacing(6) + # Keep mappings area height fixed to its contents + map_container.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed) + + add_map_btn = QPushButton(Translations["paths_to_fields.add_mapping"]) + add_map_btn.clicked.connect(self._add_mapping_row) + + # Preview area + preview_btn = QPushButton(Translations["paths_to_fields.preview"]) + preview_btn.clicked.connect(self._on_preview) + self.preview_area = QPlainTextEdit() + self.preview_area.setReadOnly(True) + self.preview_area.setFrameShape(QFrame.Shape.StyledPanel) + self.preview_area.setPlaceholderText(Translations["paths_to_fields.preview_empty"]) + self.preview_area.setMinimumHeight(200) + self.preview_area.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) + + # Apply + apply_btn = QPushButton(Translations["generic.apply_alt"]) # existing key + apply_btn.setMinimumWidth(100) + apply_btn.clicked.connect(self._on_apply) + + # Layout assembly + root.addWidget(title) + root.addWidget(desc) + root.addWidget(form) + root.addWidget(map_label) + root.addWidget(map_container) + root.addWidget(add_map_btn, alignment=Qt.AlignmentFlag.AlignLeft) + root.addWidget(preview_btn, alignment=Qt.AlignmentFlag.AlignLeft) + root.addWidget(self.preview_area) + root.addWidget(apply_btn, alignment=Qt.AlignmentFlag.AlignCenter) + + # Make only the preview area consume extra vertical space on resize + root.setStretchFactor(self.preview_area, 1) + + # Seed one mapping row + self._add_mapping_row() + + def _add_mapping_row(self): + row = _MappingRow() + row.remove_btn.clicked.connect(lambda: self._remove_row(row)) + self.map_v.addWidget(row) + + def _remove_row(self, row: _MappingRow): + row.setParent(None) + + def _collect_rules(self) -> tuple[list[PathFieldRule], dict[str, FieldTypeEnum]] | None: + pattern = self.pattern_edit.toPlainText().strip() + if not pattern: + msg_box = QMessageBox() + msg_box.setIcon(QMessageBox.Icon.Warning) + msg_box.setWindowTitle(Translations["window.title.error"]) # reuse common title + msg_box.setText(Translations["paths_to_fields.msg.enter_pattern"]) + msg_box.addButton(Translations["generic.close"], QMessageBox.ButtonRole.AcceptRole) + msg_box.exec_() + return None + fields: dict[str, str] = {} + f_types: dict[str, FieldTypeEnum] = {} + for i in range(self.map_v.count()): + w = self.map_v.itemAt(i).widget() + if isinstance(w, _MappingRow): + kv = w.as_pair() + if kv: + k, v = kv + fields[k] = v + # No custom fields support in UI; backend keeps optional field_types for tests + if not fields: + msg_box = QMessageBox() + msg_box.setIcon(QMessageBox.Icon.Warning) + msg_box.setWindowTitle(Translations["window.title.error"]) # reuse common title + msg_box.setText(Translations["paths_to_fields.msg.add_mapping"]) + msg_box.addButton(Translations["generic.close"], QMessageBox.ButtonRole.AcceptRole) + msg_box.exec_() + return None + try: + re.compile(pattern) + except re.error as e: + msg_box = QMessageBox() + msg_box.setIcon(QMessageBox.Icon.Critical) + msg_box.setWindowTitle(Translations["paths_to_fields.msg.invalid_regex_title"]) + msg_box.setText(Translations["paths_to_fields.msg.invalid_regex_title"]) + msg_box.setInformativeText(str(e)) + msg_box.addButton(Translations["generic.close"], QMessageBox.ButtonRole.AcceptRole) + msg_box.exec_() + return None + rule = PathFieldRule( + pattern=pattern, + fields=fields, + use_filename_only=self.filename_only_cb.isChecked(), + ) + return [rule], f_types + + def _on_preview(self): + r = self._collect_rules() + if not r: + return + rules, _ = r + previews = preview_paths_to_fields(self.library, rules) + if not previews: + self.preview_area.setPlainText(Translations["paths_to_fields.msg.no_matches"]) + return + lines: list[str] = [] + for upd in previews: + lines.append(f"{upd.path}") + for k, v in upd.updates.items(): + lines.append(f" - {k}: {v}") + self.preview_area.setPlainText("\n".join(lines)) + + def _on_apply(self): + r = self._collect_rules() + if not r: + return + rules, f_types = r + previews = preview_paths_to_fields(self.library, rules) + if not previews: + msg_box = QMessageBox() + msg_box.setIcon(QMessageBox.Icon.Information) + msg_box.setWindowTitle(Translations["paths_to_fields.title"]) # use modal title + msg_box.setText(Translations["paths_to_fields.msg.no_matches"]) + msg_box.addButton(Translations["generic.close"], QMessageBox.ButtonRole.AcceptRole) + msg_box.exec_() + return + apply_paths_to_fields( + self.library, + previews, + create_missing_field_types=True, + field_types=f_types, + ) + self.close() + # refresh selection/preview pane like other macros + self.driver.main_window.preview_panel.set_selection(self.driver.selected, update_preview=False) diff --git a/src/tagstudio/qt/ts_qt.py b/src/tagstudio/qt/ts_qt.py index 8d7edde30..f720191d8 100644 --- a/src/tagstudio/qt/ts_qt.py +++ b/src/tagstudio/qt/ts_qt.py @@ -84,6 +84,7 @@ from tagstudio.qt.mixed.folders_to_tags import FoldersToTagsModal from tagstudio.qt.mixed.item_thumb import BadgeType from tagstudio.qt.mixed.migration_modal import JsonMigrationModal +from tagstudio.qt.mixed.paths_to_fields import PathsToFieldsModal from tagstudio.qt.mixed.progress_bar import ProgressWidget from tagstudio.qt.mixed.settings_panel import SettingsPanel from tagstudio.qt.mixed.tag_color_manager import TagColorManager @@ -543,6 +544,15 @@ def create_folders_tags_modal(): create_folders_tags_modal ) + def create_paths_fields_modal(): + if not hasattr(self, "paths_fields_modal"): + self.paths_fields_modal = PathsToFieldsModal(self.lib, self) + self.paths_fields_modal.show() + + self.main_window.menu_bar.paths_to_fields_action.triggered.connect( + create_paths_fields_modal + ) + # endregion # region Help Menu ============================================================ @@ -769,6 +779,7 @@ def close_library(self, is_shutdown: bool = False): self.main_window.menu_bar.fix_dupe_files_action.setEnabled(False) self.main_window.menu_bar.clear_thumb_cache_action.setEnabled(False) self.main_window.menu_bar.folders_to_tags_action.setEnabled(False) + self.main_window.menu_bar.paths_to_fields_action.setEnabled(False) self.main_window.menu_bar.library_info_action.setEnabled(False) except AttributeError: logger.warning( @@ -1622,6 +1633,7 @@ def _init_library(self, path: Path, open_status: LibraryStatus): self.main_window.menu_bar.fix_dupe_files_action.setEnabled(True) self.main_window.menu_bar.clear_thumb_cache_action.setEnabled(True) self.main_window.menu_bar.folders_to_tags_action.setEnabled(True) + self.main_window.menu_bar.paths_to_fields_action.setEnabled(True) self.main_window.menu_bar.library_info_action.setEnabled(True) self.main_window.preview_panel.set_selection(self.selected) diff --git a/src/tagstudio/qt/views/main_window.py b/src/tagstudio/qt/views/main_window.py index df675fbe6..14c5f84b2 100644 --- a/src/tagstudio/qt/views/main_window.py +++ b/src/tagstudio/qt/views/main_window.py @@ -385,6 +385,11 @@ def setup_macros_menu(self): self.folders_to_tags_action.setEnabled(False) self.macros_menu.addAction(self.folders_to_tags_action) + # Paths → Fields + self.paths_to_fields_action = QAction(Translations["menu.macros.paths_to_fields"], self) + self.paths_to_fields_action.setEnabled(False) + self.macros_menu.addAction(self.paths_to_fields_action) + assign_mnemonics(self.macros_menu) self.addMenu(self.macros_menu) diff --git a/src/tagstudio/resources/translations/en.json b/src/tagstudio/resources/translations/en.json index edda02311..35c6d665a 100644 --- a/src/tagstudio/resources/translations/en.json +++ b/src/tagstudio/resources/translations/en.json @@ -229,6 +229,7 @@ "menu.help.about": "About", "menu.help": "&Help", "menu.macros.folders_to_tags": "Folders to Tags", + "menu.macros.paths_to_fields": "Paths to Fields", "menu.macros": "&Macros", "menu.select": "Select", "menu.settings": "Settings...", @@ -246,6 +247,21 @@ "namespace.create.title": "Create Namespace", "namespace.new.button": "New Namespace", "namespace.new.prompt": "Create a New Namespace to Start Adding Custom Colors!", + "paths_to_fields.add_mapping": "Add Mapping", + "paths_to_fields.converting": "Converting paths to Fields", + "paths_to_fields.description": "Creates fields based on your file paths and applies them to your entries.\n The structure below shows all the fields that will be created and what entries they will be applied to.", + "paths_to_fields.field_key_placeholder": "field_key e.g. page_number", + "paths_to_fields.pattern_label": "File Path Pattern", + "paths_to_fields.preview": "Preview", + "paths_to_fields.preview_empty": "No Preview Available", + "paths_to_fields.mappings_label": "Field Mappings", + "paths_to_fields.msg.enter_pattern": "Please enter a regex pattern.", + "paths_to_fields.msg.add_mapping": "Please add at least one field mapping.", + "paths_to_fields.msg.invalid_regex_title": "Invalid Regex", + "paths_to_fields.msg.no_matches": "No matches found.", + "paths_to_fields.template_placeholder": "template e.g. $page or example.com/$id", + "paths_to_fields.title": "Create Fields From Paths", + "paths_to_fields.use_filename_only": "Use Filename Only", "preview.ignored": "Ignored", "preview.multiple_selection": "{count} Items Selected", "preview.no_selection": "No Items Selected", diff --git a/tests/macros/test_paths_to_fields.py b/tests/macros/test_paths_to_fields.py new file mode 100644 index 000000000..cec4814ee --- /dev/null +++ b/tests/macros/test_paths_to_fields.py @@ -0,0 +1,86 @@ +# Copyright (C) 2025 +# Licensed under the GPL-3.0 License. +# Created for TagStudio: https://github.com/CyanVoxel/TagStudio + +from pathlib import Path + +from tagstudio.core.library.alchemy.fields import FieldID +from tagstudio.core.library.alchemy.library import Library +from tagstudio.core.library.alchemy.models import Entry +from tagstudio.core.utils.types import unwrap +from tagstudio.qt.mixed.paths_to_fields import ( + PathFieldRule, + apply_paths_to_fields, + preview_paths_to_fields, +) + + +def test_paths_to_fields_preview_and_apply(library: Library): + folder = unwrap(library.folder) + + entries = [ + Entry(folder=folder, path=Path("series-MySeries/01_10.jpg"), fields=[]), + Entry(folder=folder, path=Path("creator-jdoe/abc123_02.png"), fields=[]), + Entry( + folder=folder, + path=Path("creator-jane/Some-Series_source-name_003.jpeg"), + fields=[], + ), + ] + ids = library.add_entries(entries) + + rules = [ + # series-{series}/{page}_{total}.ext + PathFieldRule( + pattern=r"^series-(?P[^/]+)/(?P\d+)_\d+\.[^.]+$", + fields={ + FieldID.SERIES.name: "$series", + "page_number": "$page", + }, + ), + # creator-{artist}/{source_ident}_{page}.ext -> artist + source URL + PathFieldRule( + pattern=r"^creator-(?P[^/]+)/(?P[^_]+)_(?P\d+)\.[^.]+$", + fields={ + FieldID.ARTIST.name: "$artist", + FieldID.SOURCE.name: "example.com/abc/$source_ident", + }, + ), + # creator-{artist}/{series}_{source}_{page}.ext + PathFieldRule( + pattern=r"^creator-(?P[^/]+)/(?P[^_]+)_(?P[^_]+)_(?P\d+)\.[^.]+$", + fields={ + FieldID.ARTIST.name: "$artist", + FieldID.SERIES.name: "$series", + FieldID.SOURCE.name: "$source", + "page_number": "$page", + }, + ), + ] + + preview = preview_paths_to_fields(library, rules) + # should propose updates for all 3 entries + assert len(preview) == 3 + + applied = apply_paths_to_fields(library, preview, create_missing_field_types=True) + # ** TODO: The test only verifies that 'applied >= 5' but doesn't + # verify the exact number or check for potential duplicate field assignments. + assert applied >= 5 # at least series + page + artist + source for 2 rules + + # Validate the fields were set as expected + e0 = unwrap(library.get_entry_full(ids[0])) + kv0 = {f.type_key: (f.value or "") for f in e0.fields} + assert kv0.get(FieldID.SERIES.name) == "MySeries" + assert kv0.get("page_number") == "01" + + e1 = unwrap(library.get_entry_full(ids[1])) + kv1 = {f.type_key: (f.value or "") for f in e1.fields} + assert kv1.get(FieldID.ARTIST.name) == "jdoe" + assert kv1.get(FieldID.SOURCE.name) == "example.com/abc/abc123" + + e2 = unwrap(library.get_entry_full(ids[2])) + kv2 = {f.type_key: (f.value or "") for f in e2.fields} + assert kv2.get(FieldID.ARTIST.name) == "jane" + assert kv2.get(FieldID.SERIES.name) == "Some-Series" + assert kv2.get(FieldID.SOURCE.name) == "source-name" + assert kv2.get("page_number") == "003" From 1d200c7ee16edd0770d10c1333e6e2bcd40dd9e4 Mon Sep 17 00:00:00 2001 From: Catting <5874051+mm12@users.noreply.github.com> Date: Sun, 16 Nov 2025 21:55:40 -0600 Subject: [PATCH 2/4] fix: path-to-field bugfixes * Enable multi-line input for multi-line text fields * Allow multiple fields of the same type to be added --- src/tagstudio/qt/mixed/paths_to_fields.py | 150 ++++++++++++++++------ tests/macros/test_paths_to_fields.py | 30 +++++ 2 files changed, 140 insertions(+), 40 deletions(-) diff --git a/src/tagstudio/qt/mixed/paths_to_fields.py b/src/tagstudio/qt/mixed/paths_to_fields.py index 128721a36..4ceff7c13 100644 --- a/src/tagstudio/qt/mixed/paths_to_fields.py +++ b/src/tagstudio/qt/mixed/paths_to_fields.py @@ -39,18 +39,24 @@ class PathFieldRule: """Define how to extract data from a path and map to fields. pattern: Full regex applied to the entry path (string form). Supports - numbered groups ($1) and named groups ($name / ${name}). - fields: Mapping of field keys to value templates. Templates can contain - placeholders like "$1", "$name", or "${name}". + numbered groups ($1) and named groups ($name / ${name}). + fields: A list of (field_key, template) pairs. Templates can contain + placeholders like "$1", "$name", or "${name}". Dicts are accepted + for backward compatibility and will be converted preserving iteration order. use_filename_only: If True, match only against the filename, else full path. flags: Regex flags OR'd, e.g. re.IGNORECASE. """ pattern: str - fields: dict[str, str] + fields: list[tuple[str, str]] use_filename_only: bool = False flags: int = 0 + def __post_init__(self) -> None: + # Back-compat: allow callers/tests to pass a dict mapping. + if isinstance(self.fields, dict): + self.fields = list(self.fields.items()) + def compile(self) -> re.Pattern[str]: return re.compile(self.pattern, self.flags) @@ -59,7 +65,8 @@ def compile(self) -> re.Pattern[str]: class EntryFieldUpdate: entry_id: int path: str - updates: dict[str, str] = field(default_factory=dict) + # list of (field_key, value) to preserve duplicates and order + updates: list[tuple[str, str]] = field(default_factory=list) PLACEHOLDER_RE = re.compile( @@ -146,33 +153,35 @@ def preview_paths_to_fields( else str(entry.path).replace("\\", "/") ) - pending: dict[str, str] = {} + pending_list: list[tuple[str, str]] = [] # DEBUG: minimal trace for first entries (temporarily enabled to diagnose matching) # print(f"[preview] full_path={full_path}") + # Precompute keys that should be skipped entirely when only_unset=True + skip_keys: set[str] = set() + if only_unset: + for f in entry.fields: + if (f.value or "") != "": + skip_keys.add(f.type_key) + for rule, cre in compiled: target = entry.filename if rule.use_filename_only else full_path m = cre.search(target) if not m: continue - for key, tmpl in rule.fields.items(): + for key, tmpl in rule.fields: + if only_unset and key in skip_keys: + continue value = _expand_template(tmpl, m).strip() if value == "": continue - if only_unset: - # check if field key exists and has a non-empty value - existing = next((f for f in entry.fields if ( - f.type_key == key and (f.value or "") != "")), None) - if existing: - continue + pending_list.append((key, value)) - pending[key] = value - - if pending: - results.append(EntryFieldUpdate(entry_id=entry.id, path=full_path, updates=pending)) + if pending_list: + results.append(EntryFieldUpdate(entry_id=entry.id, path=full_path, updates=pending_list)) return results @@ -200,10 +209,14 @@ def apply_paths_to_fields( for upd in updates: entry = unwrap(library.get_entry_full(upd.entry_id)) - for key, value in upd.updates.items(): # ** TODO: optimizeations can be made here + # Group proposed updates by field key to handle duplicates and overwrites deterministically + grouped: dict[str, list[str]] = {} + for key, value in upd.updates: + grouped.setdefault(key, []).append(value) + + for key, values in grouped.items(): # ensure field type exists if requested if create_missing_field_types: - # prefer library-provided helper if available, else attempt to create/get via available APIs _ensure_fn = getattr(library, "ensure_value_type", None) ftype = FieldTypeEnum.TEXT_LINE if field_types and key in field_types: @@ -212,10 +225,8 @@ def apply_paths_to_fields( _ensure_fn(key, name=None, field_type=ftype) else: try: - # try to access existing type library.get_value_type(key) except Exception: - # try common creation APIs if present _create_fn = ( getattr(library, "create_value_type", None) or getattr(library, "add_value_type", None) @@ -223,22 +234,39 @@ def apply_paths_to_fields( if callable(_create_fn): _create_fn(key, name=None, field_type=ftype) else: - # fallback to calling get_value_type to raise a clear error library.get_value_type(key) else: - # will raise if missing; keep behavior explicit library.get_value_type(key) - existing = next((f for f in entry.fields if f.type_key == key), None) - if existing: - current = existing.value or "" - if overwrite or current == "": - library.update_entry_field(entry.id, existing, value) - applied += 1 + existing_fields = [f for f in entry.fields if f.type_key == key] + + if overwrite: + # Overwrite existing in order, then append any remaining values + for i, val in enumerate(values): + if i < len(existing_fields): + library.update_entry_field(entry.id, existing_fields[i], val) + applied += 1 + else: + if library.add_field_to_entry(entry.id, field_id=key, value=val): + applied += 1 + continue + + # not overwrite: only fill when all existing are empty + # (prior behavior was 'any non-empty blocks') + if any((f.value or "") != "" for f in existing_fields): continue - if library.add_field_to_entry(entry.id, field_id=key, value=value): + # Fill existing empties first, then append extra + idx = 0 + for f in existing_fields: + if idx >= len(values): + break + library.update_entry_field(entry.id, f, values[idx]) applied += 1 + idx += 1 + for j in range(idx, len(values)): + if library.add_field_to_entry(entry.id, field_id=key, value=values[j]): + applied += 1 return applied @@ -255,22 +283,56 @@ def __init__(self, parent: QWidget | None = None) -> None: self.field_select = QComboBox() for fid in FieldID: self.field_select.addItem(fid.value.name, fid.name) - self.val_edit = QLineEdit() - self.val_edit.setPlaceholderText(Translations["paths_to_fields.template_placeholder"]) + # Single-line editor + self.val_edit_line = QLineEdit() + self.val_edit_line.setPlaceholderText(Translations["paths_to_fields.template_placeholder"]) + # Multi-line editor (for TEXT_BOX fields) + self.val_edit_box = QPlainTextEdit() + self.val_edit_box.setPlaceholderText(Translations["paths_to_fields.template_placeholder"]) + self.val_edit_box.setFixedHeight(64) self.remove_btn = QPushButton("-") self.remove_btn.setFixedWidth(28) layout.addWidget(self.field_select) - layout.addWidget(self.val_edit) + layout.addWidget(self.val_edit_line) + layout.addWidget(self.val_edit_box) layout.addWidget(self.remove_btn) + # Start with proper editor based on current selection + self._update_editor_kind() + self.field_select.currentIndexChanged.connect(self._update_editor_kind) + def as_pair(self) -> tuple[str, str] | None: - v = self.val_edit.text().strip() + editor = self._current_value_editor() + v = ( + editor.toPlainText().strip() + if isinstance(editor, QPlainTextEdit) + else editor.text().strip() + ) if not v: return None fid_name = self.field_select.currentData() return (str(fid_name), v) + def _current_value_editor(self) -> QLineEdit | QPlainTextEdit: + # TEXT_BOX => multi-line, else single-line + try: + fid_name = self.field_select.currentData() + ftype = ( + FieldID[fid_name].value.type + if fid_name in FieldID.__members__ + else FieldTypeEnum.TEXT_LINE + ) + except Exception: + ftype = FieldTypeEnum.TEXT_LINE + return self.val_edit_box if ftype == FieldTypeEnum.TEXT_BOX else self.val_edit_line + + def _update_editor_kind(self) -> None: + editor = self._current_value_editor() + use_box = isinstance(editor, QPlainTextEdit) + self.val_edit_box.setVisible(use_box) + self.val_edit_line.setVisible(not use_box) + class PathsToFieldsModal(QWidget): @@ -346,6 +408,15 @@ def __init__(self, library: Library, driver: QtDriver) -> None: apply_btn.setMinimumWidth(100) apply_btn.clicked.connect(self._on_apply) + # Ensure pressing Enter in editors doesn't trigger any default button + # Explicitly disable default behaviors on buttons + for b in (preview_btn, apply_btn): + try: + b.setAutoDefault(False) + b.setDefault(False) + except Exception: + pass + # Layout assembly root.addWidget(title) root.addWidget(desc) @@ -381,17 +452,16 @@ def _collect_rules(self) -> tuple[list[PathFieldRule], dict[str, FieldTypeEnum]] msg_box.addButton(Translations["generic.close"], QMessageBox.ButtonRole.AcceptRole) msg_box.exec_() return None - fields: dict[str, str] = {} + fields_list: list[tuple[str, str]] = [] f_types: dict[str, FieldTypeEnum] = {} for i in range(self.map_v.count()): w = self.map_v.itemAt(i).widget() if isinstance(w, _MappingRow): kv = w.as_pair() if kv: - k, v = kv - fields[k] = v + fields_list.append(kv) # No custom fields support in UI; backend keeps optional field_types for tests - if not fields: + if not fields_list: msg_box = QMessageBox() msg_box.setIcon(QMessageBox.Icon.Warning) msg_box.setWindowTitle(Translations["window.title.error"]) # reuse common title @@ -412,7 +482,7 @@ def _collect_rules(self) -> tuple[list[PathFieldRule], dict[str, FieldTypeEnum]] return None rule = PathFieldRule( pattern=pattern, - fields=fields, + fields=fields_list, use_filename_only=self.filename_only_cb.isChecked(), ) return [rule], f_types @@ -429,7 +499,7 @@ def _on_preview(self): lines: list[str] = [] for upd in previews: lines.append(f"{upd.path}") - for k, v in upd.updates.items(): + for k, v in upd.updates: lines.append(f" - {k}: {v}") self.preview_area.setPlainText("\n".join(lines)) diff --git a/tests/macros/test_paths_to_fields.py b/tests/macros/test_paths_to_fields.py index cec4814ee..c5033ecb6 100644 --- a/tests/macros/test_paths_to_fields.py +++ b/tests/macros/test_paths_to_fields.py @@ -84,3 +84,33 @@ def test_paths_to_fields_preview_and_apply(library: Library): assert kv2.get(FieldID.SERIES.name) == "Some-Series" assert kv2.get(FieldID.SOURCE.name) == "source-name" assert kv2.get("page_number") == "003" + + +def test_paths_to_fields_allows_duplicate_fields(library: Library): + folder = unwrap(library.folder) + + entry = Entry(folder=folder, path=Path("multi-foo_bar.jpg"), fields=[]) + [eid] = library.add_entries([entry]) + + rule = PathFieldRule( + pattern=r"^multi-(?P[^_]+)_(?P[^.]+)\.[^.]+$", + fields=[ + (FieldID.COMMENTS.name, "$a"), + (FieldID.COMMENTS.name, "$b"), + ], + ) + + preview = preview_paths_to_fields(library, [rule]) + assert len(preview) == 1 + # Should propose two updates for the same key, in order + assert preview[0].updates == [ + (FieldID.COMMENTS.name, "foo"), + (FieldID.COMMENTS.name, "bar"), + ] + + applied = apply_paths_to_fields(library, preview, create_missing_field_types=True) + assert applied == 2 + + e = unwrap(library.get_entry_full(eid)) + comment_values = [f.value or "" for f in e.fields if f.type_key == FieldID.COMMENTS.name] + assert sorted(comment_values) == ["bar", "foo"] From 2d2a5784944d3674c944eda48365d59b776369dd Mon Sep 17 00:00:00 2001 From: Catting <5874051+mm12@users.noreply.github.com> Date: Wed, 19 Nov 2025 14:50:58 -0600 Subject: [PATCH 3/4] feat: Improve upon UI and docs * Add progress bar when applying changes * Add progress bar when loading preview * Add documentation markdown * Add warning icon for items that will be skipped * Add ability for user to apply to warned items --- docs/macros.md | 17 + src/tagstudio/core/library/alchemy/library.py | 6 + src/tagstudio/qt/mixed/paths_to_fields.py | 458 +++++++++++++++--- src/tagstudio/resources/translations/en.json | 6 + tests/macros/test_paths_to_fields.py | 95 ++++ 5 files changed, 519 insertions(+), 63 deletions(-) diff --git a/docs/macros.md b/docs/macros.md index 9268d267a..41f7edd55 100644 --- a/docs/macros.md +++ b/docs/macros.md @@ -48,3 +48,20 @@ Tool is in development. Will allow for user-defined sorting of [fields](fields.m ### Folders to Tags Creates tags from the existing folder structure in the library, which are previewed in a hierarchy view for the user to confirm. A tag will be created for each folder and applied to all entries, with each subfolder being linked to the parent folder as a [parent tag](tags.md#parent-tags). Tags will initially be named after the folders, but can be fully edited and customized afterwards. + +### Paths to Fields + +Populates fields on entries based on their file paths. Users can define regular expressions to extract specific parts of the path, which can be referenced when adding a field. +In addition, simple operations (`++` and `--`) can be applied on numberic fields. This allows 0-indexed fields to be converted to 1-indexed fields, and vise-a-versa. +Example usage: +: Say you have paths like +: `TagStudioLibrary/artist-artistusername/series name/work title --- page 0.png` +: We want to extract `artistusername`, `series name`, `work title`, and `0` (the page number). +: To do this, we can define an expression to fully constrain our path. We *can* allow looser constraints, however if we do that we need to be more careful ensuring the preview matches our desired outcome. +: Here are some handy pieces: +: * `[^\.]+$` - This matches anything after the final `.` in the path. In other words, the file extension. Even if your path contains a `.`, this ensures the matching does not end early. `$` is an anchor to the end of the line. Similarly, `^` is the anchor to the start, so can be used in the begining. We need to escape `.` with a `\`, because `.` means "match any character once" in regex. `+` means "match this pattern one or more times". +: * `\\` and `\/` - these match your directory (folder) seperators. Which you use can depend on your Operating System, so use of `[\\\/]` (which matches both) is encouraged. +: * `[^\\\/]+` - Similar to the previous, but this does the opposite. This matches as many characters as it can, before it runs into a folder seperator. This is helpful in ensureing that each field you capture is truly in the folder level you expect, and not because the name of an internal folder is similar to that of an external one. +: * `\d+` and `\s+` - These match one or more digit and one or more whitespace (like spaces and tabs), respectively. If you need to further constrain this, you can use `\s?` (match a space if its there, otherwise continue) or `\d{3,5}` (match 3 to 5 digits only) to do so. +: * `(?Pmatch_pattern)` - This is a named capture group. We can define `match_pattern` to match the field we want, and make `name_of_group` our field name. This allows us to use `$name_of_group` to reference the item. If these groups were unnamed, we would need to count the order in which they occur, and use their number (ie, the first item is `$1`). +: Putting this together, we can make our regex capture: `artist-(?P[^\\\/]+)[\\\/](?P[^\\\/]+)[\\\/](?P.+) --- page\s?(?P<page>\d+)[^\.]+$` \ No newline at end of file diff --git a/src/tagstudio/core/library/alchemy/library.py b/src/tagstudio/core/library/alchemy/library.py index d3beba6c8..b03118266 100644 --- a/src/tagstudio/core/library/alchemy/library.py +++ b/src/tagstudio/core/library/alchemy/library.py @@ -966,6 +966,12 @@ def remove_entries(self, entry_ids: list[int]) -> None: session.query(Entry).where(Entry.id.in_(sub_list)).delete() session.commit() + def entry_count(self) -> int: + """Return the total number of entries in the library.""" + with Session(self.engine) as session: + count = session.scalar(select(func.count(Entry.id))) + return int(count or 0) + def has_path_entry(self, path: Path) -> bool: """Check if item with given path is in library already.""" with Session(self.engine) as session: diff --git a/src/tagstudio/qt/mixed/paths_to_fields.py b/src/tagstudio/qt/mixed/paths_to_fields.py index 4ceff7c13..6dbc6b746 100644 --- a/src/tagstudio/qt/mixed/paths_to_fields.py +++ b/src/tagstudio/qt/mixed/paths_to_fields.py @@ -1,12 +1,17 @@ - +# TODO list +# UI bugs +# - When preview loads, it extends below the apply button, likely because scrollbar isn't calculated +# - Multi-line fields sometimes get cut off when adding/removing mappings so they show up as 1 line. from __future__ import annotations import re -from collections.abc import Iterable +from collections.abc import Callable, Iterable, Iterator +from contextlib import suppress from dataclasses import dataclass, field from typing import TYPE_CHECKING -from PySide6.QtCore import Qt +from PySide6.QtCore import Qt, QThreadPool +from PySide6.QtGui import QTextOption from PySide6.QtWidgets import ( QCheckBox, QComboBox, @@ -17,6 +22,7 @@ QLineEdit, QMessageBox, QPlainTextEdit, + QProgressBar, QPushButton, QSizePolicy, QVBoxLayout, @@ -29,6 +35,8 @@ from tagstudio.core.library.alchemy.models import Entry from tagstudio.core.utils.types import unwrap from tagstudio.qt.translations import Translations +from tagstudio.qt.utils.custom_runnable import CustomRunnable +from tagstudio.qt.utils.function_iterator import FunctionIterator if TYPE_CHECKING: from tagstudio.qt.ts_qt import QtDriver @@ -69,6 +77,14 @@ class EntryFieldUpdate: updates: list[tuple[str, str]] = field(default_factory=list) +@dataclass +class PreviewProgress: + index: int + total: int | None + path: str + update: EntryFieldUpdate | None + + PLACEHOLDER_RE = re.compile( r"\$(?:\{(?P<n1>[A-Za-z_][A-Za-z0-9_]*)\}|(?P<n2>[A-Za-z_][A-Za-z0-9_]*)|(?P<i>\d+))(?P<op>\+\+|--)?" ) @@ -112,20 +128,19 @@ def _iter_entries(library: Library) -> Iterable[Entry]: # with_joins=True ensures we can inspect current fields when needed yield from library.all_entries(with_joins=True) -def preview_paths_to_fields( +def iter_preview_paths_to_fields( library: Library, rules: list[PathFieldRule], only_unset: bool = True, -) -> list[EntryFieldUpdate]: - """Return a dry-run of field updates inferred from entry paths. - - - Respects existing non-empty field values when only_unset=True. - - Supports multiple rules; first matching rule contributes its mapped fields. - """ + *, + cancel_callback: Callable[[], bool] | None = None, +) -> Iterator[PreviewProgress]: compiled = [(r, r.compile()) for r in rules] - results: list[EntryFieldUpdate] = [] + try: + total = library.entry_count() + except Exception: + total = None - # Determine library root for relative matching base_path = None try: folder_obj = getattr(library, "folder", None) @@ -134,8 +149,10 @@ def preview_paths_to_fields( except Exception: base_path = None - for entry in _iter_entries(library): - # Normalize path for cross-platform matching (use forward slashes), use relative if possible + for index, entry in enumerate(_iter_entries(library), start=1): + if cancel_callback and cancel_callback(): + break + try: if base_path is not None: rel = entry.path.relative_to(base_path) @@ -145,7 +162,7 @@ def preview_paths_to_fields( entry.path.as_posix() if hasattr(entry.path, "as_posix") else str(entry.path).replace("\\", "/") - ) # ** TODO: move to helper + ) except Exception: full_path = ( entry.path.as_posix() @@ -155,10 +172,6 @@ def preview_paths_to_fields( pending_list: list[tuple[str, str]] = [] - # DEBUG: minimal trace for first entries (temporarily enabled to diagnose matching) - # print(f"[preview] full_path={full_path}") - - # Precompute keys that should be skipped entirely when only_unset=True skip_keys: set[str] = set() if only_unset: for f in entry.fields: @@ -180,9 +193,27 @@ def preview_paths_to_fields( pending_list.append((key, value)) + update = None if pending_list: - results.append(EntryFieldUpdate(entry_id=entry.id, path=full_path, updates=pending_list)) + update = EntryFieldUpdate(entry_id=entry.id, path=full_path, updates=pending_list) + + yield PreviewProgress(index=index, total=total, path=full_path, update=update) + + +def preview_paths_to_fields( + library: Library, + rules: list[PathFieldRule], + only_unset: bool = True, +) -> list[EntryFieldUpdate]: + """Return a dry-run of field updates inferred from entry paths. + - Respects existing non-empty field values when only_unset=True. + - Supports multiple rules; first matching rule contributes its mapped fields. + """ + results: list[EntryFieldUpdate] = [] + for progress in iter_preview_paths_to_fields(library, rules, only_unset=only_unset): + if progress.update: + results.append(progress.update) return results @@ -194,13 +225,15 @@ def apply_paths_to_fields( create_missing_field_types: bool = True, overwrite: bool = False, field_types: dict[str, FieldTypeEnum] | None = None, + allow_existing: bool = False, ) -> int: """Apply field updates to entries. - If a field key doesn't exist, optionally create a new ValueType. - If the field already exists on an entry: - Overwrite when overwrite=True - - Otherwise only fill when existing value is empty or None. + - Otherwise only fill when existing value is empty or None unless allow_existing=True, + in which case new values are appended without replacing existing ones. Returns the count of individual field updates applied. """ @@ -239,34 +272,64 @@ def apply_paths_to_fields( library.get_value_type(key) existing_fields = [f for f in entry.fields if f.type_key == key] + existing_values = [(f.value or "") for f in existing_fields] + # De-duplicate incoming values while preserving order + seen: set[str] = set() + dedup_values: list[str] = [] + for v in values: + if v not in seen: + dedup_values.append(v) + seen.add(v) + values = dedup_values if overwrite: # Overwrite existing in order, then append any remaining values for i, val in enumerate(values): if i < len(existing_fields): - library.update_entry_field(entry.id, existing_fields[i], val) - applied += 1 + # Only write if changing the value + if (existing_values[i] if i < len(existing_values) else "") != val: + library.update_entry_field(entry.id, existing_fields[i], val) + applied += 1 else: + # Skip appending if exact duplicate already exists + if val in existing_values: + continue if library.add_field_to_entry(entry.id, field_id=key, value=val): applied += 1 continue - # not overwrite: only fill when all existing are empty - # (prior behavior was 'any non-empty blocks') - if any((f.value or "") != "" for f in existing_fields): + if not allow_existing and any(val != "" for val in existing_values): continue - # Fill existing empties first, then append extra - idx = 0 + # Fill empty slots first without disturbing existing populated values + remaining: list[str] = [] + seen_existing = set(existing_values) + for val in values: + if val in seen_existing: + continue + if val not in remaining: + remaining.append(val) + for f in existing_fields: - if idx >= len(values): + if not remaining: break - library.update_entry_field(entry.id, f, values[idx]) - applied += 1 - idx += 1 - for j in range(idx, len(values)): - if library.add_field_to_entry(entry.id, field_id=key, value=values[j]): + current = f.value or "" + if current != "": + continue + next_val = remaining.pop(0) + if current != next_val: + library.update_entry_field(entry.id, f, next_val) + applied += 1 + existing_values.append(next_val) + seen_existing.add(next_val) + + for val in remaining: + if val in seen_existing: + continue + if library.add_field_to_entry(entry.id, field_id=key, value=val): applied += 1 + seen_existing.add(val) + existing_values.append(val) return applied @@ -344,6 +407,17 @@ def __init__(self, library: Library, driver: QtDriver) -> None: self.setWindowModality(Qt.WindowModality.ApplicationModal) self.setMinimumSize(720, 640) + self._preview_results: list[EntryFieldUpdate] = [] + self._preview_running = False + self._apply_running = False + self._cancel_preview = False + self._preview_iterator: FunctionIterator | None = None + self._preview_runnable: CustomRunnable | None = None + self._apply_iterator: FunctionIterator | None = None + self._apply_runnable: CustomRunnable | None = None + self._progress_prefix = "" + self._progress_cancel_handler: Callable[[], None] | None = None + root = QVBoxLayout(self) root.setContentsMargins(8, 8, 8, 8) @@ -374,9 +448,11 @@ def __init__(self, library: Library, driver: QtDriver) -> None: pattern_label.setBuddy(self.pattern_edit) self.filename_only_cb = QCheckBox(Translations["paths_to_fields.use_filename_only"]) + self.allow_existing_cb = QCheckBox(Translations["paths_to_fields.allow_existing"]) form_layout.addRow(pattern_label, self.pattern_edit) form_layout.addRow(self.filename_only_cb) + form_layout.addRow(self.allow_existing_cb) # Ensure the form block doesn't vertically stretch on resize form.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed) @@ -390,27 +466,59 @@ def __init__(self, library: Library, driver: QtDriver) -> None: # Keep mappings area height fixed to its contents map_container.setSizePolicy(QSizePolicy.Policy.Preferred, QSizePolicy.Policy.Fixed) - add_map_btn = QPushButton(Translations["paths_to_fields.add_mapping"]) - add_map_btn.clicked.connect(self._add_mapping_row) + self.add_map_btn = QPushButton(Translations["paths_to_fields.add_mapping"]) + self.add_map_btn.clicked.connect(self._add_mapping_row) # Preview area - preview_btn = QPushButton(Translations["paths_to_fields.preview"]) - preview_btn.clicked.connect(self._on_preview) + self.preview_btn = QPushButton(Translations["paths_to_fields.preview"]) + self.preview_btn.clicked.connect(self._on_preview) self.preview_area = QPlainTextEdit() self.preview_area.setReadOnly(True) self.preview_area.setFrameShape(QFrame.Shape.StyledPanel) self.preview_area.setPlaceholderText(Translations["paths_to_fields.preview_empty"]) self.preview_area.setMinimumHeight(200) self.preview_area.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Expanding) + self.preview_area.setWordWrapMode(QTextOption.WrapMode.WrapAtWordBoundaryOrAnywhere) + + self.progress_container = QWidget() + self.progress_container.setVisible(False) + self.progress_container.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) + progress_layout = QVBoxLayout(self.progress_container) + progress_layout.setContentsMargins(0, 0, 0, 0) + progress_layout.setSpacing(4) + + self.progress_label = QLabel() + self.progress_label.setWordWrap(True) + self.progress_label.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) + + progress_bar_row = QHBoxLayout() + progress_bar_row.setContentsMargins(0, 0, 0, 0) + progress_bar_row.setSpacing(6) + + self.progress_bar = QProgressBar() + self.progress_bar.setMinimumWidth(240) + self.progress_bar.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) + self.progress_bar.setTextVisible(False) + + self.progress_cancel_btn = QPushButton(Translations["generic.cancel"]) + self.progress_cancel_btn.setVisible(False) + self.progress_cancel_btn.setSizePolicy(QSizePolicy.Policy.Fixed, QSizePolicy.Policy.Fixed) + self.progress_cancel_btn.clicked.connect(self._handle_progress_cancel) + + progress_bar_row.addWidget(self.progress_bar) + progress_bar_row.addWidget(self.progress_cancel_btn) + + progress_layout.addWidget(self.progress_label) + progress_layout.addLayout(progress_bar_row) # Apply - apply_btn = QPushButton(Translations["generic.apply_alt"]) # existing key - apply_btn.setMinimumWidth(100) - apply_btn.clicked.connect(self._on_apply) + self.apply_btn = QPushButton(Translations["generic.apply_alt"]) # existing key + self.apply_btn.setMinimumWidth(100) + self.apply_btn.clicked.connect(self._on_apply) # Ensure pressing Enter in editors doesn't trigger any default button # Explicitly disable default behaviors on buttons - for b in (preview_btn, apply_btn): + for b in (self.preview_btn, self.apply_btn): try: b.setAutoDefault(False) b.setDefault(False) @@ -423,10 +531,11 @@ def __init__(self, library: Library, driver: QtDriver) -> None: root.addWidget(form) root.addWidget(map_label) root.addWidget(map_container) - root.addWidget(add_map_btn, alignment=Qt.AlignmentFlag.AlignLeft) - root.addWidget(preview_btn, alignment=Qt.AlignmentFlag.AlignLeft) + root.addWidget(self.add_map_btn, alignment=Qt.AlignmentFlag.AlignLeft) + root.addWidget(self.preview_btn, alignment=Qt.AlignmentFlag.AlignLeft) + root.addWidget(self.progress_container) root.addWidget(self.preview_area) - root.addWidget(apply_btn, alignment=Qt.AlignmentFlag.AlignCenter) + root.addWidget(self.apply_btn, alignment=Qt.AlignmentFlag.AlignCenter) # Make only the preview area consume extra vertical space on resize root.setStretchFactor(self.preview_area, 1) @@ -488,27 +597,61 @@ def _collect_rules(self) -> tuple[list[PathFieldRule], dict[str, FieldTypeEnum]] return [rule], f_types def _on_preview(self): + if self._preview_running or self._apply_running: + return r = self._collect_rules() if not r: return rules, _ = r - previews = preview_paths_to_fields(self.library, rules) - if not previews: - self.preview_area.setPlainText(Translations["paths_to_fields.msg.no_matches"]) - return - lines: list[str] = [] - for upd in previews: - lines.append(f"{upd.path}") - for k, v in upd.updates: - lines.append(f" - {k}: {v}") - self.preview_area.setPlainText("\n".join(lines)) + self.preview_area.clear() + self._preview_results = [] + + try: + total = self.library.entry_count() + except Exception: + total = None + + self._cancel_preview = False + self._preview_running = True + self._set_controls_enabled(enabled=False) + + self._start_progress( + label=Translations["paths_to_fields.preview"], + total=total, + cancel_handler=self._request_preview_cancel, + ) + + def generator(): + return iter_preview_paths_to_fields( + self.library, + rules, + only_unset=False, + cancel_callback=lambda: self._cancel_preview, + ) + + iterator = FunctionIterator(generator) + iterator.value.connect(self._handle_preview_progress) + + runnable = CustomRunnable(iterator.run) + runnable.done.connect(self._finalize_preview) + + self._preview_iterator = iterator + self._preview_runnable = runnable + QThreadPool.globalInstance().start(runnable) def _on_apply(self): + if self._preview_running or self._apply_running: + return r = self._collect_rules() if not r: return rules, f_types = r - previews = preview_paths_to_fields(self.library, rules) + allow_existing = self.allow_existing_cb.isChecked() + previews = preview_paths_to_fields( + self.library, + rules, + only_unset=not allow_existing, + ) if not previews: msg_box = QMessageBox() msg_box.setIcon(QMessageBox.Icon.Information) @@ -517,12 +660,201 @@ def _on_apply(self): msg_box.addButton(Translations["generic.close"], QMessageBox.ButtonRole.AcceptRole) msg_box.exec_() return - apply_paths_to_fields( - self.library, - previews, - create_missing_field_types=True, - field_types=f_types, + + total = len(previews) + self._apply_running = True + self._set_controls_enabled(enabled=False) + self._start_progress( + label=Translations["paths_to_fields.progress.label.initial"], + total=total, + cancel_handler=None, ) + + def generator(): + return self._iter_apply_updates(previews, f_types, allow_existing) + + iterator = FunctionIterator(generator) + iterator.value.connect(self._handle_apply_progress) + + runnable = CustomRunnable(iterator.run) + runnable.done.connect(self._finalize_apply) + + self._apply_iterator = iterator + self._apply_runnable = runnable + QThreadPool.globalInstance().start(runnable) + + def _iter_apply_updates( + self, + previews: list[EntryFieldUpdate], + field_types: dict[str, FieldTypeEnum], + allow_existing: bool, + ) -> Iterator[PreviewProgress]: + try: + from tagstudio.core.library.alchemy import library as _libmod # local import + except Exception: + _libmod = None + + class _NoInfoLogger: + def __init__(self, base): + self._base = base + + def info(self, *_, **__): # suppress info noise during bulk apply + return None + + def debug(self, *_, **__): + return None + + def warning(self, *args, **kwargs): + return self._base.warning(*args, **kwargs) + + def error(self, *args, **kwargs): + return self._base.error(*args, **kwargs) + + def exception(self, *args, **kwargs): + return self._base.exception(*args, **kwargs) + + def __getattr__(self, name): + return getattr(self._base, name) + + _saved_logger = None + if _libmod is not None and hasattr(_libmod, "logger"): + _saved_logger = _libmod.logger + _libmod.logger = _NoInfoLogger(_saved_logger) + + total = len(previews) + try: + for index, upd in enumerate(previews, start=1): + apply_paths_to_fields( + self.library, + [upd], + create_missing_field_types=True, + field_types=field_types, + allow_existing=allow_existing, + ) + yield PreviewProgress(index=index, total=total, path=upd.path, update=upd) + finally: + if _saved_logger is not None and _libmod is not None: + _libmod.logger = _saved_logger + + def _append_preview_update(self, upd: EntryFieldUpdate) -> None: + lines = [upd.path] + entry = unwrap(self.library.get_entry_full(upd.entry_id)) + for key, value in upd.updates: + existing_vals = [f.value or "" for f in entry.fields if f.type_key == key] + allow_existing = self.allow_existing_cb.isChecked() + # Flag duplicates before generic already_set so we only warn for actual conflicts + if value in existing_vals and value != "": + marker = Translations["paths_to_fields.preview.markers.duplicate"] + else: + already_set = any(val != "" for val in existing_vals) + marker = ( + Translations["paths_to_fields.preview.markers.already_set"] + if already_set and not allow_existing + else None + ) + prefix = f"⚠ {marker} — " if marker else "" + lines.append(f" - {prefix}{key}: {value}") + self.preview_area.appendPlainText("\n".join(lines)) + self.preview_area.ensureCursorVisible() + + def _handle_preview_progress(self, progress: PreviewProgress) -> None: + self._update_progress(progress) + if progress.update: + self._preview_results.append(progress.update) + self._append_preview_update(progress.update) + + def _handle_apply_progress(self, progress: PreviewProgress) -> None: + self._update_progress(progress) + + def _update_progress(self, progress: PreviewProgress) -> None: + total = progress.total or 0 + if total > 0: + self.progress_bar.setRange(0, total) + self.progress_bar.setValue(min(progress.index, total)) + else: + self.progress_bar.setRange(0, 0) + + lines: list[str] = [] + if self._progress_prefix: + lines.append(self._progress_prefix) + if progress.total: + lines.append(f"{progress.index}/{progress.total}") + else: + lines.append(str(progress.index)) + if progress.path: + lines.append(progress.path) + self.progress_label.setText("\n".join(filter(None, lines))) + + def _start_progress( + self, + *, + label: str, + total: int | None, + cancel_handler: Callable[[], None] | None, + ) -> None: + self._progress_prefix = label + self.progress_label.setText(label) + self.progress_container.setVisible(True) + if total and total > 0: + self.progress_bar.setRange(0, total) + self.progress_bar.setValue(0) + else: + self.progress_bar.setRange(0, 0) + self._set_cancel_handler(cancel_handler) + + def _finish_progress(self) -> None: + self.progress_container.setVisible(False) + self.progress_label.clear() + self.progress_bar.setValue(0) + self._progress_prefix = "" + self._set_cancel_handler(None) + + def _set_cancel_handler(self, handler: Callable[[], None] | None) -> None: + self._progress_cancel_handler = handler + has_handler = handler is not None + self.progress_cancel_btn.setVisible(has_handler) + self.progress_cancel_btn.setEnabled(has_handler) + + def _handle_progress_cancel(self) -> None: + if self._progress_cancel_handler: + self.progress_cancel_btn.setEnabled(False) + self._progress_cancel_handler() + + def _request_preview_cancel(self) -> None: + self._cancel_preview = True + + def _finalize_preview(self) -> None: + cancelled = self._cancel_preview + self._preview_running = False + self._cancel_preview = False + self._preview_iterator = None + self._preview_runnable = None + self._finish_progress() + self._set_controls_enabled(enabled=True) + if not self._preview_results and not cancelled: + self.preview_area.setPlainText(Translations["paths_to_fields.msg.no_matches"]) + + def _finalize_apply(self) -> None: + self._apply_running = False + self._apply_iterator = None + self._apply_runnable = None + self._finish_progress() + self._set_controls_enabled(enabled=True) self.close() - # refresh selection/preview pane like other macros - self.driver.main_window.preview_panel.set_selection(self.driver.selected, update_preview=False) + with suppress(Exception): + self.driver.main_window.preview_panel.set_selection( + self.driver.selected, + update_preview=False, + ) + + def _set_controls_enabled(self, *, enabled: bool) -> None: + self.preview_btn.setEnabled(enabled) + self.apply_btn.setEnabled(enabled) + self.add_map_btn.setEnabled(enabled) + self.pattern_edit.setEnabled(enabled) + self.filename_only_cb.setEnabled(enabled) + self.allow_existing_cb.setEnabled(enabled) + for i in range(self.map_v.count()): + widget = self.map_v.itemAt(i).widget() + if isinstance(widget, _MappingRow): + widget.setEnabled(enabled) diff --git a/src/tagstudio/resources/translations/en.json b/src/tagstudio/resources/translations/en.json index 35c6d665a..d6051f576 100644 --- a/src/tagstudio/resources/translations/en.json +++ b/src/tagstudio/resources/translations/en.json @@ -254,6 +254,11 @@ "paths_to_fields.pattern_label": "File Path Pattern", "paths_to_fields.preview": "Preview", "paths_to_fields.preview_empty": "No Preview Available", + "paths_to_fields.preview.markers.apply": "apply", + "paths_to_fields.preview.markers.already_set": "skipped (already set)", + "paths_to_fields.preview.markers.duplicate": "skipped (duplicate)", + "paths_to_fields.progress.window_title": "Apply Fields", + "paths_to_fields.progress.label.initial": "Applying Field Updates...", "paths_to_fields.mappings_label": "Field Mappings", "paths_to_fields.msg.enter_pattern": "Please enter a regex pattern.", "paths_to_fields.msg.add_mapping": "Please add at least one field mapping.", @@ -262,6 +267,7 @@ "paths_to_fields.template_placeholder": "template e.g. $page or example.com/$id", "paths_to_fields.title": "Create Fields From Paths", "paths_to_fields.use_filename_only": "Use Filename Only", + "paths_to_fields.allow_existing": "Apply even when fields already have values", "preview.ignored": "Ignored", "preview.multiple_selection": "<b>{count}</b> Items Selected", "preview.no_selection": "No Items Selected", diff --git a/tests/macros/test_paths_to_fields.py b/tests/macros/test_paths_to_fields.py index c5033ecb6..7eb0c8d1e 100644 --- a/tests/macros/test_paths_to_fields.py +++ b/tests/macros/test_paths_to_fields.py @@ -11,6 +11,7 @@ from tagstudio.qt.mixed.paths_to_fields import ( PathFieldRule, apply_paths_to_fields, + iter_preview_paths_to_fields, preview_paths_to_fields, ) @@ -114,3 +115,97 @@ def test_paths_to_fields_allows_duplicate_fields(library: Library): e = unwrap(library.get_entry_full(eid)) comment_values = [f.value or "" for f in e.fields if f.type_key == FieldID.COMMENTS.name] assert sorted(comment_values) == ["bar", "foo"] + + +def test_apply_paths_to_fields_allow_existing_appends(library: Library): + folder = unwrap(library.folder) + + entry = Entry(folder=folder, path=Path("existing/NewSeries-extra.jpg"), fields=[]) + [eid] = library.add_entries([entry]) + assert library.add_field_to_entry(eid, field_id=FieldID.SERIES.name, value="Existing Series") + + rule = PathFieldRule( + pattern=r"^existing/(?P<series>[^-]+)-(?P<suffix>[^.]+)\.[^.]+$", + fields=[(FieldID.SERIES.name, "$series")], + ) + + preview = preview_paths_to_fields(library, [rule], only_unset=False) + assert preview, "Expected preview to include updates even when field already set" + + applied_without = apply_paths_to_fields( + library, + preview, + create_missing_field_types=True, + allow_existing=False, + ) + assert applied_without == 0 + entry_state = unwrap(library.get_entry_full(eid)) + values = [f.value or "" for f in entry_state.fields if f.type_key == FieldID.SERIES.name] + assert values == ["Existing Series"] + + preview = preview_paths_to_fields(library, [rule], only_unset=False) + applied_with = apply_paths_to_fields( + library, + preview, + create_missing_field_types=True, + allow_existing=True, + ) + assert applied_with == 1 + entry_state = unwrap(library.get_entry_full(eid)) + values = [f.value or "" for f in entry_state.fields if f.type_key == FieldID.SERIES.name] + assert sorted(values) == ["Existing Series", "NewSeries"] + + +def test_iter_preview_paths_to_fields_reports_progress(library: Library): + folder = unwrap(library.folder) + + entries = [ + Entry(folder=folder, path=Path("progress/alpha_01.jpg"), fields=[]), + Entry(folder=folder, path=Path("progress/beta_02.jpg"), fields=[]), + ] + library.add_entries(entries) + + rule = PathFieldRule( + pattern=r"^progress/(?P<name>[^_]+)_(?P<page>\d+)\.[^.]+$", + fields=[ + (FieldID.SERIES.name, "$name"), + ("page_number", "$page"), + ], + ) + + events = list(iter_preview_paths_to_fields(library, [rule], only_unset=False)) + assert events, "Expected progress events to be emitted" + + totals = {evt.total for evt in events if evt.total is not None} + assert totals == {library.entry_count()} + assert any(evt.update is not None for evt in events) + + +def test_iter_preview_paths_to_fields_stop(library: Library): + folder = unwrap(library.folder) + + entries = [ + Entry(folder=folder, path=Path("stop/foo_01.jpg"), fields=[]), + Entry(folder=folder, path=Path("stop/bar_02.jpg"), fields=[]), + ] + library.add_entries(entries) + + rule = PathFieldRule( + pattern=r"^stop/(?P<stem>[^_]+)_(?P<page>\d+)\.[^.]+$", + fields=[(FieldID.SERIES.name, "$stem")], + ) + + seen: list[int] = [] + + def should_cancel() -> bool: + return len(seen) >= 1 + + for evt in iter_preview_paths_to_fields( + library, + [rule], + only_unset=False, + cancel_callback=should_cancel, + ): + seen.append(evt.index) + + assert seen == [1] From 0f5fc573c3eafcaa1379d1f727be2bcca56d08f8 Mon Sep 17 00:00:00 2001 From: Catting <5874051+mm12@users.noreply.github.com> Date: Tue, 25 Nov 2025 18:12:21 -0600 Subject: [PATCH 4/4] various fixed and improvements: * clean up progress bar - add percentage, move progress text into the bar itself * add information about current value to preview. specify if exact duplicate * attempt to throttle UI updates (needs further work) * use nonModal - this may need more checks around it, but lets users easily cross-reference with items in the main window * remove wrapping on progress bar label, since it was causing issues --- src/tagstudio/qt/mixed/paths_to_fields.py | 147 ++++++++++++++++++---- 1 file changed, 125 insertions(+), 22 deletions(-) diff --git a/src/tagstudio/qt/mixed/paths_to_fields.py b/src/tagstudio/qt/mixed/paths_to_fields.py index 6dbc6b746..0c9378339 100644 --- a/src/tagstudio/qt/mixed/paths_to_fields.py +++ b/src/tagstudio/qt/mixed/paths_to_fields.py @@ -1,16 +1,20 @@ # TODO list # UI bugs # - When preview loads, it extends below the apply button, likely because scrollbar isn't calculated -# - Multi-line fields sometimes get cut off when adding/removing mappings so they show up as 1 line. +# - progress item: show a truncated path or find a way to show the full path without breaking the UI +# Funcionality +# - exiting while job is running keeps job running? +# - clean up helpers that throttle UI updates, since they don't seem to work very well. from __future__ import annotations import re +import time from collections.abc import Callable, Iterable, Iterator from contextlib import suppress from dataclasses import dataclass, field from typing import TYPE_CHECKING -from PySide6.QtCore import Qt, QThreadPool +from PySide6.QtCore import Qt, QThreadPool, QTimer from PySide6.QtGui import QTextOption from PySide6.QtWidgets import ( QCheckBox, @@ -404,7 +408,7 @@ def __init__(self, library: Library, driver: QtDriver) -> None: self.library = library self.driver = driver self.setWindowTitle(Translations["paths_to_fields.title"]) # fallback shows [key] - self.setWindowModality(Qt.WindowModality.ApplicationModal) + self.setWindowModality(Qt.WindowModality.NonModal) # Fine to use other windows while processing self.setMinimumSize(720, 640) self._preview_results: list[EntryFieldUpdate] = [] @@ -417,6 +421,17 @@ def __init__(self, library: Library, driver: QtDriver) -> None: self._apply_runnable: CustomRunnable | None = None self._progress_prefix = "" self._progress_cancel_handler: Callable[[], None] | None = None + self._last_progress_update = 0.0 + self._progress_update_interval = 0.12 + self._pending_progress: PreviewProgress | None = None + self._progress_flush_timer = QTimer(self) + self._progress_flush_timer.setSingleShot(True) + self._progress_flush_timer.timeout.connect(self._handle_progress_flush_timeout) + self._preview_update_buffer: list[str] = [] + self._preview_buffer_timer = QTimer(self) + self._preview_buffer_timer.setSingleShot(True) + self._preview_buffer_timer.timeout.connect(self._flush_preview_buffer) + self._preview_buffer_interval_ms = 160 root = QVBoxLayout(self) root.setContentsMargins(8, 8, 8, 8) @@ -488,8 +503,9 @@ def __init__(self, library: Library, driver: QtDriver) -> None: progress_layout.setSpacing(4) self.progress_label = QLabel() - self.progress_label.setWordWrap(True) + self.progress_label.setWordWrap(False) self.progress_label.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) + self.progress_label.setVisible(False) progress_bar_row = QHBoxLayout() progress_bar_row.setContentsMargins(0, 0, 0, 0) @@ -498,7 +514,7 @@ def __init__(self, library: Library, driver: QtDriver) -> None: self.progress_bar = QProgressBar() self.progress_bar.setMinimumWidth(240) self.progress_bar.setSizePolicy(QSizePolicy.Policy.Expanding, QSizePolicy.Policy.Fixed) - self.progress_bar.setTextVisible(False) + self.progress_bar.setTextVisible(True) self.progress_cancel_btn = QPushButton(Translations["generic.cancel"]) self.progress_cancel_btn.setVisible(False) @@ -615,6 +631,9 @@ def _on_preview(self): self._preview_running = True self._set_controls_enabled(enabled=False) + if self._preview_buffer_timer.isActive(): + self._preview_buffer_timer.stop() + self._preview_update_buffer.clear() self._start_progress( label=Translations["paths_to_fields.preview"], total=total, @@ -745,6 +764,7 @@ def _append_preview_update(self, upd: EntryFieldUpdate) -> None: # Flag duplicates before generic already_set so we only warn for actual conflicts if value in existing_vals and value != "": marker = Translations["paths_to_fields.preview.markers.duplicate"] + conflict_vals = [value] else: already_set = any(val != "" for val in existing_vals) marker = ( @@ -752,10 +772,22 @@ def _append_preview_update(self, upd: EntryFieldUpdate) -> None: if already_set and not allow_existing else None ) + conflict_vals = [v for v in existing_vals if v != ""] if marker else [] prefix = f"⚠ {marker} — " if marker else "" - lines.append(f" - {prefix}{key}: {value}") - self.preview_area.appendPlainText("\n".join(lines)) - self.preview_area.ensureCursorVisible() + conflict_note = "" + if marker and conflict_vals: + dedup_conflicts: list[str] = [] + for val in conflict_vals: + if val not in dedup_conflicts: + dedup_conflicts.append(val) + conflicts_str = ", ".join(dedup_conflicts) + if conflicts_str: + conflict_note = f" (current: {conflicts_str})" + lines.append(f" - {prefix}{key}: {value}{conflict_note}") + text = "\n".join(lines) + self._preview_update_buffer.append(text) + if not self._preview_buffer_timer.isActive(): + self._preview_buffer_timer.start(self._preview_buffer_interval_ms) def _handle_preview_progress(self, progress: PreviewProgress) -> None: self._update_progress(progress) @@ -766,24 +798,64 @@ def _handle_preview_progress(self, progress: PreviewProgress) -> None: def _handle_apply_progress(self, progress: PreviewProgress) -> None: self._update_progress(progress) - def _update_progress(self, progress: PreviewProgress) -> None: + def _flush_pending_progress(self) -> None: + if self._pending_progress is not None: + if self._progress_flush_timer.isActive(): + self._progress_flush_timer.stop() + self._update_progress(self._pending_progress, force=True) + self._pending_progress = None + + def _flush_preview_buffer(self) -> None: + if not self._preview_update_buffer: + return + combined = "\n".join(self._preview_update_buffer) + self._preview_update_buffer.clear() + self.preview_area.appendPlainText(combined) + self.preview_area.ensureCursorVisible() + + def _update_progress(self, progress: PreviewProgress, *, force: bool = False) -> None: + self._pending_progress = progress + now = time.perf_counter() + if not force and (now - self._last_progress_update) < self._progress_update_interval: + if not self._progress_flush_timer.isActive(): + self._progress_flush_timer.start(int(self._progress_update_interval * 1000)) + return + self._last_progress_update = now + self._pending_progress = None + if self._progress_flush_timer.isActive(): + self._progress_flush_timer.stop() + total = progress.total or 0 + current = progress.index + percent: int | None = None if total > 0: + current = min(progress.index, total) self.progress_bar.setRange(0, total) - self.progress_bar.setValue(min(progress.index, total)) + self.progress_bar.setValue(current) + try: + percent = round((current / total) * 100) + except ZeroDivisionError: + percent = 0 + percent = max(0, min(100, percent)) + prefix = self._progress_prefix.strip() + bar_text = f"{current}/{total} ({percent}%)" + if prefix: + bar_text = f"{prefix}: {bar_text}" + self.progress_bar.setFormat(bar_text) else: self.progress_bar.setRange(0, 0) - - lines: list[str] = [] - if self._progress_prefix: - lines.append(self._progress_prefix) - if progress.total: - lines.append(f"{progress.index}/{progress.total}") - else: - lines.append(str(progress.index)) - if progress.path: - lines.append(progress.path) - self.progress_label.setText("\n".join(filter(None, lines))) + prefix = self._progress_prefix.strip() + bar_text = f"{current}" + if prefix: + bar_text = f"{prefix}: {bar_text}" + self.progress_bar.setFormat(bar_text) + + display_text = progress.path or "" + if not display_text and force: + display_text = self._progress_prefix.strip() + self.progress_label.setVisible(bool(display_text)) + self.progress_label.setText(display_text) + self.progress_label.setToolTip(display_text) def _start_progress( self, @@ -793,21 +865,42 @@ def _start_progress( cancel_handler: Callable[[], None] | None, ) -> None: self._progress_prefix = label - self.progress_label.setText(label) self.progress_container.setVisible(True) if total and total > 0: self.progress_bar.setRange(0, total) self.progress_bar.setValue(0) else: self.progress_bar.setRange(0, 0) + prefix = label.strip() + self.progress_bar.setFormat(prefix) + self._last_progress_update = 0.0 + self._pending_progress = None + if self._progress_flush_timer.isActive(): + self._progress_flush_timer.stop() + if self._preview_buffer_timer.isActive(): + self._preview_buffer_timer.stop() + self._preview_update_buffer.clear() + self.progress_label.clear() + self.progress_label.setToolTip("") + self.progress_label.setVisible(False) self._set_cancel_handler(cancel_handler) def _finish_progress(self) -> None: + self._flush_preview_buffer() self.progress_container.setVisible(False) self.progress_label.clear() + self.progress_label.setToolTip("") + self.progress_label.setVisible(False) self.progress_bar.setValue(0) + self.progress_bar.setFormat("") self._progress_prefix = "" self._set_cancel_handler(None) + self._last_progress_update = 0.0 + self._pending_progress = None + if self._progress_flush_timer.isActive(): + self._progress_flush_timer.stop() + if self._preview_buffer_timer.isActive(): + self._preview_buffer_timer.stop() def _set_cancel_handler(self, handler: Callable[[], None] | None) -> None: self._progress_cancel_handler = handler @@ -829,6 +922,8 @@ def _finalize_preview(self) -> None: self._cancel_preview = False self._preview_iterator = None self._preview_runnable = None + self._flush_pending_progress() + self._flush_preview_buffer() self._finish_progress() self._set_controls_enabled(enabled=True) if not self._preview_results and not cancelled: @@ -838,6 +933,8 @@ def _finalize_apply(self) -> None: self._apply_running = False self._apply_iterator = None self._apply_runnable = None + self._flush_pending_progress() + self._flush_preview_buffer() self._finish_progress() self._set_controls_enabled(enabled=True) self.close() @@ -858,3 +955,9 @@ def _set_controls_enabled(self, *, enabled: bool) -> None: widget = self.map_v.itemAt(i).widget() if isinstance(widget, _MappingRow): widget.setEnabled(enabled) + + def _handle_progress_flush_timeout(self) -> None: + if self._pending_progress is not None: + self._update_progress(self._pending_progress, force=True) + if self._preview_update_buffer and not self._preview_buffer_timer.isActive(): + self._flush_preview_buffer()