serv.py

#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

import os, sys, logging
import signal, time
import socket
import io
import sqlite3
import prserv
import prserv.db
import errno
from . import create_async_client, revision_smaller, increase_revision
import bb.asyncrpc
import bb.msg

logger = logging.getLogger("BitBake.PRserv")

PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
singleton = None

class PRServerClient(bb.asyncrpc.AsyncServerConnection):
    def __init__(self, socket, server):
        super().__init__(socket, "PRSERVICE", server.logger)
        self.server = server

        self.handlers.update({
            "get-pr": self.handle_get_pr,
            "test-pr": self.handle_test_pr,
            "test-package": self.handle_test_package,
            "max-package-pr": self.handle_max_package_pr,
            "import-one": self.handle_import_one,
            "export": self.handle_export,
            "is-readonly": self.handle_is_readonly,
        })

    def validate_proto_version(self):
        return (self.proto_version == (1, 0))

    async def dispatch_message(self, msg):
        try:
            return await super().dispatch_message(msg)
        except:
            raise
    async def handle_test_pr(self, request):
        '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value'''
        version = request["version"]
        pkgarch = request["pkgarch"]
        checksum = request["checksum"]
        history = request["history"]

        value = self.server.table.find_value(version, pkgarch, checksum, history)
        return {"value": value}

    async def handle_test_package(self, request):
        '''Tells whether there are entries for (version, pkgarch) in the db. Returns True or False'''
        version = request["version"]
        pkgarch = request["pkgarch"]

        value = self.server.table.test_package(version, pkgarch)
        return {"value": value}

    async def handle_max_package_pr(self, request):
        '''Finds the greatest PR value for (version, pkgarch) in the db. Returns None if no entry was found'''
        version = request["version"]
        pkgarch = request["pkgarch"]

        value = self.server.table.find_package_max_value(version, pkgarch)
        return {"value": value}
    async def handle_get_pr(self, request):
        '''Returns the PR value for the request, generating and storing a new one
        if needed. When an upstream server is configured, the local value is
        reconciled with the upstream database first.'''
        version = request["version"]
        pkgarch = request["pkgarch"]
        checksum = request["checksum"]
        history = request["history"]

        if self.upstream_client is None:
            value = self.server.table.get_value(version, pkgarch, checksum, history)
            return {"value": value}

        # We have an upstream server.
        # Check whether the local server already knows the requested configuration.
        # If the configuration is a new one, the generated value we will add will
        # depend on what's on the upstream server. That's why we're calling find_value()
        # instead of get_value() directly.

        value = self.server.table.find_value(version, pkgarch, checksum, history)
        upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)

        if value is not None:
            # The configuration is already known locally.
            if history:
                value = self.server.table.get_value(version, pkgarch, checksum, history)
            else:
                existing_value = value
                # In "no history" mode, we need to make sure the value doesn't decrease
                # and is at least greater than the maximum upstream value
                # and the maximum local value.

                local_max = self.server.table.find_package_max_value(version, pkgarch)
                if revision_smaller(value, local_max):
                    value = increase_revision(local_max)

                if revision_smaller(value, upstream_max):
                    # Ask upstream whether it knows the checksum
                    upstream_value = await self.upstream_client.test_pr(version, pkgarch, checksum)
                    if upstream_value is None:
                        # Upstream doesn't have our checksum, let's create a new one
                        value = upstream_max + ".0"
                    else:
                        # Fine to take the same value as upstream
                        value = upstream_max

                if value != existing_value and not self.server.read_only:
                    self.server.table.store_value(version, pkgarch, checksum, value)

            return {"value": value}

        # The configuration is a new one for the local server.
        # Let's ask the upstream server whether it knows it.

        known_upstream = await self.upstream_client.test_package(version, pkgarch)

        if not known_upstream:
            # The package is not known upstream, so it must be a local-only package.
            # Let's compute the PR number using the local-only method.
            value = self.server.table.get_value(version, pkgarch, checksum, history)
            return {"value": value}

        # The package is known upstream, let's ask the upstream server
        # whether it knows our new output hash.

        value = await self.upstream_client.test_pr(version, pkgarch, checksum)

        if value is not None:
            # Upstream knows this output hash, let's store it and use it too.
            if not self.server.read_only:
                self.server.table.store_value(version, pkgarch, checksum, value)
            # If the local server is read-only, it won't be able to store the new
            # value in the database and will have to keep asking the upstream server.
            return {"value": value}

        # The output hash doesn't exist upstream, so get the most recent number
        # from upstream (x). Then generate a new PR value for the local server: x.y

        upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)
        # Here we know that the package is known upstream, so upstream_max can't be None
        subvalue = self.server.table.find_new_subvalue(version, pkgarch, upstream_max)

        if not self.server.read_only:
            self.server.table.store_value(version, pkgarch, checksum, subvalue)

        return {"value": subvalue}
    async def process_requests(self):
        if self.server.upstream is not None:
            self.upstream_client = await create_async_client(self.server.upstream)
        else:
            self.upstream_client = None

        try:
            await super().process_requests()
        finally:
            if self.upstream_client is not None:
                await self.upstream_client.close()

    async def handle_import_one(self, request):
        response = None
        if not self.server.read_only:
            version = request["version"]
            pkgarch = request["pkgarch"]
            checksum = request["checksum"]
            value = request["value"]

            value = self.server.table.importone(version, pkgarch, checksum, value)
            if value is not None:
                response = {"value": value}

        return response

    async def handle_export(self, request):
        version = request["version"]
        pkgarch = request["pkgarch"]
        checksum = request["checksum"]
        colinfo = request["colinfo"]
        history = request["history"]

        try:
            (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo, history)
        except sqlite3.Error as exc:
            self.logger.error(str(exc))
            metainfo = datainfo = None

        return {"metainfo": metainfo, "datainfo": datainfo}

    async def handle_is_readonly(self, request):
        return {"readonly": self.server.read_only}
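
# Illustrative sketch (not used by the server): the shape of one request/response
# pair dispatched to the handlers above. Each handler receives a dict and returns
# a dict; the field values below are made up for demonstration.
def _example_test_pr_message():
    request = {
        "version": "1.0-r0",     # recipe version string
        "pkgarch": "core2-64",   # package architecture
        "checksum": "0123abcd",  # task output hash
        "history": False,        # whether the history-based PR mode is in use
    }
    response = {"value": "6"}    # PR value found, or None if the checksum is unknown
    return request, response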

class PRServer(bb.asyncrpc.AsyncServer):
    def __init__(self, dbfile, read_only=False, upstream=None):
        super().__init__(logger)
        self.dbfile = dbfile
        self.table = None
        self.read_only = read_only
        self.upstream = upstream

    def accept_client(self, socket):
        return PRServerClient(socket, self)

    def start(self):
        tasks = super().start()
        self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only)
        self.table = self.db["PRMAIN"]

        self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
                         (self.dbfile, self.address, str(os.getpid())))
        if self.upstream is not None:
            self.logger.info("And upstream PRServer: %s " % (self.upstream))

        return tasks

    async def stop(self):
        self.db.disconnect()
        await super().stop()

class PRServSingleton(object):
    def __init__(self, dbfile, logfile, host, port, upstream):
        self.dbfile = dbfile
        self.logfile = logfile
        self.host = host
        self.port = port
        self.upstream = upstream

    def start(self):
        self.prserv = PRServer(self.dbfile, upstream=self.upstream)
        self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port)
        self.process = self.prserv.serve_as_process(log_level=logging.WARNING)

        if not self.prserv.address:
            raise PRServiceConfigError
        if not self.port:
            self.port = int(self.prserv.address.rsplit(":", 1)[1])

def run_as_daemon(func, pidfile, logfile):
    """
    See "Advanced Programming in the UNIX Environment", Sec 13.3
    """
    try:
        pid = os.fork()
        if pid > 0:
            os.waitpid(pid, 0)
            # parent returns instead of exiting to give control back to the caller
            return pid
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno))

    os.setsid()
    # Fork again so the daemon is not a session leader,
    # which prevents it from acquiring a controlling terminal.
    try:
        pid = os.fork()
        if pid > 0:  # parent
            os._exit(0)
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno))

    os.chdir("/")

    sys.stdout.flush()
    sys.stderr.flush()

    # We could be called from a python thread with io.StringIO as
    # stdout/stderr or it could be 'real' unix fd forking where we need
    # to physically close the fds to prevent the program launching us from
    # potentially hanging on a pipe. Handle both cases.
    si = open("/dev/null", "r")
    try:
        os.dup2(si.fileno(), sys.stdin.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdin = si
    so = open(logfile, "a+")
    try:
        os.dup2(so.fileno(), sys.stdout.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdout = so
    try:
        os.dup2(so.fileno(), sys.stderr.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stderr = so

    # Clear out all log handlers prior to the fork() to avoid calling
    # event handlers not part of the PRserver
    for logger_iter in logging.Logger.manager.loggerDict.keys():
        logging.getLogger(logger_iter).handlers = []

    # Ensure logging makes it to the logfile
    streamhandler = logging.StreamHandler()
    streamhandler.setLevel(logging.DEBUG)
    formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
    streamhandler.setFormatter(formatter)
    logger.addHandler(streamhandler)

    # write pidfile
    pid = str(os.getpid())
    with open(pidfile, "w") as pf:
        pf.write("%s\n" % pid)

    func()
    os.remove(pidfile)
    os._exit(0)

def start_daemon(dbfile, host, port, logfile, read_only=False, upstream=None):
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        with open(pidfile) as pf:
            pid = int(pf.readline().strip())
    except IOError:
        pid = None

    if pid:
        sys.stderr.write("pidfile %s already exists. Daemon already running?\n"
                         % pidfile)
        return 1

    dbfile = os.path.abspath(dbfile)

    def daemon_main():
        server = PRServer(dbfile, read_only=read_only, upstream=upstream)
        server.start_tcp_server(ip, port)
        server.serve_forever()

    run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
    return 0

def stop_daemon(host, port):
    import glob
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        with open(pidfile) as pf:
            pid = int(pf.readline().strip())
    except IOError:
        pid = None

    if not pid:
        # When the server starts with port=0 (i.e. localhost:0), it actually picks
        # another port, so at least advise the user which ports servers are
        # listening on for this host.
        ports = []
        portstr = ""
        for pf in glob.glob(PIDPREFIX % (ip, "*")):
            bn = os.path.basename(pf)
            root, _ = os.path.splitext(bn)
            ports.append(root.split("_")[-1])
        if len(ports):
            portstr = "Wrong port? Other ports listening at %s: %s" % (host, " ".join(ports))

        sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
                         % (pidfile, portstr))
        return 1

    try:
        if is_running(pid):
            print("Sending SIGTERM to pr-server.")
            os.kill(pid, signal.SIGTERM)
            time.sleep(0.1)

        try:
            os.remove(pidfile)
        except FileNotFoundError:
            # The PID file might have been removed by the exiting process
            pass

    except OSError as e:
        # Ignore "No such process" races; re-raise anything else
        if "No such process" not in str(e):
            raise e

    return 0
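
# Illustrative usage sketch (not part of the PR service itself): starting and
# stopping a standalone PR server with the helpers above. The database/log
# paths and the port are made-up examples; the pidfile location is derived
# from PIDPREFIX by start_daemon()/stop_daemon().
def _example_start_stop_daemon():
    start_daemon("/tmp/example-prserv.sqlite3", "localhost", 8585,
                 "/tmp/example-prserv.log", read_only=False, upstream=None)
    # ... builds would query the server here ...
    stop_daemon("localhost", 8585)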

def is_running(pid):
    try:
        os.kill(pid, 0)
    except OSError as err:
        if err.errno == errno.ESRCH:
            return False
    return True

def is_local_special(host, port):
    if (host == "localhost" or host == "127.0.0.1") and not port:
        return True
    else:
        return False

class PRServiceConfigError(Exception):
    pass

def auto_start(d):
    global singleton

    host_params = list(filter(None, (d.getVar("PRSERV_HOST") or "").split(":")))
    if not host_params:
        # Shut down any existing PR Server
        auto_shutdown()
        return None

    if len(host_params) != 2:
        # Shut down any existing PR Server
        auto_shutdown()
        logger.critical("\n".join(["PRSERV_HOST: incorrect format",
                                   'Usage: PRSERV_HOST = "<hostname>:<port>"']))
        raise PRServiceConfigError

    host = host_params[0].strip().lower()
    port = int(host_params[1])

    upstream = d.getVar("PRSERV_UPSTREAM") or None

    if is_local_special(host, port):
        import bb.utils
        cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
        if not cachedir:
            logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
            raise PRServiceConfigError

        dbfile = os.path.join(cachedir, "prserv.sqlite3")
        logfile = os.path.join(cachedir, "prserv.log")
        if singleton:
            if singleton.dbfile != dbfile:
                # Shut down any existing PR Server as it doesn't match the config
                auto_shutdown()
        if not singleton:
            bb.utils.mkdirhier(cachedir)
            singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port, upstream)
            singleton.start()

    if singleton:
        host = singleton.host
        port = singleton.port

    try:
        ping(host, port)
        return str(host) + ":" + str(port)
    except Exception:
        logger.critical("PRservice %s:%d not available" % (host, port))
        raise PRServiceConfigError

def auto_shutdown():
    global singleton
    if singleton and singleton.process:
        singleton.process.terminate()
        singleton.process.join()
        singleton = None

def ping(host, port):
    from . import client

    with client.PRClient() as conn:
        conn.connect_tcp(host, port)
        return conn.ping()

def connect(host, port):
    from . import client

    global singleton

    if host.strip().lower() == "localhost" and not port:
        host = "localhost"
        port = singleton.port

    conn = client.PRClient()
    conn.connect_tcp(host, port)
    return conn
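
# Illustrative usage sketch (not called by BitBake itself): how auto_start(),
# ping() and auto_shutdown() fit together, assuming "d" is a BitBake datastore
# with e.g. PRSERV_HOST = "localhost:0" set so a local server is auto-started.
def _example_auto_start(d):
    hostport = auto_start(d)          # returns "host:port", or None if disabled
    if hostport:
        host, port = hostport.rsplit(":", 1)
        ping(host, int(port))         # raises if the server is unreachable
    auto_shutdown()                   # terminate an auto-started local server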