|
1 | | -from typing import Dict, Generic, Iterable, List, Set, Tuple, TypeVar |
| 1 | +from collections import defaultdict |
| 2 | +from typing import Dict, Generic, Iterable, List, Set, Tuple, TypeVar, Union |
2 | 3 |
|
3 | 4 | from ..dynamic_typing import DOptional, ModelMeta, ModelPtr |
4 | 5 |
|
@@ -28,13 +29,36 @@ def insert_before(self, value: T, *before: T): |
28 | 29 | raise ValueError |
29 | 30 | pos = min(ix) |
30 | 31 | self.insert(pos, value) |
| 32 | + return pos |
31 | 33 |
|
32 | 34 | def insert_after(self, value: T, *after: T): |
33 | 35 | ix = self._safe_indexes(*after) |
34 | 36 | if not ix: |
35 | 37 | raise ValueError |
36 | | - pos = max(ix) |
37 | | - self.insert(pos + 1, value) |
| 38 | + pos = max(ix) + 1 |
| 39 | + self.insert(pos, value) |
| 40 | + return pos |
| 41 | + |
| 42 | + |
| 43 | +class PositionsDict(defaultdict): |
| 44 | + INC = object() |
| 45 | + |
| 46 | + def __init__(self, default_factory=int, **kwargs): |
| 47 | + super().__init__(default_factory, **kwargs) |
| 48 | + |
| 49 | + def update_position(self, key: str, value: Union[object, int]): |
| 50 | + if value is self.INC: |
| 51 | + value = self[key] + 1 |
| 52 | + if key in self: |
| 53 | + old_value = self[key] |
| 54 | + delta = value - old_value |
| 55 | + else: |
| 56 | + old_value = value |
| 57 | + delta = 1 |
| 58 | + for k, v in self.items(): |
| 59 | + if k != key and v >= old_value: |
| 60 | + self[k] += delta |
| 61 | + self[key] = value |
38 | 62 |
|
39 | 63 |
|
40 | 64 | def compose_models(models_map: Dict[str, ModelMeta]) -> ModelsStructureType: |
@@ -91,6 +115,59 @@ def compose_models(models_map: Dict[str, ModelMeta]) -> ModelsStructureType: |
91 | 115 | return root_models, path_injections |
92 | 116 |
|
93 | 117 |
|
| 118 | +def compose_models_flat(models_map: Dict[Index, ModelMeta]) -> ModelsStructureType: |
| 119 | + """ |
| 120 | + Generate flat sorted (by nesting level, ASC) models structure for internal usage. |
| 121 | +
|
| 122 | + :param models_map: Mapping (model index -> model meta instance). |
| 123 | + :return: List of root models data, Map(child model -> root model) for absolute ref generation |
| 124 | + """ |
| 125 | + root_models = ListEx() |
| 126 | + positions: PositionsDict[Index, int] = PositionsDict() |
| 127 | + top_level_models: Set[Index] = set() |
| 128 | + structure_hash_table: Dict[Index, dict] = { |
| 129 | + key: { |
| 130 | + "model": model, |
| 131 | + "nested": ListEx(), |
| 132 | + "roots": list(extract_root(model)), # Indexes of root level models |
| 133 | + } for key, model in models_map.items() |
| 134 | + } |
| 135 | + |
| 136 | + for key, model in models_map.items(): |
| 137 | + pointers = list(filter_pointers(model)) |
| 138 | + has_root_pointers = len(pointers) != len(model.pointers) |
| 139 | + if not pointers: |
| 140 | + # Root level model |
| 141 | + if not has_root_pointers: |
| 142 | + raise Exception(f'Model {model.name} has no pointers') |
| 143 | + root_models.insert(positions["root"], structure_hash_table[key]) |
| 144 | + top_level_models.add(key) |
| 145 | + positions.update_position("root", PositionsDict.INC) |
| 146 | + else: |
| 147 | + parents = {ptr.parent.index for ptr in pointers} |
| 148 | + struct = structure_hash_table[key] |
| 149 | + # Model is using by other models |
| 150 | + if has_root_pointers or len(parents) > 1 and len(struct["roots"]) >= 1: |
| 151 | + # Model is using by different root models |
| 152 | + if parents & top_level_models: |
| 153 | + parents.add("root") |
| 154 | + parents_positions = {positions[parent_key] for parent_key in parents |
| 155 | + if parent_key in positions} |
| 156 | + parents_joined = "#".join(sorted(parents)) |
| 157 | + if parents_joined in positions: |
| 158 | + parents_positions.add(positions[parents_joined]) |
| 159 | + pos = max(parents_positions) if parents_positions else len(root_models) |
| 160 | + positions.update_position(parents_joined, pos + 1) |
| 161 | + else: |
| 162 | + # Model is using by only one model |
| 163 | + parent = next(iter(parents)) |
| 164 | + pos = positions.get(parent, len(root_models)) |
| 165 | + positions.update_position(key, pos + 1) |
| 166 | + root_models.insert(pos, struct) |
| 167 | + |
| 168 | + return root_models, {} |
| 169 | + |
| 170 | + |
94 | 171 | def filter_pointers(model: ModelMeta) -> Iterable[ModelPtr]: |
95 | 172 | """ |
96 | 173 | Return iterator over pointers with not None parent |
|
0 commit comments