diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6cc3b31..2a1403f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -12,12 +12,10 @@ on: jobs: django_42: - runs-on: ubuntu-latest strategy: matrix: python-version: ["3.8", "3.11", "3.12", "3.13"] - steps: - uses: actions/checkout@v2 - uses: kamiazya/setup-graphviz@v1 @@ -50,13 +48,11 @@ jobs: parallel: true flag-name: Unit Test - django_5: - + django_52: runs-on: ubuntu-latest strategy: matrix: python-version: ["3.10", "3.11", "3.12", "3.13"] - steps: - uses: actions/checkout@v2 - uses: kamiazya/setup-graphviz@v1 @@ -88,9 +84,8 @@ jobs: with: parallel: true flag-name: Unit Test - - mysql: + mysql: runs-on: ubuntu-latest strategy: matrix: @@ -139,7 +134,6 @@ jobs: flag-name: Unit Test postgres: - runs-on: ubuntu-latest strategy: matrix: @@ -187,7 +181,6 @@ jobs: flag-name: Unit Test postgres-psycopg3: - runs-on: ubuntu-latest strategy: matrix: @@ -235,7 +228,6 @@ jobs: flag-name: Unit Test mariadb: - runs-on: ubuntu-latest strategy: matrix: @@ -283,8 +275,50 @@ jobs: parallel: true flag-name: Unit Test + backend: + runs-on: ubuntu-latest + strategy: + matrix: + updatemode: ['FLAT', 'SAVE', 'BULK'] + python-version: ["3.13"] + env: + UPDATEMODE: ${{ matrix.updatemode }} + steps: + - uses: actions/checkout@v2 + - uses: kamiazya/setup-graphviz@v1 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install "Django~=5.2" + pip install -r example/requirements-ci.txt + - name: Setup DB + run: | + ./example/manage.py makemigrations + ./example/manage.py migrate + - name: Run tests + run: | + echo $UPDATEMODE + coverage run --parallel-mode --branch --source='computedfields' ./example/manage.py test exampleapp + coverage run --parallel-mode --branch --source='computedfields' ./example/manage.py test 
test_full + coverage combine + coverage report + - name: Build docs + run: | + cd docs && make html + cd ../ + - name: Coveralls + uses: AndreMiras/coveralls-python-action@develop + with: + parallel: true + flag-name: Unit Test + + coveralls_finish: - needs: [django_42, django_5, mysql, postgres, postgres-psycopg3, mariadb] + needs: [django_42, django_52, mysql, postgres, postgres-psycopg3, mariadb, backend] runs-on: ubuntu-latest steps: - name: Coveralls Finished diff --git a/computedfields/backends.py b/computedfields/backends.py new file mode 100644 index 0000000..1e4e8fc --- /dev/null +++ b/computedfields/backends.py @@ -0,0 +1,29 @@ +from fast_update.fast import fast_update +from fast_update.update import flat_update +from django.db.models import QuerySet +from typing import Sequence, Any, Iterable + + +def fast(queryset: QuerySet, objs: Sequence[Any], fields: Iterable[str]) -> int: + return fast_update(queryset, objs, tuple(fields), None, True) + +def flat(queryset: QuerySet, objs: Sequence[Any], fields: Iterable[str]) -> int: + return flat_update(queryset, objs, tuple(fields), True) + +def save(queryset: QuerySet, objs: Sequence[Any], fields: Iterable[str]) -> int: + from .resolver import NotComputed + with NotComputed(): + for inst in objs: + inst.save(update_fields=fields) + return len(objs) + +def bulk(queryset: QuerySet, objs: Sequence[Any], fields: Iterable[str]) -> int: + return queryset.model._base_manager.bulk_update(objs, fields) + + +UPDATE_IMPLEMENTATIONS = { + 'FAST': fast, + 'FLAT': flat, + 'SAVE': save, + 'BULK': bulk +} diff --git a/computedfields/helpers.py b/computedfields/helpers.py index c46fb90..bb375aa 100644 --- a/computedfields/helpers.py +++ b/computedfields/helpers.py @@ -1,6 +1,7 @@ from itertools import tee, zip_longest from django.db.models import Model, QuerySet -from typing import Any, Iterator, List, Sequence, Type, TypeVar, Tuple, Union, Generator, Iterable +from typing import (Any, Iterator, List, Sequence, Type, TypeVar, 
Tuple, Union, + Generator, Iterable, Optional, FrozenSet) T = TypeVar('T', covariant=True) @@ -88,3 +89,9 @@ def proxy_to_base_model(proxymodel: Type[Model]) -> Union[Type[Model], None]: def are_same(*args) -> bool: return len(set(args)) == 1 + + +def frozenset_none(data: Optional[Iterable[Any]]) -> Optional[FrozenSet[Any]]: + if data is None: + return + return frozenset(data) diff --git a/computedfields/management/commands/checkdata.py b/computedfields/management/commands/checkdata.py index db63c06..b8b43ac 100644 --- a/computedfields/management/commands/checkdata.py +++ b/computedfields/management/commands/checkdata.py @@ -80,7 +80,7 @@ def action_check(self, models, progress, size, json_out): for model in models: qs = model._base_manager.all() amount = qs.count() - fields = set(active_resolver.computed_models[model].keys()) + fields = frozenset(active_resolver.computed_models[model].keys()) qsize = active_resolver.get_querysize(model, fields, size) self.eprint(f'- {self.style.MIGRATE_LABEL(modelname(model))}') self.eprint(f' Fields: {", ".join(fields)}') diff --git a/computedfields/management/commands/updatedata.py b/computedfields/management/commands/updatedata.py index 1baf860..91614e8 100644 --- a/computedfields/management/commands/updatedata.py +++ b/computedfields/management/commands/updatedata.py @@ -10,6 +10,7 @@ from computedfields.models import active_resolver from computedfields.helpers import modelname, slice_iterator from computedfields.settings import settings +from computedfields.backends import UPDATE_IMPLEMENTATIONS from ._helpers import retrieve_computed_models, HAS_TQDM, tqdm from typing import Type, cast @@ -48,8 +49,8 @@ def add_arguments(self, parser): '-m', '--mode', default='default', type=str, - choices=('loop', 'bulk', 'fast'), - help='Set explicit update mode, default: bulk/fast from settings.py.' + choices=('FAST', 'FLAT', 'SAVE', 'BULK', 'LOOP'), + help='Set explicit update mode, default is taken from settings.py.' 
) parser.add_argument( '-q', '--querysize', @@ -85,7 +86,7 @@ def action_fileinput(self, file, size, progress): model_name, desync = data.get('model'), data.get('desync') model: Type[Model] = cast(Type[Model], apps.get_model(model_name)) amount = len(desync) - fields = set(active_resolver.computed_models[model].keys()) + fields = frozenset(active_resolver.computed_models[model].keys()) self.stdout.write(f'- {self.style.MIGRATE_LABEL(modelname(model))}') self.stdout.write(f' Fields: {", ".join(fields)}') self.stdout.write(f' Desync Records: {amount}') @@ -112,7 +113,7 @@ def action_default(self, models, size, show_progress, mode=''): Runs either in fast or bulk mode, whatever was set in settings. """ if not mode: - mode = 'fast' if settings.COMPUTEDFIELDS_FASTUPDATE else 'bulk' + mode = settings.COMPUTEDFIELDS_UPDATEMODE self.stdout.write(f'Update mode: settings.py --> {mode}') self.stdout.write(f'Default querysize: {size}') @@ -120,24 +121,12 @@ def action_default(self, models, size, show_progress, mode=''): for model in models: qs = model._base_manager.all() amount = qs.count() - fields = set(active_resolver.computed_models[model].keys()) + fields = frozenset(active_resolver.computed_models[model].keys()) self.stdout.write(f'- {self.style.MIGRATE_LABEL(modelname(model))}') self.stdout.write(f' Fields: {", ".join(fields)}') self.stdout.write(f' Records: {amount}') self.stdout.write(f' Querysize: {active_resolver.get_querysize(model, fields, size)}') - # TODO: dummy test code to get some idea about long taking tasks in the update tree - # this is linked to bad perf from slicing and distinct() calls in bulk_updater (#101) - ##qs = qs.filter(pk__in=range(1, 1001)) - #counted = count_dependent(qs) - #explained = explain_dependent(qs, query_pks=False) - #self.stdout.write('records to check:', counted) - #for ex in explained: - # self.stdout.write(ex) - #timer(lambda: explain_dependent(qs), 1) - #timer(lambda: count_dependent(qs), 1) - #return - if not amount: continue 
if show_progress: @@ -157,20 +146,33 @@ def action_default(self, models, size, show_progress, mode=''): else: active_resolver.update_dependent(qs, querysize=size) - def action_bulk(self, models, size, show_progress): - active_resolver.use_fastupdate = False - self.stdout.write('Update mode: bulk') - self.action_default(models, size, show_progress, 'bulk') - - def action_fast(self, models, size, show_progress): - active_resolver.use_fastupdate = True - active_resolver._batchsize = settings.COMPUTEDFIELDS_BATCHSIZE_FAST + def action_FAST(self, models, size, show_progress): + active_resolver._update_mode = 'FAST' + active_resolver._update = UPDATE_IMPLEMENTATIONS['FAST'] self.stdout.write('Update mode: fast') - self.action_default(models, size, show_progress, 'fast') + self.action_default(models, size, show_progress, 'FAST') + + def action_FLAT(self, models, size, show_progress): + active_resolver._update_mode = 'FLAT' + active_resolver._update = UPDATE_IMPLEMENTATIONS['FLAT'] + self.stdout.write('Update mode: flat') + self.action_default(models, size, show_progress, 'FLAT') + + def action_SAVE(self, models, size, show_progress): + active_resolver._update_mode = 'SAVE' + active_resolver._update = UPDATE_IMPLEMENTATIONS['SAVE'] + self.stdout.write('Update mode: save') + self.action_default(models, size, show_progress, 'SAVE') + + def action_BULK(self, models, size, show_progress): + active_resolver._update_mode = 'BULK' + active_resolver._update = UPDATE_IMPLEMENTATIONS['BULK'] + self.stdout.write('Update mode: bulk') + self.action_default(models, size, show_progress, 'BULK') @transaction.atomic - def action_loop(self, models, size, show_progress): - self.stdout.write('Update mode: loop') + def action_LOOP(self, models, size, show_progress): + self.stdout.write('Update mode: LOOP') self.stdout.write(f'Global querysize: {size}') self.stdout.write('Models:') if size != settings.COMPUTEDFIELDS_QUERYSIZE: @@ -181,7 +183,7 @@ def action_loop(self, models, size, 
show_progress): for model in models: qs = model._base_manager.all() amount = qs.count() - fields = list(active_resolver.computed_models[model].keys()) + fields = frozenset(active_resolver.computed_models[model].keys()) qsize = active_resolver.get_querysize(model, fields, size) self.stdout.write(f'- {self.style.MIGRATE_LABEL(modelname(model))}') self.stdout.write(f' Fields: {", ".join(fields)}') @@ -204,33 +206,3 @@ def action_loop(self, models, size, show_progress): else: for obj in slice_iterator(qs, qsize): obj.save() - - -# get some explaining on update_dependent -#def count_dependent(queryset, fields=None): -# #counted = queryset.count() -# counted = len(set(queryset.values_list('pk', flat=True).iterator())) -# if counted: -# updates = active_resolver._querysets_for_update(queryset.model, queryset, fields).values() -# for qs, f in updates: -# counted += count_dependent(qs, f) -# return counted -# -#def explain_dependent(queryset, fields=None, level=0, query_pks=False): -# s = time() -# #counted = queryset.count() -# counted = len(set(queryset.values_list('pk', flat=True).iterator())) -# d = time() - s -# res = [(level, queryset.model, fields, counted, d, queryset.distinct().values_list('pk', flat=True) if query_pks else [])] -# if counted: -# updates = active_resolver._querysets_for_update(queryset.model, queryset, fields).values() -# for qs, f in updates: -# res += explain_dependent(qs, f, level+1, query_pks) -# return res -# -# -#def timer(f, n): -# start = time() -# for _ in range(n): -# f() -# print(time()-start) diff --git a/computedfields/resolver.py b/computedfields/resolver.py index 221b543..8e50f64 100644 --- a/computedfields/resolver.py +++ b/computedfields/resolver.py @@ -11,15 +11,15 @@ from .settings import settings from .graph import ComputedModelsGraph, ComputedFieldsException, Graph, ModelGraph, IM2mMap -from .helpers import proxy_to_base_model, slice_iterator, subquery_pk, are_same +from .helpers import proxy_to_base_model, slice_iterator, 
subquery_pk, are_same, frozenset_none from . import __version__ from .signals import resolver_start, resolver_exit, resolver_update -from fast_update.fast import fast_update +from .backends import UPDATE_IMPLEMENTATIONS # typing imports from typing import (Any, Callable, Dict, Generator, Iterable, List, Optional, Sequence, Set, - Tuple, Type, Union, cast, overload) + Tuple, Type, Union, cast, overload, FrozenSet) from django.db.models import Field, Model from .graph import (IComputedField, IDepends, IFkMap, ILocalMroMap, ILookupMap, _ST, _GT, F, IRecorded, IRecordedStrict, IModelUpdate, IModelUpdateCache) @@ -82,17 +82,27 @@ def __init__(self): self._local_mro: ILocalMroMap = {} self._m2m: IM2mMap = {} self._proxymodels: Dict[Type[Model], Type[Model]] = {} - self.use_fastupdate: bool = settings.COMPUTEDFIELDS_FASTUPDATE - self._batchsize: int = (settings.COMPUTEDFIELDS_BATCHSIZE_FAST - if self.use_fastupdate else settings.COMPUTEDFIELDS_BATCHSIZE_BULK) + self._batchsize: int = settings.COMPUTEDFIELDS_BATCHSIZE + self._update_mode: str = settings.COMPUTEDFIELDS_UPDATEMODE + try: + self._update = UPDATE_IMPLEMENTATIONS[self._update_mode] + except KeyError: + raise ResolverException( + f'\nCOMPUTEDFIELDS_UPDATEMODE must be one of ' + f'{list(UPDATE_IMPLEMENTATIONS.keys())}' + ) # some internal states self._sealed: bool = False # initial boot phase self._initialized: bool = False # initialized (computed_models populated)? 
self._map_loaded: bool = False # final stage with fully loaded maps - # model update cache - self._updates_cache: IModelUpdateCache = defaultdict(dict) + # runtime caches + self._cached_updates: IModelUpdateCache = defaultdict(dict) + self._cached_mro = defaultdict(dict) + self._cached_select_related = defaultdict(dict) + self._cached_prefetch_related = defaultdict(dict) + self._cached_querysize = defaultdict(lambda: defaultdict(dict)) def add_model(self, sender: Type[Model], **kwargs) -> None: """ @@ -236,7 +246,17 @@ def load_maps(self, _force_recreation: bool = False) -> None: self._m2m = self._graph._m2m self._patch_proxy_models() self._map_loaded = True - self._updates_cache = defaultdict(dict) + self._clear_runtime_caches() + + def _clear_runtime_caches(self): + """ + Clear all runtime caches. + """ + self._cached_updates.clear() + self._cached_mro.clear() + self._cached_select_related.clear() + self._cached_prefetch_related.clear() + self._cached_querysize.clear() def _patch_proxy_models(self) -> None: """ @@ -258,7 +278,7 @@ def _patch_proxy_models(self) -> None: def get_local_mro( self, model: Type[Model], - update_fields: Optional[Iterable[str]] = None + update_fields: Optional[FrozenSet[str]] = None ) -> List[str]: """ Return `MRO` for local computed field methods for a given set of `update_fields`. @@ -267,39 +287,44 @@ def get_local_mro( Returns computed fields as self dependent to simplify local field dependency calculation. """ - # TODO: investigate - memoization of update_fields result? 
(runs ~4 times faster) + try: + return self._cached_mro[model][update_fields] + except KeyError: + pass entry = self._local_mro.get(model) if not entry: + self._cached_mro[model][update_fields] = [] return [] if update_fields is None: + self._cached_mro[model][update_fields] = entry['base'] return entry['base'] - update_fields = frozenset(update_fields) base = entry['base'] fields = entry['fields'] mro = 0 for field in update_fields: mro |= fields.get(field, 0) - return [name for pos, name in enumerate(base) if mro & (1 << pos)] + result = [name for pos, name in enumerate(base) if mro & (1 << pos)] + self._cached_mro[model][update_fields] = result + return result def get_model_updates( self, model: Type[Model], - update_fields: Optional[Iterable[str]] = None + update_fields: Optional[FrozenSet[str]] = None ) -> IModelUpdate: """ For a given model and updated fields this method returns a dictionary with dependent models (keys) and a tuple with dependent fields and the queryset accessor string (value). """ - modeldata = self._map.get(model) - if not modeldata: - return {} - if not update_fields is None: - update_fields = frozenset(update_fields) try: - return self._updates_cache[model][update_fields] + return self._cached_updates[model][update_fields] except KeyError: pass + modeldata = self._map.get(model) + if not modeldata: + self._cached_updates[model][update_fields] = {} + return {} if not update_fields: updates: Set[str] = set(modeldata.keys()) else: @@ -316,7 +341,7 @@ def get_model_updates( m_fields, m_paths = model_updates[m] m_fields.update(fields) m_paths.update(paths) - self._updates_cache[model][update_fields] = model_updates + self._cached_updates[model][update_fields] = model_updates return model_updates def _querysets_for_update( @@ -331,7 +356,7 @@ def _querysets_for_update( queryset containing all dependent objects. 
""" final: Dict[Type[Model], List[Any]] = {} - model_updates = self.get_model_updates(model, update_fields) + model_updates = self.get_model_updates(model, frozenset_none(update_fields)) if not model_updates: return final @@ -500,9 +525,11 @@ def update_dependent( if update_local and self.has_computedfields(_model): # We skip a transaction here in the same sense, # as local cf updates are not guarded either. - queryset = instance if isinstance(instance, QuerySet) \ - else _model._base_manager.filter(pk__in=[instance.pk]) - self.bulk_updater(queryset, _update_fields, local_only=True, querysize=querysize) + # FIXME: signals are broken here... + if isinstance(instance, QuerySet): + self.bulk_updater(instance, _update_fields, local_only=True, querysize=querysize) + else: + self.single_updater(_model, instance, _update_fields) updates = self._querysets_for_update(_model, instance, _update_fields).values() if updates: @@ -525,6 +552,27 @@ def update_dependent( if not _is_recursive: resolver_exit.send(sender=self) + def single_updater( + self, + model, + instance, + update_fields + ): + # TODO: needs a couple of tests, proper typing and doc + cf_mro = self.get_local_mro(model, frozenset_none(update_fields)) + if update_fields: + update_fields.update(cf_mro) + changed = [] + for fieldname in cf_mro: + old_value = getattr(instance, fieldname) + new_value = self._compute(instance, model, fieldname) + if new_value != old_value: + changed.append(fieldname) + setattr(instance, fieldname, new_value) + if changed: + self._update(model.objects.all(), [instance], changed) + resolver_update.send(sender=self, model=model, fields=changed, pks=[instance.pk]) + def bulk_updater( self, queryset: QuerySet, @@ -566,8 +614,8 @@ def bulk_updater( queryset = model._base_manager.filter(pk__in=subquery_pk(queryset, queryset.db)) # correct update_fields by local mro - mro: List[str] = self.get_local_mro(model, update_fields) - fields = set(mro) + mro: List[str] = self.get_local_mro(model, 
frozenset_none(update_fields)) + fields = frozenset(mro) if update_fields: update_fields.update(fields) @@ -585,7 +633,7 @@ def bulk_updater( pks = [] if fields: q_size = self.get_querysize(model, fields, querysize) - change: List[Model] = [] + changed_objs: List[Model] = [] for elem in slice_iterator(queryset, q_size): # note on the loop: while it is technically not needed to batch things here, # we still prebatch to not cause memory issues for very big querysets @@ -596,13 +644,13 @@ def bulk_updater( has_changed = True setattr(elem, comp_field, new_value) if has_changed: - change.append(elem) + changed_objs.append(elem) pks.append(elem.pk) - if len(change) >= self._batchsize: - self._update(model._base_manager.all(), change, fields) - change = [] - if change: - self._update(model._base_manager.all(), change, fields) + if len(changed_objs) >= self._batchsize: + self._update(model._base_manager.all(), changed_objs, fields) + changed_objs = [] + if changed_objs: + self._update(model._base_manager.all(), changed_objs, fields) if pks: resolver_update.send(sender=self, model=model, fields=fields, pks=pks) @@ -619,12 +667,6 @@ def bulk_updater( _is_recursive=True ) return set(pks) if return_pks else None - - def _update(self, queryset: QuerySet, change: Sequence[Any], fields: Iterable[str]) -> Union[int, None]: - # we can skip batch_size here, as it already was batched in bulk_updater - if self.use_fastupdate: - return fast_update(queryset, change, fields, None) - return queryset.model._base_manager.bulk_update(change, fields) def _compute(self, instance: Model, model: Type[Model], fieldname: str) -> Any: """ @@ -679,49 +721,65 @@ def compute(self, instance: Model, fieldname: str) -> Any: stack.append((field, getattr(instance, field))) setattr(instance, field, self._compute(instance, model, field)) - # TODO: the following 3 lookups are very expensive at runtime adding ~2s for 1M calls - # --> all need pregenerated lookup maps - # Note: the same goes for get_local_mro 
and _queryset_for_update... def get_select_related( self, model: Type[Model], - fields: Optional[Iterable[str]] = None + fields: Optional[FrozenSet[str]] = None ) -> Set[str]: """ Get defined select_related rules for `fields` (all if none given). """ - if fields is None: - fields = self._computed_models[model].keys() + try: + return self._cached_select_related[model][fields] + except KeyError: + pass select: Set[str] = set() - for field in fields: + ff = fields + if ff is None: + ff = frozenset(self._computed_models[model].keys()) + for field in ff: select.update(self._computed_models[model][field]._computed['select_related']) + self._cached_select_related[model][fields] = select return select def get_prefetch_related( self, model: Type[Model], - fields: Optional[Iterable[str]] = None + fields: Optional[FrozenSet[str]] = None ) -> List: """ Get defined prefetch_related rules for `fields` (all if none given). """ - if fields is None: - fields = self._computed_models[model].keys() + try: + return self._cached_prefetch_related[model][fields] + except KeyError: + pass prefetch: List[Any] = [] - for field in fields: + ff = fields + if ff is None: + ff = frozenset(self._computed_models[model].keys()) + for field in ff: prefetch.extend(self._computed_models[model][field]._computed['prefetch_related']) + self._cached_prefetch_related[model][fields] = prefetch return prefetch def get_querysize( self, model: Type[Model], - fields: Optional[Iterable[str]] = None, + fields: Optional[FrozenSet[str]] = None, override: Optional[int] = None ) -> int: + try: + return self._cached_querysize[model][fields][override] + except KeyError: + pass + ff = fields + if ff is None: + ff = frozenset(self._computed_models[model].keys()) base = settings.COMPUTEDFIELDS_QUERYSIZE if override is None else override - if fields is None: - fields = self._computed_models[model].keys() - return min(self._computed_models[model][f]._computed['querysize'] or base for f in fields) + result = 
min(self._computed_models[model][f]._computed['querysize'] or base for f in ff) + self._cached_querysize[model][fields][override] = result + return result def get_contributing_fks(self) -> IFkMap: """ @@ -994,7 +1052,7 @@ def update_computedfields( model = type(instance) if not self.has_computedfields(model): return update_fields - cf_mro = self.get_local_mro(model, update_fields) + cf_mro = self.get_local_mro(model, frozenset_none(update_fields)) if update_fields: update_fields = set(update_fields) update_fields.update(set(cf_mro)) @@ -1150,7 +1208,7 @@ def _resync(self): # FIXME: untangle the side effect update of fields in update_dependent <-- bulk_updater fields = local_data['fields'] if fields and active_resolver.has_computedfields(model): - fields = set(active_resolver.get_local_mro(model, local_data['fields'])) + fields = set(active_resolver.get_local_mro(model, frozenset(fields))) mdata = active_resolver._querysets_for_update( model, diff --git a/computedfields/settings.py b/computedfields/settings.py index cca1ee7..1e81595 100644 --- a/computedfields/settings.py +++ b/computedfields/settings.py @@ -9,14 +9,11 @@ # whether to allow intermodel field recursions 'COMPUTEDFIELDS_ALLOW_RECURSION': False, - # batchsize for bulk_update - 'COMPUTEDFIELDS_BATCHSIZE_BULK': 100, + # update mode to use + 'COMPUTEDFIELDS_UPDATEMODE': 'FAST', - # batchsize for fast_update - 'COMPUTEDFIELDS_BATCHSIZE_FAST': 10000, - - # whether to use fast_update - 'COMPUTEDFIELDS_FASTUPDATE': False, + # batchsize for update + 'COMPUTEDFIELDS_BATCHSIZE': 5000, # batchsize of select queries done by resolver 'COMPUTEDFIELDS_QUERYSIZE': 10000 diff --git a/docs/examples.rst b/docs/examples.rst index 6354731..143b777 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -657,8 +657,6 @@ them unchanged and not update anything). The update speed is still quite high, which is possible due to using the `fast` update mode. 
With `bulk` it already drops to 4600 rec/s (3:30 min), with `loop` we are at 240 rec/s (1h 10 min). -Therefore it might be a good idea to activate ``COMPUTEDFIELDS_FASTUPDATE`` in `settings.py` for -update intensive projects. The example already contains another optimization discussed below - a `select_related` entry for `Baz.foo_bar_baz`. Without it, the record throughput drops to 1500 - 2000 rec/s for `fast` or `bulk`. @@ -749,10 +747,6 @@ Of course this does not come for free - multiple n:1 relations put into `select_ the temporary JOIN table rather quick, possibly leading to memory / performance issues on the DBMS. This is also the reason, why it is not enabled by default. -.. TIP:: - - The resolver batches computed field update queries itself with `bulk_update` and a default batch size - of 100. This can be further tweaked project-wide in `settings.py` with ``COMPUTEDFIELDS_BATCHSIZE``. Using `prefetch_related` diff --git a/docs/manual.rst b/docs/manual.rst index f89b258..b9e4909 100644 --- a/docs/manual.rst +++ b/docs/manual.rst @@ -5,6 +5,12 @@ User Guide model methods. +.. ATTENTION:: + + With version 0.4.0 the update mode has changed to use `fast_update` by default. + If you want the old behavior back, see ``COMPUTEDFIELDS_UPDATEMODE`` below. + + Installation ------------ @@ -16,60 +22,57 @@ Install the package with pip: and add ``computedfields`` to your ``INSTALLED_APPS``. -To render the update dependency graph during development, also install :mod:`graphviz`: - -.. code:: bash - - $ pip install graphviz - Settings -------- The module respects optional settings in `settings.py`: -- ``COMPUTEDFIELDS_ADMIN`` +- ``COMPUTEDFIELDS_UPDATEMODE`` + The update mode determines, how the auto resolver writes update data to the database. + It understands one of the following string values: + + - 'FAST' (default) + Update mode using UPDATE ... FROM VALUES pattern from the package :mod:`django-fast-update`. 
+ This mode is typically magnitudes faster than all other modes. + - 'FLAT' + Second fastest update mode using looping `update` calls. Use this mode if you experience + issues with 'FAST'. + - 'SAVE' + Update mode using looped `save` calls on model instances. Use this mode if you have to rely + on signals of computed model instances (not recommended due to high performance penalty). + - 'BULK' + This was the old default mode using Django's `bulk_update`, but got replaced due to + serious performance issues. Use this if you want to stick with the old behavior. + Most likely you also have to lower the batch size to 100 - 1000. + + - ``COMPUTEDFIELDS_BATCHSIZE`` (default 5000) + Set the batch size used for computed field updates by the auto resolver. The setting depends on + the selected update mode above. For 'FAST' values 1000 - 10k are reasonable, with 'BULK' + you might have to lower the value below 1000 to not stress the database planners too much. + Default value is 5000. + + - ``COMPUTEDFIELDS_QUERYSIZE`` (default 10000) + Limits the query size used by the resolver to slices of the given value. This setting is mainly + to avoid excessive memory usage from big querysets, where a direct evaluation would try to cache + everything into RAM. The global setting acts as a "damper" on all reading querysets invoked by + the resolver. + + The querysize can be further adjusted for individual computed fields as optional argument `querysize` + on the ``@computed`` decorator. This is especially useful if a field has overly complicated + dependencies pulling much more into memory than other fields. Also see :ref:`memory-issues` in examples. + + - ``COMPUTEDFIELDS_ADMIN`` (default False) + Set this to ``True`` to get a listing of ``ComputedFieldsModel`` models with their field dependencies in admin. Useful during development. 
-- ``COMPUTEDFIELDS_ALLOW_RECURSION`` +- ``COMPUTEDFIELDS_ALLOW_RECURSION`` (default False) Normally cycling updates to the same model field indicate an error in database design. Therefore the dependency resolver raises a ``CycleNodeException`` if a cycle was encountered. For more complicated setups (like tree structures) you can disable the recursion check. This comes with the drawback, that the underlying graph cannot linearize and optimize the update paths anymore. -- ``COMPUTEDFIELDS_BATCHSIZE_BULK`` and ``COMPUTEDFIELDS_BATCHSIZE_FAST`` - Set the batch size used for computed field updates by the auto resolver. - Internally the resolver updates computed fields either by `bulk_update` or `fast_update`, - which might penalize update performance for very big updates due high memory usage or - expensive SQL evaluation, if done in a single update statement. Here batch size will split - the update into smaller batches of the given size. For `bulk_update` reasonable batch sizes - are typically between 100 to 1000 (going much higher will degrade performance a lot with - `bulk_update`), for `fast_update` higher values in 10k to 100k are still reasonable, - if RAM usage is no concern. If not explicitly set in `settings.py` the default value will be - set to 100 for `bulk_update` and 10k for `fast_update`. - The batch size might be further restricted by certain database adapters. - -- ``COMPUTEDFIELDS_FASTUPDATE`` (Beta) - Set this to ``True`` to use `fast_update` from :mod:`django-fast-update` instead of - `bulk_update`. This is recommended if you face serious update pressure from computed fields, - and will speed up writing to the database by multitudes. While :mod:`django-computedfields` - depends on the package by default (gets installed automatically), it does not enable it yet. - This is likely to change once :mod:`django-fast-update` has seen more in-the-wild testing and fixes. 
- Note that `fast_update` relies on recent database versions (see `package description - `_). - -- ``COMPUTEDFIELDS_QUERYSIZE`` - Limits the query size used by the resolver to slices of the given value (global default is 10k). - This setting is mainly to avoid excessive memory usage from big querysets, where a direct - evaluation would try to cache everything into RAM. The global setting acts as a "damper" on all - reading querysets invoked by the resolver. - - The querysize can be further adjusted for individual computed fields as optional argument `querysize` - on the ``@computed`` decorator. This is especially useful, if a field has overly complicated - dependencies pulling much more into memory than other fields. Also see :ref:`memory-issues` in examples. - Basic usage ----------- @@ -280,7 +283,7 @@ computed field updates. In the next step ``resolver.bulk_updater`` applies `select_related` and `prefetch_related` optimizations to the queryset (if defined) and executes the queryset pulling all possible affected records. It walks the instances calculating computed field values in in topological order and places the results -in the database by batched `bulk_update` calls. +in the database by batched update calls. If another computed field on a different model depends on these changes the process repeats until all computed fields have been finally updated. @@ -645,11 +648,10 @@ Management Commands - ``--progress`` Show a progressbar during the run (needs :mod:`tqdm` to be installed). - - ``--mode {loop,bulk,fast}`` - Set the update operation mode explicitly. By default either `bulk` or `fast` will be used, depending on - ``COMPUTEDFIELDS_FASTUPDATE`` in `settings.py`. The mode `loop` resembles the old command behavior - and will update all computed fields instances by loop-saving. Its usage is strongly discouraged, - as it shows very bad update performance (can easily take hours to update bigger tables). 
This argument + - ``--mode {FAST,FLAT,SAVE,BULK,LOOP}`` + Set the update operation mode explicitly. By default ``COMPUTEDFIELDS_UPDATEMODE`` in `settings.py` + will be used. The mode `LOOP` resembles the old command behavior of version 0.1 by loop-saving. + Its usage is strongly discouraged, as it shows very bad update performance. This argument has no effect in conjunction with ``--from-json`` (always uses mode from `settings.py`). - ``--querysize NUMBER`` See ``COMPUTEDFIELDS_QUERYSIZE`` setting. diff --git a/example/example/settings.py b/example/example/settings.py index b778bc0..a37dd1a 100644 --- a/example/example/settings.py +++ b/example/example/settings.py @@ -42,9 +42,11 @@ 'django.contrib.staticfiles', ] + +# computedfields settings COMPUTEDFIELDS_ADMIN = True -# COMPUTEDFIELDS_MAP = os.path.join(BASE_DIR, 'map.pickle') -COMPUTEDFIELDS_FASTUPDATE = True +COMPUTEDFIELDS_UPDATEMODE = os.environ.get('UPDATEMODE', 'FAST') + MIDDLEWARE = [ 'django.middleware.security.SecurityMiddleware', diff --git a/example/requirements-ci-psycopg3.txt b/example/requirements-ci-psycopg3.txt index 05e5b47..5081815 100644 --- a/example/requirements-ci-psycopg3.txt +++ b/example/requirements-ci-psycopg3.txt @@ -9,4 +9,4 @@ typing_extensions>=4.13.2 psycopg>=3.1.8 psycopg-binary>=3.1.8 tqdm==4.66.3 -django-fast-update +django-fast-update>=0.3.0 diff --git a/example/requirements-ci.txt b/example/requirements-ci.txt index 31d20e2..6be3bcc 100644 --- a/example/requirements-ci.txt +++ b/example/requirements-ci.txt @@ -9,4 +9,4 @@ typing_extensions>=4.13.2 mysqlclient>=2.2.7 psycopg2-binary>=2.9.10 tqdm>=4.67.1 -django-fast-update +django-fast-update>=0.3.0 diff --git a/example/requirements-dev.txt b/example/requirements-dev.txt index 8757def..6feb1d9 100644 --- a/example/requirements-dev.txt +++ b/example/requirements-dev.txt @@ -10,4 +10,4 @@ typing_extensions>=4.13.2 mysqlclient>=2.2.7 psycopg2-binary>=2.9.10 tqdm>=4.67.1 -django-fast-update +django-fast-update>=0.3.0 diff --git 
a/example/requirements-rtd.txt b/example/requirements-rtd.txt index 793ecb8..db040f1 100644 --- a/example/requirements-rtd.txt +++ b/example/requirements-rtd.txt @@ -9,4 +9,4 @@ coverage>=7.6.1 typing_extensions>=4.13.2 tqdm>=4.67.1 -django-fast-update +django-fast-update>=0.3.0 diff --git a/example/test_full/tests/test_querysize.py b/example/test_full/tests/test_querysize.py index 414bc1f..c8c8058 100644 --- a/example/test_full/tests/test_querysize.py +++ b/example/test_full/tests/test_querysize.py @@ -7,7 +7,7 @@ class TestQuerysize(TestCase): def test_default(self): self.assertEqual( - active_resolver.get_querysize(Querysize, ['default']), + active_resolver.get_querysize(Querysize, frozenset(['default'])), settings.COMPUTEDFIELDS_QUERYSIZE ) self.assertEqual( @@ -19,7 +19,7 @@ def test_default(self): def test_default_altered(self): self.assertEqual(settings.COMPUTEDFIELDS_QUERYSIZE, 10000) self.assertEqual( - active_resolver.get_querysize(Querysize, ['default'], 10000), + active_resolver.get_querysize(Querysize, frozenset(['default']), 10000), settings.COMPUTEDFIELDS_QUERYSIZE ) @@ -28,22 +28,22 @@ def test_lowest_in_updates(self): self.assertEqual(active_resolver.get_querysize(Querysize), 1) self.assertEqual(active_resolver.get_querysize(Querysize, None, 10000), 1) # q10 limits - self.assertEqual(active_resolver.get_querysize(Querysize, ['default', 'q10']), 10) - self.assertEqual(active_resolver.get_querysize(Querysize, ['default', 'q10'], 10000), 10) - self.assertEqual(active_resolver.get_querysize(Querysize, ['default', 'q10', 'q100', 'q1000']), 10) + self.assertEqual(active_resolver.get_querysize(Querysize, frozenset(['default', 'q10'])), 10) + self.assertEqual(active_resolver.get_querysize(Querysize, frozenset(['default', 'q10']), 10000), 10) + self.assertEqual(active_resolver.get_querysize(Querysize, frozenset(['default', 'q10', 'q100', 'q1000'])), 10) # q100 limits - self.assertEqual(active_resolver.get_querysize(Querysize, ['default', 'q100']), 100) - 
self.assertEqual(active_resolver.get_querysize(Querysize, ['default', 'q100', 'q1000'], 10000), 100) + self.assertEqual(active_resolver.get_querysize(Querysize, frozenset(['default', 'q100'])), 100) + self.assertEqual(active_resolver.get_querysize(Querysize, frozenset(['default', 'q100', 'q1000']), 10000), 100) # q1000 limits - self.assertEqual(active_resolver.get_querysize(Querysize, ['default', 'q1000'], 10000), 1000) + self.assertEqual(active_resolver.get_querysize(Querysize, frozenset(['default', 'q1000']), 10000), 1000) def test_chain(self): # c_10_100 can do 100, but is limited by prev q10 - mro = active_resolver.get_local_mro(Querysize, ['q10']) - self.assertEqual(active_resolver.get_querysize(Querysize, mro, 10000), 1) + mro = active_resolver.get_local_mro(Querysize, frozenset(['q10'])) + self.assertEqual(active_resolver.get_querysize(Querysize, frozenset(mro), 10000), 1) def test_low_override_wins(self): # q1000 wins - self.assertEqual(active_resolver.get_querysize(Querysize, ['default', 'q1000'], 10000), 1000) + self.assertEqual(active_resolver.get_querysize(Querysize, frozenset(['default', 'q1000']), 10000), 1000) # override wins - self.assertEqual(active_resolver.get_querysize(Querysize, ['default', 'q1000'], 10), 10) + self.assertEqual(active_resolver.get_querysize(Querysize, frozenset(['default', 'q1000']), 10), 10) diff --git a/example/test_full/tests/test_union_related_settings.py b/example/test_full/tests/test_union_related_settings.py index fbd205e..8eac484 100644 --- a/example/test_full/tests/test_union_related_settings.py +++ b/example/test_full/tests/test_union_related_settings.py @@ -1,12 +1,19 @@ from django.test import TestCase from ..models import UAppartment, UPerson -from computedfields.models import update_dependent +from computedfields.models import active_resolver from django.db.transaction import atomic from time import time +from typing import cast +from computedfields.graph import IComputedField + PERSONS = 100 +def 
casted_cf(fieldname): + return cast(IComputedField, UPerson._meta.get_field(fieldname)) + + class UnionRelatedPerf(TestCase): def setUp(self): with atomic(): @@ -25,8 +32,10 @@ def test_rename_appartment_perf(self): self.assertEqual(UPerson.objects.filter(address='App #666, Hellway').count(), PERSONS+1) # patch select_related - UPerson._meta.get_field('address')._computed['select_related'] = ['appartment', 'parent__appartment'] - UPerson._meta.get_field('address')._computed['prefetch_related'] = [] + active_resolver._cached_select_related.clear() + active_resolver._cached_prefetch_related.clear() + casted_cf('address')._computed['select_related'] = ['appartment', 'parent__appartment'] + casted_cf('address')._computed['prefetch_related'] = [] start = time() with atomic(): self.a.street = 'Heaven Lane' @@ -36,8 +45,10 @@ def test_rename_appartment_perf(self): self.assertEqual(UPerson.objects.filter(address='App #777, Heaven Lane').count(), PERSONS+1) # patch prefetch_related - UPerson._meta.get_field('address')._computed['select_related'] = [] - UPerson._meta.get_field('address')._computed['prefetch_related'] = ['appartment', 'parent__appartment'] + active_resolver._cached_select_related.clear() + active_resolver._cached_prefetch_related.clear() + casted_cf('address')._computed['select_related'] = [] + casted_cf('address')._computed['prefetch_related'] = ['appartment', 'parent__appartment'] start = time() with atomic(): self.a.street = 'Celestial Border' @@ -49,8 +60,10 @@ def test_rename_appartment_perf(self): self.assertLess(sr, plain) self.assertLess(pr, plain) - UPerson._meta.get_field('address')._computed['select_related'] = [] - UPerson._meta.get_field('address')._computed['prefetch_related'] = [] + active_resolver._cached_select_related.clear() + active_resolver._cached_prefetch_related.clear() + casted_cf('address')._computed['select_related'] = [] + casted_cf('address')._computed['prefetch_related'] = [] @@ -75,8 +88,10 @@ def 
test_rename_appartment_perf(self): self.assertEqual(UPerson.objects.filter(address='App #666, Hellway').count(), PERSONS+1) # patch select_related - UPerson._meta.get_field('address')._computed['select_related'] = ['appartment', 'parent__appartment'] - UPerson._meta.get_field('address')._computed['prefetch_related'] = [] + active_resolver._cached_select_related.clear() + active_resolver._cached_prefetch_related.clear() + casted_cf('address')._computed['select_related'] = ['appartment', 'parent__appartment'] + casted_cf('address')._computed['prefetch_related'] = [] start = time() with atomic() and not_computed(recover=True): self.a.street = 'Heaven Lane' @@ -86,8 +101,10 @@ def test_rename_appartment_perf(self): self.assertEqual(UPerson.objects.filter(address='App #777, Heaven Lane').count(), PERSONS+1) # patch prefetch_related - UPerson._meta.get_field('address')._computed['select_related'] = [] - UPerson._meta.get_field('address')._computed['prefetch_related'] = ['appartment', 'parent__appartment'] + active_resolver._cached_select_related.clear() + active_resolver._cached_prefetch_related.clear() + casted_cf('address')._computed['select_related'] = [] + casted_cf('address')._computed['prefetch_related'] = ['appartment', 'parent__appartment'] start = time() with atomic() and not_computed(recover=True): self.a.street = 'Celestial Border' @@ -99,5 +116,7 @@ def test_rename_appartment_perf(self): self.assertLess(sr, plain) self.assertLess(pr, plain) - UPerson._meta.get_field('address')._computed['select_related'] = [] - UPerson._meta.get_field('address')._computed['prefetch_related'] = [] + active_resolver._cached_select_related.clear() + active_resolver._cached_prefetch_related.clear() + casted_cf('address')._computed['select_related'] = [] + casted_cf('address')._computed['prefetch_related'] = [] diff --git a/setup.py b/setup.py index 61d3ce2..60fca9e 100644 --- a/setup.py +++ b/setup.py @@ -16,7 +16,7 @@ def get_version(): install_requires=[ 'Django>=4.2,<6.0', 
'typing_extensions>=4.1', - 'django-fast-update' + 'django-fast-update>=0.3.0' ], version=get_version(), license='MIT',