|
@@ -0,0 +1,467 @@
|
|
|
+#! /usr/bin/env python3
|
|
|
+#
|
|
|
+# Copyright OpenEmbedded Contributors
|
|
|
+#
|
|
|
+# The script uses another source of CVE information from linux-vulns
|
|
|
+# to enrich the cve-summary from cve-check or vex.
|
|
|
+# It can also use the list of compiled files from the kernel spdx to ignore CVEs
|
|
|
+# that are not affected since the files are not compiled.
|
|
|
+#
|
|
|
+# It creates a new json file with updated CVE information
|
|
|
+#
|
|
|
+# Compiled files can be extracted adding the following in local.conf
|
|
|
+# SPDX_INCLUDE_COMPILED_SOURCES:pn-linux-yocto = "1"
|
|
|
+#
|
|
|
+# Tested with the following CVE sources:
|
|
|
+# - https://git.kernel.org/pub/scm/linux/security/vulns.git
|
|
|
+# - https://github.com/CVEProject/cvelistV5
|
|
|
+#
|
|
|
+# Example:
|
|
|
+# python3 ./openembedded-core/scripts/contrib/improve_kernel_cve_report.py --spdx tmp/deploy/spdx/3.0.1/qemux86_64/recipes/recipe-linux-yocto.spdx.json --kernel-version 6.12.27 --datadir ./vulns
|
|
|
+# python3 ./openembedded-core/scripts/contrib/improve_kernel_cve_report.py --spdx tmp/deploy/spdx/3.0.1/qemux86_64/recipes/recipe-linux-yocto.spdx.json --datadir ./vulns --old-cve-report build/tmp/log/cve/cve-summary.json
|
|
|
+#
|
|
|
+# SPDX-License-Identifier: GPLv2
|
|
|
+
|
|
|
+import argparse
|
|
|
+import json
|
|
|
+import sys
|
|
|
+import logging
|
|
|
+import glob
|
|
|
+import os
|
|
|
+import pathlib
|
|
|
+from packaging.version import Version
|
|
|
+
|
|
|
+def is_linux_cve(cve_info):
|
|
|
+ '''Return true is the CVE belongs to Linux'''
|
|
|
+ if not "affected" in cve_info["containers"]["cna"]:
|
|
|
+ return False
|
|
|
+ for affected in cve_info["containers"]["cna"]["affected"]:
|
|
|
+ if not "product" in affected:
|
|
|
+ return False
|
|
|
+ if affected["product"] == "Linux" and affected["vendor"] == "Linux":
|
|
|
+ return True
|
|
|
+ return False
|
|
|
+
|
|
|
+def get_kernel_cves(datadir, compiled_files, version):
|
|
|
+ """
|
|
|
+ Get CVEs for the kernel
|
|
|
+ """
|
|
|
+ cves = {}
|
|
|
+
|
|
|
+ check_config = len(compiled_files) > 0
|
|
|
+
|
|
|
+ base_version = Version(f"{version.major}.{version.minor}")
|
|
|
+
|
|
|
+ # Check all CVES from kernel vulns
|
|
|
+ pattern = os.path.join(datadir, '**', "CVE-*.json")
|
|
|
+ cve_files = glob.glob(pattern, recursive=True)
|
|
|
+ not_applicable_config = 0
|
|
|
+ fixed_as_later_backport = 0
|
|
|
+ vulnerable = 0
|
|
|
+ not_vulnerable = 0
|
|
|
+ for cve_file in sorted(cve_files):
|
|
|
+ cve_info = {}
|
|
|
+ with open(cve_file, "r", encoding='ISO-8859-1') as f:
|
|
|
+ cve_info = json.load(f)
|
|
|
+
|
|
|
+ if len(cve_info) == 0:
|
|
|
+ logging.error("Not valid data in %s. Aborting", cve_file)
|
|
|
+ break
|
|
|
+
|
|
|
+ if not is_linux_cve(cve_info):
|
|
|
+ continue
|
|
|
+ cve_id = os.path.basename(cve_file)[:-5]
|
|
|
+ description = cve_info["containers"]["cna"]["descriptions"][0]["value"]
|
|
|
+ if cve_file.find("rejected") >= 0:
|
|
|
+ logging.debug("%s is rejected by the CNA", cve_id)
|
|
|
+ cves[cve_id] = {
|
|
|
+ "id": cve_id,
|
|
|
+ "status": "Ignored",
|
|
|
+ "detail": "rejected",
|
|
|
+ "summary": description,
|
|
|
+ "description": f"Rejected by CNA"
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ if any(elem in cve_file for elem in ["review", "reverved", "testing"]):
|
|
|
+ continue
|
|
|
+
|
|
|
+ is_vulnerable, first_affected, last_affected, better_match_first, better_match_last, affected_versions = get_cpe_applicability(cve_info, version)
|
|
|
+
|
|
|
+ logging.debug("%s: %s (%s - %s) (%s - %s)", cve_id, is_vulnerable, better_match_first, better_match_last, first_affected, last_affected)
|
|
|
+
|
|
|
+ if is_vulnerable is None:
|
|
|
+ logging.warning("%s doesn't have good metadata", cve_id)
|
|
|
+ if is_vulnerable:
|
|
|
+ is_affected = True
|
|
|
+ affected_files = []
|
|
|
+ if check_config:
|
|
|
+ is_affected, affected_files = check_kernel_compiled_files(compiled_files, cve_info)
|
|
|
+
|
|
|
+ if not is_affected and len(affected_files) > 0:
|
|
|
+ logging.debug(
|
|
|
+ "%s - not applicable configuration since affected files not compiled: %s",
|
|
|
+ cve_id, affected_files)
|
|
|
+ cves[cve_id] = {
|
|
|
+ "id": cve_id,
|
|
|
+ "status": "Ignored",
|
|
|
+ "detail": "not-applicable-config",
|
|
|
+ "summary": description,
|
|
|
+ "description": f"Source code not compiled by config. {affected_files}"
|
|
|
+ }
|
|
|
+ not_applicable_config +=1
|
|
|
+ # Check if we have backport
|
|
|
+ else:
|
|
|
+ if not better_match_last:
|
|
|
+ fixed_in = last_affected
|
|
|
+ else:
|
|
|
+ fixed_in = better_match_last
|
|
|
+ logging.debug("%s needs backporting (fixed from %s)", cve_id, fixed_in)
|
|
|
+ cves[cve_id] = {
|
|
|
+ "id": cve_id,
|
|
|
+ "status": "Unpatched",
|
|
|
+ "detail": "version-in-range",
|
|
|
+ "summary": description,
|
|
|
+ "description": f"Needs backporting (fixed from {fixed_in})"
|
|
|
+ }
|
|
|
+ vulnerable += 1
|
|
|
+ if (better_match_last and
|
|
|
+ Version(f"{better_match_last.major}.{better_match_last.minor}") == base_version):
|
|
|
+ fixed_as_later_backport += 1
|
|
|
+ # Not vulnerable
|
|
|
+ else:
|
|
|
+ if not first_affected:
|
|
|
+ logging.debug("%s - not known affected %s",
|
|
|
+ cve_id,
|
|
|
+ better_match_last)
|
|
|
+ cves[cve_id] = {
|
|
|
+ "id": cve_id,
|
|
|
+ "status": "Patched",
|
|
|
+ "detail": "version-not-in-range",
|
|
|
+ "summary": description,
|
|
|
+ "description": "No CPE match"
|
|
|
+ }
|
|
|
+ not_vulnerable += 1
|
|
|
+ continue
|
|
|
+ backport_base = Version(f"{better_match_last.major}.{better_match_last.minor}")
|
|
|
+ if version < first_affected:
|
|
|
+ logging.debug('%s - fixed-version: only affects %s onwards',
|
|
|
+ cve_id,
|
|
|
+ first_affected)
|
|
|
+ cves[cve_id] = {
|
|
|
+ "id": cve_id,
|
|
|
+ "status": "Patched",
|
|
|
+ "detail": "fixed-version",
|
|
|
+ "summary": description,
|
|
|
+ "description": f"only affects {first_affected} onwards"
|
|
|
+ }
|
|
|
+ not_vulnerable += 1
|
|
|
+ elif last_affected <= version:
|
|
|
+ logging.debug("%s - fixed-version: Fixed from version %s",
|
|
|
+ cve_id,
|
|
|
+ last_affected)
|
|
|
+ cves[cve_id] = {
|
|
|
+ "id": cve_id,
|
|
|
+ "status": "Patched",
|
|
|
+ "detail": "fixed-version",
|
|
|
+ "summary": description,
|
|
|
+ "description": f"fixed-version: Fixed from version {last_affected}"
|
|
|
+ }
|
|
|
+ not_vulnerable += 1
|
|
|
+ elif backport_base == base_version:
|
|
|
+ logging.debug("%s - cpe-stable-backport: Backported in %s",
|
|
|
+ cve_id,
|
|
|
+ better_match_last)
|
|
|
+ cves[cve_id] = {
|
|
|
+ "id": cve_id,
|
|
|
+ "status": "Patched",
|
|
|
+ "detail": "cpe-stable-backport",
|
|
|
+ "summary": description,
|
|
|
+ "description": f"Backported in {better_match_last}"
|
|
|
+ }
|
|
|
+ not_vulnerable += 1
|
|
|
+ else:
|
|
|
+ logging.debug("%s - version not affected %s", cve_id, str(affected_versions))
|
|
|
+ cves[cve_id] = {
|
|
|
+ "id": cve_id,
|
|
|
+ "status": "Patched",
|
|
|
+ "detail": "version-not-in-range",
|
|
|
+ "summary": description,
|
|
|
+ "description": f"Range {affected_versions}"
|
|
|
+ }
|
|
|
+ not_vulnerable += 1
|
|
|
+
|
|
|
+ logging.info("Total CVEs ignored due to not applicable config: %d", not_applicable_config)
|
|
|
+ logging.info("Total CVEs not vulnerable due version-not-in-range: %d", not_vulnerable)
|
|
|
+ logging.info("Total vulnerable CVEs: %d", vulnerable)
|
|
|
+
|
|
|
+ logging.info("Total CVEs already backported in %s: %s", base_version,
|
|
|
+ fixed_as_later_backport)
|
|
|
+ return cves
|
|
|
+
|
|
|
+def read_spdx(spdx_file):
|
|
|
+ '''Open SPDX file and extract compiled files'''
|
|
|
+ with open(spdx_file, 'r', encoding='ISO-8859-1') as f:
|
|
|
+ spdx = json.load(f)
|
|
|
+ if "spdxVersion" in spdx:
|
|
|
+ if spdx["spdxVersion"] == "SPDX-2.2":
|
|
|
+ return read_spdx2(spdx)
|
|
|
+ if "@graph" in spdx:
|
|
|
+ return read_spdx3(spdx)
|
|
|
+ return []
|
|
|
+
|
|
|
+def read_spdx2(spdx):
|
|
|
+ '''
|
|
|
+ Read spdx2 compiled files from spdx
|
|
|
+ '''
|
|
|
+ cfiles = []
|
|
|
+ if 'files' not in spdx:
|
|
|
+ return cfiles
|
|
|
+ for item in spdx['files']:
|
|
|
+ for ftype in item['fileTypes']:
|
|
|
+ if ftype == "SOURCE":
|
|
|
+ filename = item["fileName"][item["fileName"].find("/")+1:]
|
|
|
+ cfiles.append(filename)
|
|
|
+ return cfiles
|
|
|
+
|
|
|
+def read_spdx3(spdx):
|
|
|
+ '''
|
|
|
+ Read spdx3 compiled files from spdx
|
|
|
+ '''
|
|
|
+ cfiles = []
|
|
|
+ for item in spdx["@graph"]:
|
|
|
+ if "software_primaryPurpose" not in item:
|
|
|
+ continue
|
|
|
+ if item["software_primaryPurpose"] == "source":
|
|
|
+ filename = item['name'][item['name'].find("/")+1:]
|
|
|
+ cfiles.append(filename)
|
|
|
+ return cfiles
|
|
|
+
|
|
|
+def check_kernel_compiled_files(compiled_files, cve_info):
|
|
|
+ """
|
|
|
+ Return if a CVE affected us depending on compiled files
|
|
|
+ """
|
|
|
+ files_affected = []
|
|
|
+ is_affected = False
|
|
|
+
|
|
|
+ for item in cve_info['containers']['cna']['affected']:
|
|
|
+ if "programFiles" in item:
|
|
|
+ for f in item['programFiles']:
|
|
|
+ if f not in files_affected:
|
|
|
+ files_affected.append(f)
|
|
|
+
|
|
|
+ if len(files_affected) > 0:
|
|
|
+ for f in files_affected:
|
|
|
+ if f in compiled_files:
|
|
|
+ logging.debug("File match: %s", f)
|
|
|
+ is_affected = True
|
|
|
+ return is_affected, files_affected
|
|
|
+
|
|
|
+def get_cpe_applicability(cve_info, v):
|
|
|
+ '''
|
|
|
+ Check if version is affected and return affected versions
|
|
|
+ '''
|
|
|
+ base_branch = Version(f"{v.major}.{v.minor}")
|
|
|
+ affected = []
|
|
|
+ if not 'cpeApplicability' in cve_info["containers"]["cna"]:
|
|
|
+ return None, None, None, None, None, None
|
|
|
+
|
|
|
+ for nodes in cve_info["containers"]["cna"]["cpeApplicability"]:
|
|
|
+ for node in nodes.values():
|
|
|
+ vulnerable = False
|
|
|
+ matched_branch = False
|
|
|
+ first_affected = Version("5000")
|
|
|
+ last_affected = Version("0")
|
|
|
+ better_match_first = Version("0")
|
|
|
+ better_match_last = Version("5000")
|
|
|
+
|
|
|
+ if len(node[0]['cpeMatch']) == 0:
|
|
|
+ first_affected = None
|
|
|
+ last_affected = None
|
|
|
+ better_match_first = None
|
|
|
+ better_match_last = None
|
|
|
+
|
|
|
+ for cpe_match in node[0]['cpeMatch']:
|
|
|
+ version_start_including = Version("0")
|
|
|
+ version_end_excluding = Version("0")
|
|
|
+ if 'versionStartIncluding' in cpe_match:
|
|
|
+ version_start_including = Version(cpe_match['versionStartIncluding'])
|
|
|
+ else:
|
|
|
+ version_start_including = Version("0")
|
|
|
+ # if versionEndExcluding is missing we are in a branch, which is not fixed.
|
|
|
+ if "versionEndExcluding" in cpe_match:
|
|
|
+ version_end_excluding = Version(cpe_match["versionEndExcluding"])
|
|
|
+ else:
|
|
|
+ # if versionEndExcluding is missing we are in a branch, which is not fixed.
|
|
|
+ version_end_excluding = Version(
|
|
|
+ f"{version_start_including.major}.{version_start_including.minor}.5000"
|
|
|
+ )
|
|
|
+ affected.append(f" {version_start_including}-{version_end_excluding}")
|
|
|
+ # Detect if versionEnd is in fixed in base branch. It has precedence over the rest
|
|
|
+ branch_end = Version(f"{version_end_excluding.major}.{version_end_excluding.minor}")
|
|
|
+ if branch_end == base_branch:
|
|
|
+ if version_start_including <= v < version_end_excluding:
|
|
|
+ vulnerable = cpe_match['vulnerable']
|
|
|
+ # If we don't match in our branch, we are not vulnerable,
|
|
|
+ # since we have a backport
|
|
|
+ matched_branch = True
|
|
|
+ better_match_first = version_start_including
|
|
|
+ better_match_last = version_end_excluding
|
|
|
+ if version_start_including <= v < version_end_excluding and not matched_branch:
|
|
|
+ if version_end_excluding < better_match_last:
|
|
|
+ better_match_first = max(version_start_including, better_match_first)
|
|
|
+ better_match_last = min(better_match_last, version_end_excluding)
|
|
|
+ vulnerable = cpe_match['vulnerable']
|
|
|
+ matched_branch = True
|
|
|
+
|
|
|
+ first_affected = min(version_start_including, first_affected)
|
|
|
+ last_affected = max(version_end_excluding, last_affected)
|
|
|
+ # Not a better match, we use the first and last affected instead of the fake .5000
|
|
|
+ if vulnerable and better_match_last == Version(f"{base_branch}.5000"):
|
|
|
+ better_match_last = last_affected
|
|
|
+ better_match_first = first_affected
|
|
|
+ return vulnerable, first_affected, last_affected, better_match_first, better_match_last, affected
|
|
|
+
|
|
|
+def copy_data(old, new):
|
|
|
+ '''Update dictionary with new entries, while keeping the old ones'''
|
|
|
+ for k in new.keys():
|
|
|
+ old[k] = new[k]
|
|
|
+ return old
|
|
|
+
|
|
|
+# Function taken from cve_check.bbclass. Adapted to cve fields
|
|
|
+def cve_update(cve_data, cve, entry):
|
|
|
+ # If no entry, just add it
|
|
|
+ if cve not in cve_data:
|
|
|
+ cve_data[cve] = entry
|
|
|
+ return
|
|
|
+ # If we are updating, there might be change in the status
|
|
|
+ if cve_data[cve]['status'] == "Unknown":
|
|
|
+ cve_data[cve] = copy_data(cve_data[cve], entry)
|
|
|
+ return
|
|
|
+ if cve_data[cve]['status'] == entry['status']:
|
|
|
+ return
|
|
|
+ if entry['status'] == "Unpatched" and cve_data[cve]['status'] == "Patched":
|
|
|
+ logging.warning("CVE entry %s update from Patched to Unpatched from the scan result", cve)
|
|
|
+ cve_data[cve] = copy_data(cve_data[cve], entry)
|
|
|
+ return
|
|
|
+ if entry['status'] == "Patched" and cve_data[cve]['status'] == "Unpatched":
|
|
|
+ logging.warning("CVE entry %s update from Unpatched to Patched from the scan result", cve)
|
|
|
+ cve_data[cve] = copy_data(cve_data[cve], entry)
|
|
|
+ return
|
|
|
+ # If we have an "Ignored", it has a priority
|
|
|
+ if cve_data[cve]['status'] == "Ignored":
|
|
|
+ logging.debug("CVE %s not updating because Ignored", cve)
|
|
|
+ return
|
|
|
+ # If we have an "Ignored", it has a priority
|
|
|
+ if entry['status'] == "Ignored":
|
|
|
+ cve_data[cve] = copy_data(cve_data[cve], entry)
|
|
|
+ logging.debug("CVE entry %s updated from Unpatched to Ignored", cve)
|
|
|
+ return
|
|
|
+ logging.warning("Unhandled CVE entry update for %s %s from %s %s to %s",
|
|
|
+ cve, cve_data[cve]['status'], cve_data[cve]['detail'], entry['status'], entry['detail'])
|
|
|
+
|
|
|
+def main():
|
|
|
+ parser = argparse.ArgumentParser(
|
|
|
+ description="Update cve-summary with kernel compiled files and kernel CVE information"
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "-s",
|
|
|
+ "--spdx",
|
|
|
+ help="SPDX2/3 for the kernel. Needs to include compiled sources",
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--datadir",
|
|
|
+ type=pathlib.Path,
|
|
|
+ help="Directory where CVE data is",
|
|
|
+ required=True
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--old-cve-report",
|
|
|
+ help="CVE report to update. (Optional)",
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--kernel-version",
|
|
|
+ help="Kernel version. Needed if old cve_report is not provided (Optional)",
|
|
|
+ type=Version
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "--new-cve-report",
|
|
|
+ help="Output file",
|
|
|
+ default="cve-summary-enhance.json"
|
|
|
+ )
|
|
|
+ parser.add_argument(
|
|
|
+ "-D",
|
|
|
+ "--debug",
|
|
|
+ help='Enable debug ',
|
|
|
+ action="store_true")
|
|
|
+
|
|
|
+ args = parser.parse_args()
|
|
|
+
|
|
|
+ if args.debug:
|
|
|
+ log_level=logging.DEBUG
|
|
|
+ else:
|
|
|
+ log_level=logging.INFO
|
|
|
+ logging.basicConfig(format='[%(filename)s:%(lineno)d] %(message)s', level=log_level)
|
|
|
+
|
|
|
+ if not args.kernel_version and not args.old_cve_report:
|
|
|
+ parser.error("either --kernel-version or --old-cve-report are needed")
|
|
|
+ return -1
|
|
|
+
|
|
|
+ # by default we don't check the compiled files, unless provided
|
|
|
+ compiled_files = []
|
|
|
+ if args.spdx:
|
|
|
+ compiled_files = read_spdx(args.spdx)
|
|
|
+ logging.info("Total compiled files %d", len(compiled_files))
|
|
|
+
|
|
|
+ if args.old_cve_report:
|
|
|
+ with open(args.old_cve_report, encoding='ISO-8859-1') as f:
|
|
|
+ cve_report = json.load(f)
|
|
|
+ else:
|
|
|
+ #If summary not provided, we create one
|
|
|
+ cve_report = {
|
|
|
+ "version": "1",
|
|
|
+ "package": [
|
|
|
+ {
|
|
|
+ "name": "linux-yocto",
|
|
|
+ "version": str(args.kernel_version),
|
|
|
+ "products": [
|
|
|
+ {
|
|
|
+ "product": "linux_kernel",
|
|
|
+ "cvesInRecord": "Yes"
|
|
|
+ }
|
|
|
+ ],
|
|
|
+ "issue": []
|
|
|
+ }
|
|
|
+ ]
|
|
|
+ }
|
|
|
+
|
|
|
+ for pkg in cve_report['package']:
|
|
|
+ is_kernel = False
|
|
|
+ for product in pkg['products']:
|
|
|
+ if product['product'] == "linux_kernel":
|
|
|
+ is_kernel=True
|
|
|
+ if not is_kernel:
|
|
|
+ continue
|
|
|
+
|
|
|
+ kernel_cves = get_kernel_cves(args.datadir,
|
|
|
+ compiled_files,
|
|
|
+ Version(pkg["version"]))
|
|
|
+ logging.info("Total kernel cves from kernel CNA: %s", len(kernel_cves))
|
|
|
+ cves = {issue["id"]: issue for issue in pkg["issue"]}
|
|
|
+ logging.info("Total kernel before processing cves: %s", len(cves))
|
|
|
+
|
|
|
+ for cve in kernel_cves:
|
|
|
+ cve_update(cves, cve, kernel_cves[cve])
|
|
|
+
|
|
|
+ pkg["issue"] = []
|
|
|
+ for cve in sorted(cves):
|
|
|
+ pkg["issue"].extend([cves[cve]])
|
|
|
+ logging.info("Total kernel cves after processing: %s", len(pkg['issue']))
|
|
|
+
|
|
|
+ with open(args.new_cve_report, "w", encoding='ISO-8859-1') as f:
|
|
|
+ json.dump(cve_report, f, indent=2)
|
|
|
+
|
|
|
+ return 0
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ sys.exit(main())
|
|
|
+
|