patchtest 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. #!/usr/bin/env python3
  2. # ex:ts=4:sw=4:sts=4:et
  3. # -*- tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
  4. #
  5. # patchtest: execute all unittest test cases discovered for a single patch
  6. #
  7. # Copyright (C) 2016 Intel Corporation
  8. #
  9. # SPDX-License-Identifier: GPL-2.0-only
  10. #
  11. import sys
  12. import os
  13. import unittest
  14. import fileinput
  15. import logging
  16. import traceback
  17. import json
  18. # Include current path so test cases can see it
  19. sys.path.insert(0, os.path.dirname(os.path.realpath(__file__)))
  20. # Include patchtest library
  21. sys.path.insert(0, os.path.join(os.path.dirname(os.path.realpath(__file__)), '../meta/lib/patchtest'))
  22. from data import PatchTestInput
  23. from repo import PatchTestRepo
  24. import utils
  25. logger = utils.logger_create('patchtest')
  26. info = logger.info
  27. error = logger.error
  28. import repo
  29. def getResult(patch, mergepatch, logfile=None):
  30. class PatchTestResult(unittest.TextTestResult):
  31. """ Patchtest TextTestResult """
  32. shouldStop = True
  33. longMessage = False
  34. success = 'PASS'
  35. fail = 'FAIL'
  36. skip = 'SKIP'
  37. def startTestRun(self):
  38. # let's create the repo already, it can be used later on
  39. repoargs = {
  40. 'repodir': PatchTestInput.repodir,
  41. 'commit' : PatchTestInput.basecommit,
  42. 'branch' : PatchTestInput.basebranch,
  43. 'patch' : patch,
  44. }
  45. self.repo_error = False
  46. self.test_error = False
  47. self.test_failure = False
  48. try:
  49. self.repo = PatchTestInput.repo = PatchTestRepo(**repoargs)
  50. except:
  51. logger.error(traceback.print_exc())
  52. self.repo_error = True
  53. self.stop()
  54. return
  55. if mergepatch:
  56. self.repo.merge()
  57. def addError(self, test, err):
  58. self.test_error = True
  59. (ty, va, trace) = err
  60. logger.error(traceback.print_exc())
  61. def addFailure(self, test, err):
  62. test_description = test.id().split('.')[-1].replace('_', ' ').replace("cve", "CVE").replace("signed off by",
  63. "Signed-off-by").replace("upstream status",
  64. "Upstream-Status").replace("non auh",
  65. "non-AUH").replace("presence format", "presence")
  66. self.test_failure = True
  67. fail_str = '{}: {}: {} ({})'.format(self.fail,
  68. test_description, json.loads(str(err[1]))["issue"],
  69. test.id())
  70. print(fail_str)
  71. if logfile:
  72. with open(logfile, "a") as f:
  73. f.write(fail_str + "\n")
  74. def addSuccess(self, test):
  75. test_description = test.id().split('.')[-1].replace('_', ' ').replace("cve", "CVE").replace("signed off by",
  76. "Signed-off-by").replace("upstream status",
  77. "Upstream-Status").replace("non auh",
  78. "non-AUH").replace("presence format", "presence")
  79. success_str = '{}: {} ({})'.format(self.success,
  80. test_description, test.id())
  81. print(success_str)
  82. if logfile:
  83. with open(logfile, "a") as f:
  84. f.write(success_str + "\n")
  85. def addSkip(self, test, reason):
  86. test_description = test.id().split('.')[-1].replace('_', ' ').replace("cve", "CVE").replace("signed off by",
  87. "Signed-off-by").replace("upstream status",
  88. "Upstream-Status").replace("non auh",
  89. "non-AUH").replace("presence format", "presence")
  90. skip_str = '{}: {}: {} ({})'.format(self.skip,
  91. test_description, json.loads(str(reason))["issue"],
  92. test.id())
  93. print(skip_str)
  94. if logfile:
  95. with open(logfile, "a") as f:
  96. f.write(skip_str + "\n")
  97. def stopTestRun(self):
  98. # in case there was an error on repo object creation, just return
  99. if self.repo_error:
  100. return
  101. self.repo.clean()
  102. return PatchTestResult
  103. def _runner(resultklass, prefix=None):
  104. # load test with the corresponding prefix
  105. loader = unittest.TestLoader()
  106. if prefix:
  107. loader.testMethodPrefix = prefix
  108. # create the suite with discovered tests and the corresponding runner
  109. suite = loader.discover(start_dir=PatchTestInput.testdir, pattern=PatchTestInput.pattern, top_level_dir=PatchTestInput.topdir)
  110. ntc = suite.countTestCases()
  111. # if there are no test cases, just quit
  112. if not ntc:
  113. return 2
  114. runner = unittest.TextTestRunner(resultclass=resultklass, verbosity=0)
  115. try:
  116. result = runner.run(suite)
  117. except:
  118. logger.error(traceback.print_exc())
  119. logger.error('patchtest: something went wrong')
  120. return 1
  121. return 0
  122. def run(patch, logfile=None):
  123. """ Load, setup and run pre and post-merge tests """
  124. # Get the result class and install the control-c handler
  125. unittest.installHandler()
  126. # run pre-merge tests, meaning those methods with 'pretest' as prefix
  127. premerge_resultklass = getResult(patch, False, logfile)
  128. premerge_result = _runner(premerge_resultklass, 'pretest')
  129. # run post-merge tests, meaning those methods with 'test' as prefix
  130. postmerge_resultklass = getResult(patch, True, logfile)
  131. postmerge_result = _runner(postmerge_resultklass, 'test')
  132. if premerge_result == 2 and postmerge_result == 2:
  133. logger.error('patchtest: any test cases found - did you specify the correct suite directory?')
  134. return premerge_result or postmerge_result
  135. def main():
  136. tmp_patch = False
  137. patch_path = PatchTestInput.patch_path
  138. log_results = PatchTestInput.log_results
  139. log_path = None
  140. patch_list = None
  141. if os.path.isdir(patch_path):
  142. patch_list = [os.path.join(patch_path, filename) for filename in os.listdir(patch_path)]
  143. else:
  144. patch_list = [patch_path]
  145. for patch in patch_list:
  146. if os.path.getsize(patch) == 0:
  147. logger.error('patchtest: patch is empty')
  148. return 1
  149. logger.info('Testing patch %s' % patch)
  150. if log_results:
  151. log_path = patch + ".testresult"
  152. with open(log_path, "a") as f:
  153. f.write("Patchtest results for patch '%s':\n\n" % patch)
  154. try:
  155. if log_path:
  156. run(patch, log_path)
  157. else:
  158. run(patch)
  159. finally:
  160. if tmp_patch:
  161. os.remove(patch)
  162. if __name__ == '__main__':
  163. ret = 1
  164. # Parse the command line arguments and store it on the PatchTestInput namespace
  165. PatchTestInput.set_namespace()
  166. # set debugging level
  167. if PatchTestInput.debug:
  168. logger.setLevel(logging.DEBUG)
  169. # if topdir not define, default it to testdir
  170. if not PatchTestInput.topdir:
  171. PatchTestInput.topdir = PatchTestInput.testdir
  172. try:
  173. ret = main()
  174. except Exception:
  175. import traceback
  176. traceback.print_exc(5)
  177. sys.exit(ret)