diff --git a/vulnerabilities/migrations/0104_ssvc.py b/vulnerabilities/migrations/0104_ssvc.py new file mode 100644 index 000000000..82e54b8db --- /dev/null +++ b/vulnerabilities/migrations/0104_ssvc.py @@ -0,0 +1,51 @@ +# Generated by Django 4.2.25 on 2025-11-26 13:31 + +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + ("vulnerabilities", "0103_codecommit_impactedpackage_affecting_commits_and_more"), + ] + + operations = [ + migrations.CreateModel( + name="SSVC", + fields=[ + ( + "id", + models.AutoField( + auto_created=True, primary_key=True, serialize=False, verbose_name="ID" + ), + ), + ( + "vector", + models.CharField( + help_text="The vector string representing the SSVC.", max_length=255 + ), + ), + ( + "options", + models.JSONField(help_text="A JSON object containing the SSVC options."), + ), + ( + "decision", + models.CharField(help_text="The decision string for the SSVC.", max_length=255), + ), + ( + "advisory", + models.ForeignKey( + help_text="The advisory associated with this SSVC.", + on_delete=django.db.models.deletion.CASCADE, + related_name="ssvc_entries", + to="vulnerabilities.advisoryv2", + ), + ), + ], + options={ + "unique_together": {("vector", "advisory", "decision")}, + }, + ), + ] diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index e1c4ddc6b..8358ae558 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -3414,3 +3414,21 @@ class CodeCommit(models.Model): class Meta: unique_together = ("commit_hash", "vcs_url") + + +class SSVC(models.Model): + vector = models.CharField(max_length=255, help_text="The vector string representing the SSVC.") + options = models.JSONField(help_text="A JSON object containing the SSVC options.") + advisory = models.ForeignKey( + AdvisoryV2, + on_delete=models.CASCADE, + related_name="ssvc_entries", + help_text="The advisory associated with this SSVC.", + ) + decision = models.CharField(max_length=255, help_text="The decision string for the SSVC.") + + def __str__(self): + return f"SSVC for Advisory {self.advisory.advisory_id}: {self.decision}" + + class Meta: + unique_together = ("vector", "advisory", "decision") diff --git a/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py b/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py index a596d8d65..7de7ff7d7 100644 --- a/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py +++ b/vulnerabilities/pipelines/v2_importers/vulnrichment_importer.py @@ -16,6 +16,7 @@ from vulnerabilities.utils import get_advisory_url from vulnerabilities.utils import get_cwe_id from vulnerabilities.utils import get_reference_id +from vulnerabilities.utils import ssvc_calculator logger = logging.getLogger(__name__) @@ -210,117 +211,3 @@ def clean_downloads(self): def on_failure(self): self.clean_downloads() - - -def ssvc_calculator(ssvc_data): - """ - Return the ssvc vector and the decision value - """ - options = ssvc_data.get("options", []) - timestamp = ssvc_data.get("timestamp") - - # Extract the options into a dictionary - options_dict = {k: v.lower() for option in options for k, v in option.items()} - - # We copied the table value from this link. - # https://www.cisa.gov/sites/default/files/publications/cisa-ssvc-guide%20508c.pdf - - # Determining Mission and Well-Being Impact Value - mission_well_being_table = { - # (Mission Prevalence, Public Well-being Impact) : "Mission & Well-being" - ("minimal", "minimal"): "low", - ("minimal", "material"): "medium", - ("minimal", "irreversible"): "high", - ("support", "minimal"): "medium", - ("support", "material"): "medium", - ("support", "irreversible"): "high", - ("essential", "minimal"): "high", - ("essential", "material"): "high", - ("essential", "irreversible"): "high", - } - - if "Mission Prevalence" not in options_dict: - options_dict["Mission Prevalence"] = "minimal" - - if "Public Well-being Impact" not in options_dict: - options_dict["Public Well-being Impact"] = "material" - - options_dict["Mission & Well-being"] = mission_well_being_table[ - (options_dict["Mission Prevalence"], options_dict["Public Well-being Impact"]) - ] - - decision_key = ( - options_dict.get("Exploitation"), - options_dict.get("Automatable"), - options_dict.get("Technical Impact"), - options_dict.get("Mission & Well-being"), - ) - - decision_points = { - "Exploitation": {"E": {"none": "N", "poc": "P", "active": "A"}}, - "Automatable": {"A": {"no": "N", "yes": "Y"}}, - "Technical Impact": {"T": {"partial": "P", "total": "T"}}, - "Public Well-being Impact": {"B": {"minimal": "M", "material": "A", "irreversible": "I"}}, - "Mission Prevalence": {"P": {"minimal": "M", "support": "S", "essential": "E"}}, - "Mission & Well-being": {"M": {"low": "L", "medium": "M", "high": "H"}}, - } - - # Create the SSVC vector - ssvc_vector = "SSVCv2/" - for key, value_map in options_dict.items(): - options_key = decision_points.get(key) - for lhs, rhs_map in options_key.items(): - ssvc_vector += f"{lhs}:{rhs_map.get(value_map)}/" - - # "Decision": {"D": {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}}, - decision_values = {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"} - - decision_lookup = { - ("none", "no", "partial", "low"): "Track", - ("none", "no", "partial", "medium"): "Track", - ("none", "no", "partial", "high"): "Track", - ("none", "no", "total", "low"): "Track", - ("none", "no", "total", "medium"): "Track", - ("none", "no", "total", "high"): "Track*", - ("none", "yes", "partial", "low"): "Track", - ("none", "yes", "partial", "medium"): "Track", - ("none", "yes", "partial", "high"): "Attend", - ("none", "yes", "total", "low"): "Track", - ("none", "yes", "total", "medium"): "Track", - ("none", "yes", "total", "high"): "Attend", - ("poc", "no", "partial", "low"): "Track", - ("poc", "no", "partial", "medium"): "Track", - ("poc", "no", "partial", "high"): "Track*", - ("poc", "no", "total", "low"): "Track", - ("poc", "no", "total", "medium"): "Track*", - ("poc", "no", "total", "high"): "Attend", - ("poc", "yes", "partial", "low"): "Track", - ("poc", "yes", "partial", "medium"): "Track", - ("poc", "yes", "partial", "high"): "Attend", - ("poc", "yes", "total", "low"): "Track", - ("poc", "yes", "total", "medium"): "Track*", - ("poc", "yes", "total", "high"): "Attend", - ("active", "no", "partial", "low"): "Track", - ("active", "no", "partial", "medium"): "Track", - ("active", "no", "partial", "high"): "Attend", - ("active", "no", "total", "low"): "Track", - ("active", "no", "total", "medium"): "Attend", - ("active", "no", "total", "high"): "Act", - ("active", "yes", "partial", "low"): "Attend", - ("active", "yes", "partial", "medium"): "Attend", - ("active", "yes", "partial", "high"): "Act", - ("active", "yes", "total", "low"): "Attend", - ("active", "yes", "total", "medium"): "Act", - ("active", "yes", "total", "high"): "Act", - } - - decision = decision_lookup.get(decision_key, "") - - if decision: - ssvc_vector += f"D:{decision_values.get(decision)}/" - - if timestamp: - timestamp_formatted = dateparser.parse(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ") - - ssvc_vector += f"{timestamp_formatted}/" - return ssvc_vector, decision diff --git a/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py new file mode 100644 index 000000000..ccc6ae455 --- /dev/null +++ b/vulnerabilities/pipelines/v2_improvers/collect_ssvc_trees.py @@ -0,0 +1,112 @@ +import json +import logging +from pathlib import Path +from typing import Iterable, List + +from fetchcode.vcs import fetch_via_vcs + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.models import SSVC, AdvisoryV2 +from vulnerabilities.pipelines import VulnerableCodePipeline +from vulnerabilities.severity_systems import SCORING_SYSTEMS +from vulnerabilities.utils import ssvc_calculator + +logger = logging.getLogger(__name__) + + +class CollectSSVCPipeline(VulnerableCodePipeline): + """ + Collect SSVC Pipeline + + This pipeline collects SSVC from Vulnrichment project and associates them with existing advisories. + """ + + pipeline_id = "collect_ssvc" + spdx_license_expression = "CC0-1.0" + license_url = "https://github.com/cisagov/vulnrichment/blob/develop/LICENSE" + repo_url = "git+https://github.com/cisagov/vulnrichment.git" + + @classmethod + def steps(cls): + return ( + cls.clone, + cls.collect_ssvc_data, + cls.clean_downloads, + ) + + def clone(self): + self.log(f"Cloning `{self.repo_url}`") + self.vcs_response = fetch_via_vcs(self.repo_url) + + def collect_ssvc_data(self): + self.log(self.vcs_response.dest_dir) + base_path = Path(self.vcs_response.dest_dir) + for file_path in base_path.glob("**/**/*.json"): + self.log(f"Processing file: {file_path}") + if not file_path.name.startswith("CVE-"): + continue + with open(file_path) as f: + raw_data = json.load(f) + file_name = file_path.name + # strip .json from file name + cve_id = file_name[:-5] + advisories = list(AdvisoryV2.objects.filter(advisory_id=cve_id)) + if not advisories: + self.log(f"No advisories found for CVE ID: {cve_id}") + continue + self.parse_cve_advisory(raw_data, advisories) + + def parse_cve_advisory(self, raw_data, advisories: List[AdvisoryV2]): + self.log(f"Processing CVE data") + cve_metadata = raw_data.get("cveMetadata", {}) + cve_id = cve_metadata.get("cveId") + + containers = raw_data.get("containers", {}) + adp_data = containers.get("adp", {}) + self.log(f"Processing ADP") + + metrics = [ + adp_metrics for data in adp_data for adp_metrics in data.get("metrics", []) + ] + + vulnrichment_scoring_system = { + "other": { + "ssvc": SCORING_SYSTEMS["ssvc"], + }, # ignore kev + } + + for metric in metrics: + self.log(metric) + self.log(f"Processing metric") + for metric_type, metric_value in metric.items(): + if metric_type not in vulnrichment_scoring_system: + continue + + if metric_type == "other": + other_types = metric_value.get("type") + self.log(f"Processing SSVC") + if other_types == "ssvc": + content = metric_value.get("content", {}) + options = content.get("options", {}) + vector_string, decision = ssvc_calculator(content) + advisories = list(AdvisoryV2.objects.filter(advisory_id=cve_id)) + if not advisories: + continue + ssvc_trees = [] + for advisory in advisories: + obj = SSVC( + advisory=advisory, + options=options, + decision=decision, + vector=vector_string, + ) + ssvc_trees.append(obj) + SSVC.objects.bulk_create(ssvc_trees, ignore_conflicts=True, batch_size=1000) + + def clean_downloads(self): + if self.vcs_response: + self.log("Removing cloned repository") + self.vcs_response.delete() + + def on_failure(self): + self.clean_downloads() diff --git a/vulnerabilities/utils.py b/vulnerabilities/utils.py index b65726a5d..67c6bc10f 100644 --- a/vulnerabilities/utils.py +++ b/vulnerabilities/utils.py @@ -26,6 +26,7 @@ from unittest.mock import MagicMock from urllib.parse import urljoin +import dateparser import requests import saneyaml import toml @@ -678,3 +679,117 @@ def create_registry(pipelines): registry[key] = pipeline return registry + + +def ssvc_calculator(ssvc_data): + """ + Return the ssvc vector and the decision value + """ + options = ssvc_data.get("options", []) + timestamp = ssvc_data.get("timestamp") + + # Extract the options into a dictionary + options_dict = {k: v.lower() for option in options for k, v in option.items()} + + # We copied the table value from this link. + # https://www.cisa.gov/sites/default/files/publications/cisa-ssvc-guide%20508c.pdf + + # Determining Mission and Well-Being Impact Value + mission_well_being_table = { + # (Mission Prevalence, Public Well-being Impact) : "Mission & Well-being" + ("minimal", "minimal"): "low", + ("minimal", "material"): "medium", + ("minimal", "irreversible"): "high", + ("support", "minimal"): "medium", + ("support", "material"): "medium", + ("support", "irreversible"): "high", + ("essential", "minimal"): "high", + ("essential", "material"): "high", + ("essential", "irreversible"): "high", + } + + if "Mission Prevalence" not in options_dict: + options_dict["Mission Prevalence"] = "minimal" + + if "Public Well-being Impact" not in options_dict: + options_dict["Public Well-being Impact"] = "material" + + options_dict["Mission & Well-being"] = mission_well_being_table[ + (options_dict["Mission Prevalence"], options_dict["Public Well-being Impact"]) + ] + + decision_key = ( + options_dict.get("Exploitation"), + options_dict.get("Automatable"), + options_dict.get("Technical Impact"), + options_dict.get("Mission & Well-being"), + ) + + decision_points = { + "Exploitation": {"E": {"none": "N", "poc": "P", "active": "A"}}, + "Automatable": {"A": {"no": "N", "yes": "Y"}}, + "Technical Impact": {"T": {"partial": "P", "total": "T"}}, + "Public Well-being Impact": {"B": {"minimal": "M", "material": "A", "irreversible": "I"}}, + "Mission Prevalence": {"P": {"minimal": "M", "support": "S", "essential": "E"}}, + "Mission & Well-being": {"M": {"low": "L", "medium": "M", "high": "H"}}, + } + + # Create the SSVC vector + ssvc_vector = "SSVCv2/" + for key, value_map in options_dict.items(): + options_key = decision_points.get(key) + for lhs, rhs_map in options_key.items(): + ssvc_vector += f"{lhs}:{rhs_map.get(value_map)}/" + + # "Decision": {"D": {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"}}, + decision_values = {"Track": "T", "Track*": "R", "Attend": "A", "Act": "C"} + + decision_lookup = { + ("none", "no", "partial", "low"): "Track", + ("none", "no", "partial", "medium"): "Track", + ("none", "no", "partial", "high"): "Track", + ("none", "no", "total", "low"): "Track", + ("none", "no", "total", "medium"): "Track", + ("none", "no", "total", "high"): "Track*", + ("none", "yes", "partial", "low"): "Track", + ("none", "yes", "partial", "medium"): "Track", + ("none", "yes", "partial", "high"): "Attend", + ("none", "yes", "total", "low"): "Track", + ("none", "yes", "total", "medium"): "Track", + ("none", "yes", "total", "high"): "Attend", + ("poc", "no", "partial", "low"): "Track", + ("poc", "no", "partial", "medium"): "Track", + ("poc", "no", "partial", "high"): "Track*", + ("poc", "no", "total", "low"): "Track", + ("poc", "no", "total", "medium"): "Track*", + ("poc", "no", "total", "high"): "Attend", + ("poc", "yes", "partial", "low"): "Track", + ("poc", "yes", "partial", "medium"): "Track", + ("poc", "yes", "partial", "high"): "Attend", + ("poc", "yes", "total", "low"): "Track", + ("poc", "yes", "total", "medium"): "Track*", + ("poc", "yes", "total", "high"): "Attend", + ("active", "no", "partial", "low"): "Track", + ("active", "no", "partial", "medium"): "Track", + ("active", "no", "partial", "high"): "Attend", + ("active", "no", "total", "low"): "Track", + ("active", "no", "total", "medium"): "Attend", + ("active", "no", "total", "high"): "Act", + ("active", "yes", "partial", "low"): "Attend", + ("active", "yes", "partial", "medium"): "Attend", + ("active", "yes", "partial", "high"): "Act", + ("active", "yes", "total", "low"): "Attend", + ("active", "yes", "total", "medium"): "Act", + ("active", "yes", "total", "high"): "Act", + } + + decision = decision_lookup.get(decision_key, "") + + if decision: + ssvc_vector += f"D:{decision_values.get(decision)}/" + + if timestamp: + timestamp_formatted = dateparser.parse(timestamp).strftime("%Y-%m-%dT%H:%M:%SZ") + + ssvc_vector += f"{timestamp_formatted}/" + return ssvc_vector, decision