diff --git a/integration_tests/test_client.py b/integration_tests/test_client.py index ffd2f29..b432d26 100644 --- a/integration_tests/test_client.py +++ b/integration_tests/test_client.py @@ -52,8 +52,10 @@ def test_get_article(): def test_get_article_with_category_name(): - with pytest.raises(NotImplementedError): - picnic.get_article("s1018620", add_category_name=True) + response = picnic.get_article("s1018620", add_category=True) + assert isinstance(response, dict) + assert "category" in response + assert response["category"]["name"] == "H-Milch" def test_get_article_by_gtin(): @@ -81,7 +83,8 @@ def test_add_product(): assert isinstance(response, dict) assert "items" in response - assert any(item["id"] == "s1018620" for item in response["items"][0]["items"]) + assert any( + item["id"] == "s1018620" for item in response["items"][0]["items"]) assert _get_amount(response, "s1018620") == 2 diff --git a/src/python_picnic_api2/client.py b/src/python_picnic_api2/client.py index 9e37c21..fd61ee7 100644 --- a/src/python_picnic_api2/client.py +++ b/src/python_picnic_api2/client.py @@ -8,6 +8,7 @@ _extract_search_results, _tree_generator, _url_generator, + find_nodes_by_content, ) from .session import PicnicAPISession, PicnicAuthError @@ -100,10 +101,9 @@ def search(self, term: str): def get_cart(self): return self._get("/cart") - def get_article(self, article_id: str, add_category_name=False): - if add_category_name: - raise NotImplementedError() - path = f"/pages/product-details-page-root?id={article_id}" + def get_article(self, article_id: str, add_category=False): + path = f"/pages/product-details-page-root?id={article_id}" + \ + "&show_category_action=true" data = self._get(path, add_picnic_headers=True) article_details = [] for block in data["body"]["child"]["child"]["children"]: @@ -113,11 +113,28 @@ def get_article(self, article_id: str, add_category_name=False): if len(article_details) == 0: return None + article = {} + if add_category: + cat_node = find_nodes_by_content( + data, {"id": "category-button"}, max_nodes=1) + if len(cat_node) == 0: + raise KeyError( + f"Could not extract category from article with id {article_id}") + category_regex = re.compile( + "app\\.picnic:\\/\\/categories\\/(\\d+)\\/l2\\/(\\d+)\\/l3\\/(\\d+)") + cat_ids = category_regex.match( + cat_node[0]["pml"]["component"]["onPress"]["target"]).groups() + article["category"] = self.get_category_by_ids( + int(cat_ids[1]), int(cat_ids[2])) + color_regex = re.compile(r"#\(#\d{6}\)") - producer = re.sub(color_regex, "", str(article_details[1].get("markdown", ""))) - article_name = re.sub(color_regex, "", str(article_details[0]["markdown"])) + producer = re.sub(color_regex, "", str( + article_details[1].get("markdown", ""))) + article_name = re.sub(color_regex, "", str( + article_details[0]["markdown"])) - article = {"name": f"{producer} {article_name}", "id": article_id} + article["name"] = f"{producer} {article_name}" + article["id"] = article_id return article @@ -170,6 +187,17 @@ def get_current_deliveries(self): def get_categories(self, depth: int = 0): return self._get(f"/my_store?depth={depth}")["catalog"] + def get_category_by_ids(self, l2_id: int, l3_id: int): + path = "/pages/L2-category-page-root" + \ + f"?category_id={l2_id}&l3_category_id={l3_id}" + data = self._get(path, add_picnic_headers=True) + nodes = find_nodes_by_content( + data, {"id": f"vertical-article-tiles-sub-header-{l3_id}"}, max_nodes=1) + if len(nodes) == 0: + raise KeyError("Could not find category with specified IDs") + return {"l2_id": l2_id, "l3_id": l3_id, + "name": nodes[0]["pml"]["component"]["accessibilityLabel"]} + def print_categories(self, depth: int = 0): tree = "\n".join(_tree_generator(self.get_categories(depth=depth))) print(tree) diff --git a/src/python_picnic_api2/helper.py b/src/python_picnic_api2/helper.py index 9bd08cb..8a96518 100644 --- a/src/python_picnic_api2/helper.py +++ b/src/python_picnic_api2/helper.py @@ -85,39 +85,66 @@ def get_image(id: str, size="regular", suffix="webp"): return f"{IMAGE_BASE_URL}/{id}/{size}.{suffix}" +def find_nodes_by_content(node, filter, max_nodes: int = 10): + nodes = [] + + if len(nodes) >= 10: + return nodes + + def is_dict_included(node_dict, filter_dict): + for k, v in filter_dict.items(): + if k not in node_dict: + return False + if isinstance(v, dict) and isinstance(node_dict[k], dict): + if not is_dict_included(node_dict[k], v): + return False + elif node_dict[k] != v and v is not None: + return False + return True + + if is_dict_included(node, filter): + nodes.append(node) + + if isinstance(node, dict): + for _, v in node.items(): + if isinstance(v, dict): + nodes.extend(find_nodes_by_content(v, filter, max_nodes)) + continue + if isinstance(v, list): + for item in v: + if isinstance(v, dict | list): + nodes.extend(find_nodes_by_content( + item, filter, max_nodes)) + continue + + return nodes + + def _extract_search_results(raw_results, max_items: int = 10): """Extract search results from the nested dictionary structure returned by Picnic search. Number of max items can be defined to reduce excessive nested search""" - search_results = [] LOGGER.debug(f"Extracting search results from {raw_results}") - def find_articles(node): - if len(search_results) >= max_items: - return - - content = node.get("content", {}) - if content.get("type") == "SELLING_UNIT_TILE" and "sellingUnit" in content: - selling_unit = content["sellingUnit"] - sole_article_ids = SOLE_ARTICLE_ID_PATTERN.findall(json.dumps(node)) - sole_article_id = sole_article_ids[0] if sole_article_ids else None - result_entry = { - **selling_unit, - "sole_article_id": sole_article_id, - } - LOGGER.debug(f"Found article {result_entry}") - search_results.append(result_entry) - - for child in node.get("children", []): - find_articles(child) - - if "child" in node: - find_articles(node.get("child")) - body = raw_results.get("body", {}) - find_articles(body.get("child", {})) + nodes = find_nodes_by_content(body.get("child", {}), { + "type": "SELLING_UNIT_TILE", "sellingUnit": {}}) - LOGGER.debug(f"Found {len(search_results)}/{max_items} products after extraction") + search_results = [] + for node in nodes: + selling_unit = node["sellingUnit"] + sole_article_ids = SOLE_ARTICLE_ID_PATTERN.findall( + json.dumps(node)) + sole_article_id = sole_article_ids[0] if sole_article_ids else None + result_entry = { + **selling_unit, + "sole_article_id": sole_article_id, + } + LOGGER.debug(f"Found article {result_entry}") + search_results.append(result_entry) + + LOGGER.debug( + f"Found {len(search_results)}/{max_items} products after extraction") return [{"items": search_results}] diff --git a/tests/test_client.py b/tests/test_client.py index fdd4871..1b638d7 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -108,12 +108,77 @@ def test_search_encoding(self): ) def test_get_article(self): - self.client.get_article("p3f2qa") + self.session_mock().get.return_value = self.MockResponse( + {"body": {"child": {"child": {"children": [{ + "id": "product-details-page-root-main-container", + "pml": { + "component": { + "children": [ + { + "markdown": "#(#333333)Goede start halvarine#(#333333)", + }, + { + "markdown": "Blue Band", + }, + + ] + } + } + }]}}}}, + 200 + ) + + article = self.client.get_article("p3f2qa") self.session_mock().get.assert_called_with( - "https://storefront-prod.nl.picnicinternational.com/api/15/pages/product-details-page-root?id=p3f2qa", + "https://storefront-prod.nl.picnicinternational.com/api/15/pages/product-details-page-root?id=p3f2qa&show_category_action=true", headers=PICNIC_HEADERS, ) + self.assertEqual( + article, {'name': 'Blue Band Goede start halvarine', 'id': 'p3f2qa'}) + + def test_get_article_with_category(self): + self.session_mock().get.return_value = self.MockResponse( + {"body": {"child": {"child": {"children": [{ + "id": "product-details-page-root-main-container", + "pml": { + "component": { + "children": [ + { + "markdown": "#(#333333)Goede start halvarine#(#333333)", + }, + { + "markdown": "Blue Band", + }, + + ] + } + } + }, + { + "id": "category-button", + "pml": {"component": {"onPress": {"target": "app.picnic://categories/1000/l2/2000/l3/3000"}}} + }]}}}}, + 200 + ) + + category_patch = patch( + "python_picnic_api2.client.PicnicAPI.get_category_by_ids") + category_patch.start().return_value = { + "l2_id": 2000, "l3_id": 3000, "name": "Test"} + + article = self.client.get_article("p3f2qa", True) + + category_patch.stop() + self.session_mock().get.assert_called_with( + "https://storefront-prod.nl.picnicinternational.com/api/15/pages/product-details-page-root?id=p3f2qa&show_category_action=true", + headers=PICNIC_HEADERS, + ) + + self.assertEqual( + article, {'name': 'Blue Band Goede start halvarine', 'id': 'p3f2qa', + "category": {"l2_id": 2000, "l3_id": 3000, "name": "Test"}}) + def test_get_article_by_gtin(self): self.client.get_article_by_gtin("123456789") self.session_mock().get.assert_called_with( @@ -220,6 +285,30 @@ def test_get_categories(self): {"type": "CATEGORY", "id": "purchases", "name": "Besteld"}, ) + def test_get_category_by_ids(self): + self.session_mock().get.return_value = self.MockResponse( + {"children": [ + { + "id": "vertical-article-tiles-sub-header-22193", + "pml": { + "component": { + "accessibilityLabel": "Halvarine" + } + } + } + ]}, + 200 + ) + + category = self.client.get_category_by_ids(1000, 22193) + self.session_mock().get.assert_called_with( + f"{self.expected_base_url}/pages/L2-category-page-root" + + "?category_id=1000&l3_category_id=22193", headers=PICNIC_HEADERS + ) + + self.assertDictEqual( + category, {"name": "Halvarine", "l2_id": 1000, "l3_id": 22193}) + def test_get_auth_exception(self): self.session_mock().get.return_value = self.MockResponse( {"error": {"code": "AUTH_ERROR"}}, 400