cve_check.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. #
  2. # Copyright OpenEmbedded Contributors
  3. #
  4. # SPDX-License-Identifier: MIT
  5. #
  6. import collections
  7. import functools
  8. import itertools
  9. import os.path
  10. import re
  11. import oe.patch
  12. _Version = collections.namedtuple(
  13. "_Version", ["release", "patch_l", "pre_l", "pre_v"]
  14. )
  15. @functools.total_ordering
  16. class Version():
  17. def __init__(self, version, suffix=None):
  18. suffixes = ["alphabetical", "patch"]
  19. if str(suffix) == "alphabetical":
  20. version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(?P<patch_l>[a-z]))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
  21. elif str(suffix) == "patch":
  22. version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<patch>[-_\.]?(p|patch)(?P<patch_l>[0-9]+))?(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
  23. else:
  24. version_pattern = r"""r?v?(?:(?P<release>[0-9]+(?:[-\.][0-9]+)*)(?P<pre>[-_\.]?(?P<pre_l>(rc|alpha|beta|pre|preview|dev))[-_\.]?(?P<pre_v>[0-9]+)?)?)(.*)?"""
  25. regex = re.compile(r"^\s*" + version_pattern + r"\s*$", re.VERBOSE | re.IGNORECASE)
  26. match = regex.search(version)
  27. if not match:
  28. raise Exception("Invalid version: '{0}'".format(version))
  29. self._version = _Version(
  30. release=tuple(int(i) for i in match.group("release").replace("-",".").split(".")),
  31. patch_l=match.group("patch_l") if str(suffix) in suffixes and match.group("patch_l") else "",
  32. pre_l=match.group("pre_l"),
  33. pre_v=match.group("pre_v")
  34. )
  35. self._key = _cmpkey(
  36. self._version.release,
  37. self._version.patch_l,
  38. self._version.pre_l,
  39. self._version.pre_v
  40. )
  41. def __eq__(self, other):
  42. if not isinstance(other, Version):
  43. return NotImplemented
  44. return self._key == other._key
  45. def __gt__(self, other):
  46. if not isinstance(other, Version):
  47. return NotImplemented
  48. return self._key > other._key
  49. def _cmpkey(release, patch_l, pre_l, pre_v):
  50. # remove leading 0
  51. _release = tuple(
  52. reversed(list(itertools.dropwhile(lambda x: x == 0, reversed(release))))
  53. )
  54. _patch = patch_l.upper()
  55. if pre_l is None and pre_v is None:
  56. _pre = float('inf')
  57. else:
  58. _pre = float(pre_v) if pre_v else float('-inf')
  59. return _release, _patch, _pre
  60. def parse_cve_from_filename(patch_filename):
  61. """
  62. Parses CVE ID from the filename
  63. Matches the last "CVE-YYYY-ID" in the file name, also if written
  64. in lowercase. Possible to have multiple CVE IDs in a single
  65. file name, but only the last one will be detected from the file name.
  66. Returns the last CVE ID foudn in the filename. If no CVE ID is found
  67. an empty string is returned.
  68. """
  69. cve_file_name_match = re.compile(r".*(CVE-\d{4}-\d{4,})", re.IGNORECASE)
  70. # Check patch file name for CVE ID
  71. fname_match = cve_file_name_match.search(patch_filename)
  72. return fname_match.group(1).upper() if fname_match else ""
  73. def parse_cves_from_patch_contents(patch_contents):
  74. """
  75. Parses CVE IDs from patch contents
  76. Matches all CVE IDs contained on a line that starts with "CVE: ". Any
  77. delimiter (',', '&', "and", etc.) can be used without any issues. Multiple
  78. "CVE:" lines can also exist.
  79. Returns a set of all CVE IDs found in the patch contents.
  80. """
  81. cve_ids = set()
  82. cve_match = re.compile(r"CVE-\d{4}-\d{4,}")
  83. # Search for one or more "CVE: " lines
  84. for line in patch_contents.split("\n"):
  85. if not line.startswith("CVE:"):
  86. continue
  87. cve_ids.update(cve_match.findall(line))
  88. return cve_ids
  89. def parse_cves_from_patch_file(patch_file):
  90. """
  91. Parses CVE IDs associated with a particular patch file, using both the filename
  92. and patch contents.
  93. Returns a set of all CVE IDs found in the patch filename and contents.
  94. """
  95. cve_ids = set()
  96. filename_cve = parse_cve_from_filename(patch_file)
  97. if filename_cve:
  98. bb.debug(2, "Found %s from patch file name %s" % (filename_cve, patch_file))
  99. cve_ids.add(parse_cve_from_filename(patch_file))
  100. # Remote patches won't be present and compressed patches won't be
  101. # unpacked, so say we're not scanning them
  102. if not os.path.isfile(patch_file):
  103. bb.note("%s is remote or compressed, not scanning content" % patch_file)
  104. return cve_ids
  105. with open(patch_file, "r", encoding="utf-8") as f:
  106. try:
  107. patch_text = f.read()
  108. except UnicodeDecodeError:
  109. bb.debug(
  110. 1,
  111. "Failed to read patch %s using UTF-8 encoding"
  112. " trying with iso8859-1" % patch_file,
  113. )
  114. f.close()
  115. with open(patch_file, "r", encoding="iso8859-1") as f:
  116. patch_text = f.read()
  117. cve_ids.update(parse_cves_from_patch_contents(patch_text))
  118. if not cve_ids:
  119. bb.debug(2, "Patch %s doesn't solve CVEs" % patch_file)
  120. else:
  121. bb.debug(2, "Patch %s solves %s" % (patch_file, ", ".join(sorted(cve_ids))))
  122. return cve_ids
  123. @bb.parse.vardeps("CVE_STATUS")
  124. def get_patched_cves(d):
  125. """
  126. Determines the CVE IDs that have been solved by either patches incuded within
  127. SRC_URI or by setting CVE_STATUS.
  128. Returns a dictionary with the CVE IDs as keys and an associated dictonary of
  129. relevant metadata as the value.
  130. """
  131. patched_cves = {}
  132. patches = oe.patch.src_patches(d)
  133. bb.debug(2, "Scanning %d patches for CVEs" % len(patches))
  134. # Check each patch file
  135. for url in patches:
  136. patch_file = bb.fetch.decodeurl(url)[2]
  137. for cve_id in parse_cves_from_patch_file(patch_file):
  138. if cve_id not in patched_cves:
  139. patched_cves[cve_id] = {
  140. "abbrev-status": "Patched",
  141. "status": "fix-file-included",
  142. "resource": [patch_file],
  143. }
  144. else:
  145. patched_cves[cve_id]["resource"].append(patch_file)
  146. # Search for additional patched CVEs
  147. for cve_id in d.getVarFlags("CVE_STATUS") or {}:
  148. decoded_status = decode_cve_status(d, cve_id)
  149. products = d.getVar("CVE_PRODUCT")
  150. if has_cve_product_match(decoded_status, products):
  151. if cve_id in patched_cves:
  152. bb.warn(
  153. 'CVE_STATUS[%s] = "%s" is overwriting previous status of "%s: %s"'
  154. % (
  155. cve_id,
  156. d.getVarFlag("CVE_STATUS", cve_id),
  157. patched_cves[cve_id]["abbrev-status"],
  158. patched_cves[cve_id]["status"],
  159. )
  160. )
  161. patched_cves[cve_id] = {
  162. "abbrev-status": decoded_status["mapping"],
  163. "status": decoded_status["detail"],
  164. "justification": decoded_status["description"],
  165. "affected-vendor": decoded_status["vendor"],
  166. "affected-product": decoded_status["product"],
  167. }
  168. return patched_cves
  169. def get_cpe_ids(cve_product, version):
  170. """
  171. Get list of CPE identifiers for the given product and version
  172. """
  173. version = version.split("+git")[0]
  174. cpe_ids = []
  175. for product in cve_product.split():
  176. # CVE_PRODUCT in recipes may include vendor information for CPE identifiers. If not,
  177. # use wildcard for vendor.
  178. if ":" in product:
  179. vendor, product = product.split(":", 1)
  180. else:
  181. vendor = "*"
  182. cpe_id = 'cpe:2.3:*:{}:{}:{}:*:*:*:*:*:*:*'.format(vendor, product, version)
  183. cpe_ids.append(cpe_id)
  184. return cpe_ids
  185. def cve_check_merge_jsons(output, data):
  186. """
  187. Merge the data in the "package" property to the main data file
  188. output
  189. """
  190. if output["version"] != data["version"]:
  191. bb.error("Version mismatch when merging JSON outputs")
  192. return
  193. for product in output["package"]:
  194. if product["name"] == data["package"][0]["name"]:
  195. bb.error("Error adding the same package %s twice" % product["name"])
  196. return
  197. output["package"].append(data["package"][0])
  198. def update_symlinks(target_path, link_path):
  199. """
  200. Update a symbolic link link_path to point to target_path.
  201. Remove the link and recreate it if exist and is different.
  202. """
  203. if link_path != target_path and os.path.exists(target_path):
  204. if os.path.exists(os.path.realpath(link_path)):
  205. os.remove(link_path)
  206. os.symlink(os.path.basename(target_path), link_path)
  207. def convert_cve_version(version):
  208. """
  209. This function converts from CVE format to Yocto version format.
  210. eg 8.3_p1 -> 8.3p1, 6.2_rc1 -> 6.2-rc1
  211. Unless it is redefined using CVE_VERSION in the recipe,
  212. cve_check uses the version in the name of the recipe (${PV})
  213. to check vulnerabilities against a CVE in the database downloaded from NVD.
  214. When the version has an update, i.e.
  215. "p1" in OpenSSH 8.3p1,
  216. "-rc1" in linux kernel 6.2-rc1,
  217. the database stores the version as version_update (8.3_p1, 6.2_rc1).
  218. Therefore, we must transform this version before comparing to the
  219. recipe version.
  220. In this case, the parameter of the function is 8.3_p1.
  221. If the version uses the Release Candidate format, "rc",
  222. this function replaces the '_' by '-'.
  223. If the version uses the Update format, "p",
  224. this function removes the '_' completely.
  225. """
  226. import re
  227. matches = re.match('^([0-9.]+)_((p|rc)[0-9]+)$', version)
  228. if not matches:
  229. return version
  230. version = matches.group(1)
  231. update = matches.group(2)
  232. if matches.group(3) == "rc":
  233. return version + '-' + update
  234. return version + update
  235. @bb.parse.vardeps("CVE_STATUS", "CVE_CHECK_STATUSMAP")
  236. def decode_cve_status(d, cve):
  237. """
  238. Convert CVE_STATUS into status, vendor, product, detail and description.
  239. """
  240. status = d.getVarFlag("CVE_STATUS", cve)
  241. if not status:
  242. return {}
  243. status_split = status.split(':', 4)
  244. status_out = {}
  245. status_out["detail"] = status_split[0]
  246. product = "*"
  247. vendor = "*"
  248. description = ""
  249. if len(status_split) >= 4 and status_split[1].strip() == "cpe":
  250. # Both vendor and product are mandatory if cpe: present, the syntax is then:
  251. # detail: cpe:vendor:product:description
  252. vendor = status_split[2].strip()
  253. product = status_split[3].strip()
  254. description = status_split[4].strip()
  255. elif len(status_split) >= 2 and status_split[1].strip() == "cpe":
  256. # Malformed CPE
  257. bb.warn(
  258. 'Invalid CPE information for CVE_STATUS[%s] = "%s", not setting CPE'
  259. % (cve, status)
  260. )
  261. else:
  262. # Other case: no CPE, the syntax is then:
  263. # detail: description
  264. description = status.split(':', 1)[1].strip() if (len(status_split) > 1) else ""
  265. status_out["vendor"] = vendor
  266. status_out["product"] = product
  267. status_out["description"] = description
  268. detail = status_out["detail"]
  269. status_mapping = d.getVarFlag("CVE_CHECK_STATUSMAP", detail)
  270. if status_mapping is None:
  271. bb.warn(
  272. 'Invalid detail "%s" for CVE_STATUS[%s] = "%s", fallback to Unpatched'
  273. % (detail, cve, status)
  274. )
  275. status_mapping = "Unpatched"
  276. status_out["mapping"] = status_mapping
  277. return status_out
  278. def has_cve_product_match(detailed_status, products):
  279. """
  280. Check product/vendor match between detailed_status from decode_cve_status and a string of
  281. products (like from CVE_PRODUCT)
  282. """
  283. for product in products.split():
  284. vendor = "*"
  285. if ":" in product:
  286. vendor, product = product.split(":", 1)
  287. if (vendor == detailed_status["vendor"] or detailed_status["vendor"] == "*") and \
  288. (product == detailed_status["product"] or detailed_status["product"] == "*"):
  289. return True
  290. #if no match, return False
  291. return False
  292. def extend_cve_status(d):
  293. # do this only once in case multiple classes use this
  294. if d.getVar("CVE_STATUS_EXTENDED"):
  295. return
  296. d.setVar("CVE_STATUS_EXTENDED", "1")
  297. # Fallback all CVEs from CVE_CHECK_IGNORE to CVE_STATUS
  298. cve_check_ignore = d.getVar("CVE_CHECK_IGNORE")
  299. if cve_check_ignore:
  300. bb.warn("CVE_CHECK_IGNORE is deprecated in favor of CVE_STATUS")
  301. for cve in (d.getVar("CVE_CHECK_IGNORE") or "").split():
  302. d.setVarFlag("CVE_STATUS", cve, "ignored")
  303. # Process CVE_STATUS_GROUPS to set multiple statuses and optional detail or description at once
  304. for cve_status_group in (d.getVar("CVE_STATUS_GROUPS") or "").split():
  305. cve_group = d.getVar(cve_status_group)
  306. if cve_group is not None:
  307. for cve in cve_group.split():
  308. d.setVarFlag("CVE_STATUS", cve, d.getVarFlag(cve_status_group, "status"))
  309. else:
  310. bb.warn("CVE_STATUS_GROUPS contains undefined variable %s" % cve_status_group)