123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- #
- # Copyright OpenEmbedded Contributors
- #
- # SPDX-License-Identifier: MIT
- #
- import collections
- import functools
- import itertools
- import os.path
- import re
- import oe.patch
- _Version = collections.namedtuple(
- "_Version", ["release", "patch_l", "pre_l", "pre_v"]
- )
- @functools.total_ordering
- class Version():
- def __init__(self, version, suffix=None):
- suffixes = ["alphabetical", "patch"]
- if str(suffix) == "alphabetical":
- version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(?P<patch_l>[a-z]))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
- elif str(suffix) == "patch":
- version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(p|patch)(?P<patch_l>[0-9]+))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
- else:
- version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
- regex = re.compile(r"^\s*" + version_pattern + r"\s*$", re.VERBOSE | re.IGNORECASE)
- match = regex.search(version)
- if not match:
- raise Exception("Invalid version: '{0}'".format(version))
- self._version = _Version(
- release=tuple(int(i) for i in match.group("release").replace("-",".").split(".")),
- patch_l=match.group("patch_l") if str(suffix) in suffixes and match.group("patch_l") else "",
- pre_l=match.group("pre_l"),
- pre_v=match.group("pre_v")
- )
- self._key = _cmpkey(
- self._version.release,
- self._version.patch_l,
- self._version.pre_l,
- self._version.pre_v
- )
- def __eq__(self, other):
- if not isinstance(other, Version):
- return NotImplemented
- return self._key == other._key
- def __gt__(self, other):
- if not isinstance(other, Version):
- return NotImplemented
- return self._key > other._key
- def _cmpkey(release, patch_l, pre_l, pre_v):
- # remove leading 0
- _release = tuple(
- reversed(list(itertools.dropwhile(lambda x: x == 0, reversed(release))))
- )
- _patch = patch_l.upper()
- if pre_l is None and pre_v is None:
- _pre = float('inf')
- else:
- _pre = float(pre_v) if pre_v else float('-inf')
- return _release, _patch, _pre
- def parse_cve_from_filename(patch_filename):
- """
- Parses CVE ID from the filename
- Matches the last "CVE-YYYY-ID" in the file name, also if written
- in lowercase. Possible to have multiple CVE IDs in a single
- file name, but only the last one will be detected from the file name.
- Returns the last CVE ID foudn in the filename. If no CVE ID is found
- an empty string is returned.
- """
- cve_file_name_match = re.compile(r".*(CVE-\d{4}-\d{4,})", re.IGNORECASE)
- # Check patch file name for CVE ID
- fname_match = cve_file_name_match.search(patch_filename)
- return fname_match.group(1).upper() if fname_match else ""
- def parse_cves_from_patch_contents(patch_contents):
- """
- Parses CVE IDs from patch contents
- Matches all CVE IDs contained on a line that starts with "CVE: ". Any
- delimiter (',', '&', "and", etc.) can be used without any issues. Multiple
- "CVE:" lines can also exist.
- Returns a set of all CVE IDs found in the patch contents.
- """
- cve_ids = set()
- cve_match = re.compile(r"CVE-\d{4}-\d{4,}")
- # Search for one or more "CVE: " lines
- for line in patch_contents.split("\n"):
- if not line.startswith("CVE:"):
- continue
- cve_ids.update(cve_match.findall(line))
- return cve_ids
- def parse_cves_from_patch_file(patch_file):
- """
- Parses CVE IDs associated with a particular patch file, using both the filename
- and patch contents.
- Returns a set of all CVE IDs found in the patch filename and contents.
- """
- cve_ids = set()
- filename_cve = parse_cve_from_filename(patch_file)
- if filename_cve:
- bb.debug(2, "Found %s from patch file name %s" % (filename_cve, patch_file))
- cve_ids.add(parse_cve_from_filename(patch_file))
- # Remote patches won't be present and compressed patches won't be
- # unpacked, so say we're not scanning them
- if not os.path.isfile(patch_file):
- bb.note("%s is remote or compressed, not scanning content" % patch_file)
- return cve_ids
- with open(patch_file, "r", encoding="utf-8") as f:
- try:
- patch_text = f.read()
- except UnicodeDecodeError:
- bb.debug(
- 1,
- "Failed to read patch %s using UTF-8 encoding"
- " trying with iso8859-1" % patch_file,
- )
- f.close()
- with open(patch_file, "r", encoding="iso8859-1") as f:
- patch_text = f.read()
- cve_ids.update(parse_cves_from_patch_contents(patch_text))
- if not cve_ids:
- bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file)
- else:
- bb.debug(2, "Patch %s solves %s" % (patch_file, ", ".join(sorted(cve_ids))))
- return cve_ids
- @bb.parse.vardeps("CVE_STATUS")
- def get_patched_cves(d):
- """
- Determines the CVE IDs that have been solved by either patches incuded within
- SRC_URI or by setting CVE_STATUS.
- Returns a dictionary with the CVE IDs as keys and an associated dictonary of
- relevant metadata as the value.
- """
- patched_cves = {}
- patches = oe.patch.src_patches(d)
- bb.debug(2, "Scanning %d patches for CVEs" % len(patches))
- # Check each patch file
- for url in patches:
- patch_file = bb.fetch.decodeurl(url)[2]
- for cve_id in parse_cves_from_patch_file(patch_file):
- if cve_id not in patched_cves:
- patched_cves[cve_id] = {
- "abbrev-status": "Patched",
- "status": "fix-file-included",
- "resource": [patch_file],
- }
- else:
- patched_cves[cve_id]["resource"].append(patch_file)
- # Search for additional patched CVEs
- for cve_id in d.getVarFlags("CVE_STATUS") or {}:
- decoded_status = decode_cve_status(d, cve_id)
- products = d.getVar("CVE_PRODUCT")
- if has_cve_product_match(decoded_status, products):
- if cve_id in patched_cves:
- bb.warn(
- 'CVE_STATUS[%s] = "%s" is overwriting previous status of "%s: %s"'
- % (
- cve_id,
- d.getVarFlag("CVE_STATUS", cve_id),
- patched_cves[cve_id]["abbrev-status"],
- patched_cves[cve_id]["status"],
- )
- )
- patched_cves[cve_id] = {
- "abbrev-status": decoded_status["mapping"],
- "status": decoded_status["detail"],
- "justification": decoded_status["description"],
- "affected-vendor": decoded_status["vendor"],
- "affected-product": decoded_status["product"],
- }
- return patched_cves
- def get_cpe_ids(cve_product, version):
- """
- Get list of CPE identifiers for the given product and version
- """
- version = version.split("+git")[0]
- cpe_ids = []
- for product in cve_product.split():
- # CVE_PRODUCT in recipes may include vendor information for CPE identifiers. If not,
- # use wildcard for vendor.
- if ":" in product:
- vendor, product = product.split(":", 1)
- else:
- vendor = "*"
- cpe_id = 'cpe:2.3:*:{}:{}:{}:*:*:*:*:*:*:*'.format(vendor, product, version)
- cpe_ids.append(cpe_id)
- return cpe_ids
- def cve_check_merge_jsons(output, data):
- """
- Merge the data in the "package" property to the main data file
- output
- """
- if output["version"] != data["version"]:
- bb.error("Version mismatch when merging JSON outputs")
- return
- for product in output["package"]:
- if product["name"] == data["package"][0]["name"]:
- bb.error("Error adding the same package %s twice" % product["name"])
- return
- output["package"].append(data["package"][0])
- def update_symlinks(target_path, link_path):
- """
- Update a symbolic link link_path to point to target_path.
- Remove the link and recreate it if exist and is different.
- """
- if link_path != target_path and os.path.exists(target_path):
- if os.path.exists(os.path.realpath(link_path)):
- os.remove(link_path)
- os.symlink(os.path.basename(target_path), link_path)
- def convert_cve_version(version):
- """
- This function converts from CVE format to Yocto version format.
- eg 8.3_p1 -> 8.3p1, 6.2_rc1 -> 6.2-rc1
- Unless it is redefined using CVE_VERSION in the recipe,
- cve_check uses the version in the name of the recipe (${PV})
- to check vulnerabilities against a CVE in the database downloaded from NVD.
- When the version has an update, i.e.
- "p1" in OpenSSH 8.3p1,
- "-rc1" in linux kernel 6.2-rc1,
- the database stores the version as version_update (8.3_p1, 6.2_rc1).
- Therefore, we must transform this version before comparing to the
- recipe version.
- In this case, the parameter of the function is 8.3_p1.
- If the version uses the Release Candidate format, "rc",
- this function replaces the '_' by '-'.
- If the version uses the Update format, "p",
- this function removes the '_' completely.
- """
- import re
- matches = re.match('^([0-9.]+)_((p|rc)[0-9]+)$', version)
- if not matches:
- return version
- version = matches.group(1)
- update = matches.group(2)
- if matches.group(3) == "rc":
- return version + '-' + update
- return version + update
- @bb.parse.vardeps("CVE_STATUS", "CVE_CHECK_STATUSMAP")
- def decode_cve_status(d, cve):
- """
- Convert CVE_STATUS into status, vendor, product, detail and description.
- """
- status = d.getVarFlag("CVE_STATUS", cve)
- if not status:
- return {}
- status_split = status.split(':', 4)
- status_out = {}
- status_out["detail"] = status_split[0]
- product = "*"
- vendor = "*"
- description = ""
- if len(status_split) >= 4 and status_split[1].strip() == "cpe":
- # Both vendor and product are mandatory if cpe: present, the syntax is then:
- # detail: cpe:vendor:product:description
- vendor = status_split[2].strip()
- product = status_split[3].strip()
- description = status_split[4].strip()
- elif len(status_split) >= 2 and status_split[1].strip() == "cpe":
- # Malformed CPE
- bb.warn(
- 'Invalid CPE information for CVE_STATUS[%s] = "%s", not setting CPE'
- % (cve, status)
- )
- else:
- # Other case: no CPE, the syntax is then:
- # detail: description
- description = status.split(':', 1)[1].strip() if (len(status_split) > 1) else ""
- status_out["vendor"] = vendor
- status_out["product"] = product
- status_out["description"] = description
- detail = status_out["detail"]
- status_mapping = d.getVarFlag("CVE_CHECK_STATUSMAP", detail)
- if status_mapping is None:
- bb.warn(
- 'Invalid detail "%s" for CVE_STATUS[%s] = "%s", fallback to Unpatched'
- % (detail, cve, status)
- )
- status_mapping = "Unpatched"
- status_out["mapping"] = status_mapping
- return status_out
- def has_cve_product_match(detailed_status, products):
- """
- Check product/vendor match between detailed_status from decode_cve_status and a string of
- products (like from CVE_PRODUCT)
- """
- for product in products.split():
- vendor = "*"
- if ":" in product:
- vendor, product = product.split(":", 1)
- if (vendor == detailed_status["vendor"] or detailed_status["vendor"] == "*") and \
- (product == detailed_status["product"] or detailed_status["product"] == "*"):
- return True
- #if no match, return False
- return False
- def extend_cve_status(d):
- # do this only once in case multiple classes use this
- if d.getVar("CVE_STATUS_EXTENDED"):
- return
- d.setVar("CVE_STATUS_EXTENDED", "1")
- # Fallback all CVEs from CVE_CHECK_IGNORE to CVE_STATUS
- cve_check_ignore = d.getVar("CVE_CHECK_IGNORE")
- if cve_check_ignore:
- bb.warn("CVE_CHECK_IGNORE is deprecated in favor of CVE_STATUS")
- for cve in (d.getVar("CVE_CHECK_IGNORE") or "").split():
- d.setVarFlag("CVE_STATUS", cve, "ignored")
- # Process CVE_STATUS_GROUPS to set multiple statuses and optional detail or description at once
- for cve_status_group in (d.getVar("CVE_STATUS_GROUPS") or "").split():
- cve_group = d.getVar(cve_status_group)
- if cve_group is not None:
- for cve in cve_group.split():
- d.setVarFlag("CVE_STATUS", cve, d.getVarFlag(cve_status_group, "status"))
- else:
- bb.warn("CVE_STATUS_GROUPS contains undefined variable %s" % cve_status_group)
|