Skip to content

Commit 19d4d45

Browse files
committed
Add V2_importer to collect advisories from EUVD
Signed-off-by: Sampurna Pyne <sampurnapyne1710@gmail.com>
1 parent ab99939 commit 19d4d45

File tree

5 files changed

+391
-0
lines changed

5 files changed

+391
-0
lines changed

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
from vulnerabilities.pipelines.v2_importers import (
4848
elixir_security_importer as elixir_security_importer_v2,
4949
)
50+
from vulnerabilities.pipelines.v2_importers import euvd_importer as euvd_importer_v2
5051
from vulnerabilities.pipelines.v2_importers import github_osv_importer as github_osv_importer_v2
5152
from vulnerabilities.pipelines.v2_importers import gitlab_importer as gitlab_importer_v2
5253
from vulnerabilities.pipelines.v2_importers import istio_importer as istio_importer_v2
@@ -75,6 +76,7 @@
7576
pysec_importer_v2.PyPIImporterPipeline,
7677
xen_importer_v2.XenImporterPipeline,
7778
curl_importer_v2.CurlImporterPipeline,
79+
euvd_importer_v2.EUVDImporterPipeline,
7880
oss_fuzz_v2.OSSFuzzImporterPipeline,
7981
istio_importer_v2.IstioImporterPipeline,
8082
postgresql_importer_v2.PostgreSQLImporterPipeline,
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import json
11+
import logging
12+
import requests
13+
import time
14+
from datetime import datetime
15+
from http import HTTPStatus
16+
from typing import Iterable
17+
18+
from dateutil import parser as dateparser
19+
20+
from vulnerabilities.importer import AdvisoryData
21+
from vulnerabilities.importer import ReferenceV2
22+
from vulnerabilities.importer import VulnerabilitySeverity
23+
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
24+
from vulnerabilities.severity_systems import SCORING_SYSTEMS
25+
26+
logger = logging.getLogger(__name__)
27+
28+
29+
class EUVDImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
30+
"""
31+
EUVD (EU Vulnerability Database) Importer Pipeline
32+
33+
This pipeline imports security advisories from the European Union Vulnerability Database (EUVD).
34+
"""
35+
36+
pipeline_id = "euvd_importer_v2"
37+
spdx_license_expression = "LicenseRef-scancode-other-permissive"
38+
license_url = "https://www.enisa.europa.eu/about-enisa/legal-notice/"
39+
url = "https://euvdservices.enisa.europa.eu/api/search"
40+
41+
def __init__(self):
42+
super().__init__()
43+
self._cached_data = None
44+
45+
@classmethod
46+
def steps(cls):
47+
return (cls.collect_and_store_advisories,)
48+
49+
def fetch_data(self):
50+
# Return cached data if already fetched
51+
if self._cached_data is not None:
52+
logger.info(f"Using cached data: {len(self._cached_data)} items")
53+
return self._cached_data
54+
55+
headers = {"User-Agent": "VulnerableCode"}
56+
all_items = []
57+
page = 0
58+
size = 100
59+
max_retries = 100
60+
61+
logger.info(f"Fetching data from EUVD API: {self.url}")
62+
63+
while True:
64+
65+
retry_count = 0
66+
success = False
67+
68+
while retry_count < max_retries and not success:
69+
try:
70+
params = {"size": size, "page": page}
71+
response = requests.get(self.url, headers=headers, params=params, timeout=30)
72+
73+
if response.status_code != HTTPStatus.OK:
74+
logger.error(f"API returned status {response.status_code} for page {page}")
75+
retry_count += 1
76+
if retry_count < max_retries:
77+
sleep_time = min(10 * (2 ** min(retry_count - 1, 5)), 60)
78+
logger.info(f"Retrying page {page} in {sleep_time}s (attempt {retry_count}/{max_retries})")
79+
time.sleep(sleep_time)
80+
continue
81+
else:
82+
logger.error(f"Max retries reached for page {page}")
83+
return all_items
84+
85+
data = response.json()
86+
items = data.get("items", [])
87+
88+
if not items:
89+
logger.info(f"No items in response for page {page}; stopping fetch.")
90+
logger.info(f"Fetch completed successfully. Total items collected: {len(all_items)}")
91+
92+
# Cache the fetched data for reuse
93+
self._cached_data = all_items
94+
logger.info(f"Cached {len(all_items)} items for reuse")
95+
96+
return all_items
97+
98+
all_items.extend(items)
99+
logger.info(f"Fetched page {page}: {len(items)} items (total: {len(all_items)})")
100+
success = True
101+
page += 1
102+
103+
except requests.exceptions.Timeout as e:
104+
retry_count += 1
105+
if retry_count < max_retries:
106+
logger.warning(f"Timeout on page {page}: {e}. Retrying in 10s (attempt {retry_count}/{max_retries})")
107+
time.sleep(10)
108+
else:
109+
logger.error(f"Max retries reached for page {page} after timeout")
110+
return all_items
111+
112+
except Exception as e:
113+
retry_count += 1
114+
if retry_count < max_retries:
115+
logger.error(f"Error fetching page {page}: {e}. Retrying in 10s (attempt {retry_count}/{max_retries})")
116+
time.sleep(10)
117+
else:
118+
logger.error(f"Max retries reached for page {page}")
119+
return all_items
120+
121+
def advisories_count(self) -> int:
122+
return len(self.fetch_data())
123+
124+
def collect_advisories(self) -> Iterable[AdvisoryData]:
125+
for raw_data in self.fetch_data():
126+
try:
127+
advisory = self.parse_advisory(raw_data)
128+
if advisory:
129+
yield advisory
130+
except Exception as e:
131+
logger.error(f"Failed to parse advisory: {e}")
132+
logger.debug(f"Raw data: {raw_data}")
133+
continue
134+
135+
def parse_advisory(self, raw_data: dict) -> AdvisoryData:
136+
advisory_id = raw_data.get("id", "")
137+
138+
aliases = [advisory_id] if advisory_id else []
139+
aliases_str = raw_data.get("aliases", "")
140+
if aliases_str:
141+
cve_aliases = [alias.strip() for alias in aliases_str.split("\n") if alias.strip()]
142+
aliases.extend(cve_aliases)
143+
144+
summary = raw_data.get("description", "")
145+
146+
date_published = None
147+
date_str = raw_data.get("datePublished", "")
148+
if date_str:
149+
try:
150+
date_published = dateparser.parse(date_str)
151+
if date_published and date_published.tzinfo is None:
152+
date_published = date_published.replace(tzinfo=datetime.now().astimezone().tzinfo)
153+
except Exception as e:
154+
logger.warning(f"Failed to parse date '{date_str}': {e}")
155+
156+
references = []
157+
references_str = raw_data.get("references", "")
158+
if references_str:
159+
urls = [url.strip() for url in references_str.split("\n") if url.strip()]
160+
for url in urls:
161+
references.append(ReferenceV2(url=url))
162+
163+
if advisory_id:
164+
advisory_url = f"https://euvd.enisa.europa.eu/vulnerability/{advisory_id}"
165+
references.append(ReferenceV2(url=advisory_url))
166+
167+
severities = []
168+
base_score = raw_data.get("baseScore")
169+
base_score_version = raw_data.get("baseScoreVersion")
170+
base_score_vector = raw_data.get("baseScoreVector")
171+
172+
if base_score and base_score_version:
173+
scoring_system = self.get_scoring_system(base_score_version)
174+
if scoring_system:
175+
severity = VulnerabilitySeverity(
176+
system=scoring_system,
177+
value=str(base_score),
178+
scoring_elements=base_score_vector or "",
179+
)
180+
severities.append(severity)
181+
182+
return AdvisoryData(
183+
advisory_id=advisory_id,
184+
aliases=aliases,
185+
summary=summary,
186+
references_v2=references,
187+
affected_packages=[],
188+
date_published=date_published,
189+
url=advisory_url if advisory_id else "",
190+
severities=severities,
191+
original_advisory_text=json.dumps(raw_data, indent=2, ensure_ascii=False),
192+
)
193+
194+
@staticmethod
195+
def get_scoring_system(version: str):
196+
version_map = {
197+
"4.0": "cvssv4",
198+
"3.1": "cvssv3.1",
199+
"3.0": "cvssv3",
200+
"2.0": "cvssv2",
201+
}
202+
system_key = version_map.get(version)
203+
if system_key:
204+
return SCORING_SYSTEMS.get(system_key)
205+
logger.warning(f"Unknown CVSS version: {version}")
206+
return None
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#
2+
# Copyright (c) nexB Inc. and others. All rights reserved.
3+
# VulnerableCode is a trademark of nexB Inc.
4+
# SPDX-License-Identifier: Apache-2.0
5+
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
6+
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
7+
# See https://aboutcode.org for more information about nexB OSS projects.
8+
#
9+
10+
import json
11+
from pathlib import Path
12+
from unittest import TestCase
13+
from unittest.mock import Mock
14+
from unittest.mock import patch
15+
16+
from vulnerabilities.importer import AdvisoryData
17+
from vulnerabilities.pipelines.v2_importers.euvd_importer import EUVDImporterPipeline
18+
19+
TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "euvd"
20+
21+
22+
class TestEUVDImporterPipeline(TestCase):
23+
@patch("vulnerabilities.pipelines.v2_importers.euvd_importer.requests.get")
24+
def test_collect_advisories(self, mock_get):
25+
"""Test collecting and parsing advisories from test data"""
26+
sample1_path = TEST_DATA / "euvd_sample1.json"
27+
sample2_path = TEST_DATA / "euvd_sample2.json"
28+
29+
sample1 = json.loads(sample1_path.read_text(encoding="utf-8"))
30+
sample2 = json.loads(sample2_path.read_text(encoding="utf-8"))
31+
32+
mock_responses = [
33+
Mock(status_code=200, json=lambda: sample1),
34+
Mock(status_code=200, json=lambda: sample2),
35+
Mock(status_code=200, json=lambda: {"items": []}),
36+
]
37+
mock_get.side_effect = mock_responses
38+
39+
pipeline = EUVDImporterPipeline()
40+
advisories = list(pipeline.collect_advisories())
41+
42+
assert len(advisories) == 5
43+
44+
first = advisories[0]
45+
assert isinstance(first, AdvisoryData)
46+
assert first.advisory_id == "EUVD-2025-197757"
47+
assert "EUVD-2025-197757" in first.aliases
48+
assert "CVE-2025-13284" in first.aliases
49+
assert (
50+
first.summary == "ThinPLUS vulnerability that allows remote code execution"
51+
)
52+
assert first.date_published is not None
53+
assert len(first.severities) == 1
54+
assert first.severities[0].system.identifier == "cvssv3.1"
55+
assert first.severities[0].value == "9.8"
56+
assert first.severities[0].scoring_elements == "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H"
57+
58+
urls = [ref.url for ref in first.references_v2]
59+
assert "https://nvd.nist.gov/vuln/detail/CVE-2025-13284" in urls
60+
assert "https://euvd.enisa.europa.eu/vulnerability/EUVD-2025-197757" in urls
61+
62+
second = advisories[1]
63+
assert second.advisory_id == "EUVD-2024-123456"
64+
assert "CVE-2024-12345" in second.aliases
65+
assert "CVE-2024-67890" in second.aliases
66+
assert len([a for a in second.aliases if a.startswith("CVE-")]) == 2
67+
68+
urls = [ref.url for ref in second.references_v2]
69+
assert "https://example.com/advisory1" in urls
70+
assert "https://example.com/advisory2" in urls
71+
72+
third = advisories[2]
73+
assert third.advisory_id == "EUVD-2023-999999"
74+
assert third.severities[0].system.identifier == "cvssv3"
75+
assert third.severities[0].value == "5.3"
76+
77+
fourth = advisories[3]
78+
assert fourth.advisory_id == "EUVD-2022-555555"
79+
assert fourth.summary == ""
80+
assert fourth.severities[0].system.identifier == "cvssv2"
81+
assert fourth.severities[0].value == "4.3"
82+
83+
fifth = advisories[4]
84+
assert fifth.advisory_id == "EUVD-2021-111111"
85+
assert len([a for a in fifth.aliases if a.startswith("CVE-")]) == 0
86+
assert fifth.summary == "Advisory without CVE alias but with EUVD ID"
87+
88+
def test_get_scoring_system(self):
89+
"""Test CVSS version to scoring system mapping"""
90+
pipeline = EUVDImporterPipeline()
91+
92+
system_v4 = pipeline.get_scoring_system("4.0")
93+
assert system_v4 is not None
94+
assert system_v4.identifier == "cvssv4"
95+
96+
system_v31 = pipeline.get_scoring_system("3.1")
97+
assert system_v31 is not None
98+
assert system_v31.identifier == "cvssv3.1"
99+
100+
system_v3 = pipeline.get_scoring_system("3.0")
101+
assert system_v3 is not None
102+
assert system_v3.identifier == "cvssv3"
103+
104+
system_v2 = pipeline.get_scoring_system("2.0")
105+
assert system_v2 is not None
106+
assert system_v2.identifier == "cvssv2"
107+
108+
system_unknown = pipeline.get_scoring_system("unknown")
109+
assert system_unknown is None
110+
111+
@patch("vulnerabilities.pipelines.v2_importers.euvd_importer.requests.get")
112+
def test_advisories_count(self, mock_get):
113+
"""Test counting advisories"""
114+
sample_data = {"items": [{"id": "1"}, {"id": "2"}, {"id": "3"}]}
115+
mock_responses = [
116+
Mock(status_code=200, json=lambda: sample_data),
117+
Mock(status_code=200, json=lambda: {"items": []}),
118+
]
119+
mock_get.side_effect = mock_responses
120+
121+
pipeline = EUVDImporterPipeline()
122+
count = pipeline.advisories_count()
123+
124+
assert count == 3
125+
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
{
2+
"items": [
3+
{
4+
"id": "EUVD-2025-197757",
5+
"aliases": "CVE-2025-13284",
6+
"description": "ThinPLUS vulnerability that allows remote code execution",
7+
"datePublished": "2025-01-09T01:00:00.000Z",
8+
"baseScore": "9.8",
9+
"baseScoreVersion": "3.1",
10+
"baseScoreVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H",
11+
"references": "https://nvd.nist.gov/vuln/detail/CVE-2025-13284"
12+
},
13+
{
14+
"id": "EUVD-2024-123456",
15+
"aliases": "CVE-2024-12345\nCVE-2024-67890",
16+
"description": "Multiple vulnerabilities in authentication system",
17+
"datePublished": "2024-12-15T10:30:00.000Z",
18+
"baseScore": "7.5",
19+
"baseScoreVersion": "3.1",
20+
"baseScoreVector": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:N/A:N",
21+
"references": "https://example.com/advisory1\nhttps://example.com/advisory2"
22+
},
23+
{
24+
"id": "EUVD-2023-999999",
25+
"aliases": "CVE-2023-99999",
26+
"description": "Denial of service vulnerability",
27+
"datePublished": "2023-06-20T14:22:00.000Z",
28+
"baseScore": "5.3",
29+
"baseScoreVersion": "3.0",
30+
"baseScoreVector": "CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:L",
31+
"references": "https://security.example.org/2023-999999"
32+
}
33+
]
34+
}

0 commit comments

Comments
 (0)