#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

import os, sys, logging
import signal, time
import socket
import io
import sqlite3
import errno

import prserv
import prserv.db

from . import create_async_client, revision_smaller, increase_revision

import bb.asyncrpc
import bb.msg

logger = logging.getLogger("BitBake.PRserv")

PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
singleton = None
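# PID files are created per server instance as /tmp/PRServer_<ip>_<port>.pid so
# that start_daemon()/stop_daemon() can locate an already-running daemon for a
# given address.  "singleton" holds the PRServSingleton spawned by auto_start()
# when PRSERV_HOST requests a locally managed server.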

class PRServerClient(bb.asyncrpc.AsyncServerConnection):
    def __init__(self, socket, server):
        super().__init__(socket, "PRSERVICE", server.logger)
        self.server = server
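        # Each RPC message name is dispatched to the coroutine registered
        # below; handlers receive the decoded request dict and return the
        # response dict sent back to the client.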
        self.handlers.update({
            "get-pr": self.handle_get_pr,
            "test-pr": self.handle_test_pr,
            "test-package": self.handle_test_package,
            "max-package-pr": self.handle_max_package_pr,
            "import-one": self.handle_import_one,
            "export": self.handle_export,
            "is-readonly": self.handle_is_readonly,
        })

    def validate_proto_version(self):
        return (self.proto_version == (1, 0))

    async def dispatch_message(self, msg):
        # No PR-specific error handling is needed here; simply delegate to the
        # base class dispatcher.
        return await super().dispatch_message(msg)

    async def handle_test_pr(self, request):
        '''Finds the PR value corresponding to the request. If not found, returns None and doesn't insert a new value'''
        version = request["version"]
        pkgarch = request["pkgarch"]
        checksum = request["checksum"]
        history = request["history"]

        value = self.server.table.find_value(version, pkgarch, checksum, history)
        return {"value": value}

    async def handle_test_package(self, request):
        '''Tells whether there are entries for (version, pkgarch) in the db. Returns True or False'''
        version = request["version"]
        pkgarch = request["pkgarch"]

        value = self.server.table.test_package(version, pkgarch)
        return {"value": value}

    async def handle_max_package_pr(self, request):
        '''Finds the greatest PR value for (version, pkgarch) in the db. Returns None if no entry was found'''
        version = request["version"]
        pkgarch = request["pkgarch"]

        value = self.server.table.find_package_max_value(version, pkgarch)
        return {"value": value}

    async def handle_get_pr(self, request):
        version = request["version"]
        pkgarch = request["pkgarch"]
        checksum = request["checksum"]
        history = request["history"]

        if self.upstream_client is None:
            value = self.server.table.get_value(version, pkgarch, checksum, history)
            return {"value": value}

        # We have an upstream server.
        # Check whether the local server already knows the requested configuration.
        # If the configuration is a new one, the value we generate will depend on
        # what's on the upstream server. That's why we call find_value() here
        # instead of get_value() directly.
        value = self.server.table.find_value(version, pkgarch, checksum, history)
        upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)

        if value is not None:

            # The configuration is already known locally.

            if history:
                value = self.server.table.get_value(version, pkgarch, checksum, history)
            else:
                existing_value = value
                # In "no history" mode, we need to make sure the value doesn't
                # decrease and is at least greater than the maximum upstream value
                # and the maximum local value
                local_max = self.server.table.find_package_max_value(version, pkgarch)
                if revision_smaller(value, local_max):
                    value = increase_revision(local_max)

                if revision_smaller(value, upstream_max):
                    # Ask upstream whether it knows the checksum
                    upstream_value = await self.upstream_client.test_pr(version, pkgarch, checksum)
                    if upstream_value is None:
                        # Upstream doesn't have our checksum, let's create a new one
                        value = upstream_max + ".0"
                    else:
                        # Fine to take the same value as upstream
                        value = upstream_max

                if value != existing_value and not self.server.read_only:
                    self.server.table.store_value(version, pkgarch, checksum, value)

            return {"value": value}

        # The configuration is a new one for the local server.
        # Let's ask the upstream server whether it knows it.
        known_upstream = await self.upstream_client.test_package(version, pkgarch)

        if not known_upstream:

            # The package is not known upstream, so it must be a local-only package.
            # Let's compute the PR number using the local-only method.
            value = self.server.table.get_value(version, pkgarch, checksum, history)
            return {"value": value}

        # The package is known upstream, let's ask the upstream server
        # whether it knows our new output hash.
        value = await self.upstream_client.test_pr(version, pkgarch, checksum)

        if value is not None:

            # Upstream knows this output hash, let's store it and use it too.
            if not self.server.read_only:
                self.server.table.store_value(version, pkgarch, checksum, value)
            # If the local server is read only, it won't be able to store the new
            # value in the database and will have to keep asking the upstream server.
            return {"value": value}

        # The output hash doesn't exist upstream: get the most recent number from
        # upstream (x), then generate a new PR value for the local server: x.y
        upstream_max = await self.upstream_client.max_package_pr(version, pkgarch)
        # Here we know that the package is known upstream, so upstream_max can't be None
        subvalue = self.server.table.find_new_subvalue(version, pkgarch, upstream_max)

        if not self.server.read_only:
            self.server.table.store_value(version, pkgarch, checksum, subvalue)

        return {"value": subvalue}

    async def process_requests(self):
        if self.server.upstream is not None:
            self.upstream_client = await create_async_client(self.server.upstream)
        else:
            self.upstream_client = None

        try:
            await super().process_requests()
        finally:
            if self.upstream_client is not None:
                await self.upstream_client.close()

    async def handle_import_one(self, request):
        response = None
        if not self.server.read_only:
            version = request["version"]
            pkgarch = request["pkgarch"]
            checksum = request["checksum"]
            value = request["value"]

            value = self.server.table.importone(version, pkgarch, checksum, value)
            if value is not None:
                response = {"value": value}

        return response

    async def handle_export(self, request):
        version = request["version"]
        pkgarch = request["pkgarch"]
        checksum = request["checksum"]
        colinfo = request["colinfo"]
        history = request["history"]

        try:
            (metainfo, datainfo) = self.server.table.export(version, pkgarch, checksum, colinfo, history)
        except sqlite3.Error as exc:
            self.logger.error(str(exc))
            metainfo = datainfo = None

        return {"metainfo": metainfo, "datainfo": datainfo}

    async def handle_is_readonly(self, request):
        return {"readonly": self.server.read_only}

class PRServer(bb.asyncrpc.AsyncServer):
    def __init__(self, dbfile, read_only=False, upstream=None):
        super().__init__(logger)
        self.dbfile = dbfile
        self.table = None
        self.read_only = read_only
        self.upstream = upstream

    def accept_client(self, socket):
        return PRServerClient(socket, self)

    def start(self):
        tasks = super().start()
        self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only)
        self.table = self.db["PRMAIN"]

        self.logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
                         (self.dbfile, self.address, str(os.getpid())))

        if self.upstream is not None:
            self.logger.info("And upstream PRServer: %s" % (self.upstream))

        return tasks

    async def stop(self):
        self.db.disconnect()
        await super().stop()

class PRServSingleton(object):
    def __init__(self, dbfile, logfile, host, port, upstream):
        self.dbfile = dbfile
        self.logfile = logfile
        self.host = host
        self.port = port
        self.upstream = upstream

    def start(self):
        self.prserv = PRServer(self.dbfile, upstream=self.upstream)
        self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port)
        self.process = self.prserv.serve_as_process(log_level=logging.WARNING)

        if not self.prserv.address:
            raise PRServiceConfigError
        if not self.port:
            self.port = int(self.prserv.address.rsplit(":", 1)[1])
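        # serve_as_process() runs the PRServer loop in a separate process; when
        # the configured port is 0 the OS picks a free port, and the bound
        # address is read back above so auto_start() can report the real
        # endpoint to its callers.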

def run_as_daemon(func, pidfile, logfile):
    """
    See Advanced Programming in the UNIX Environment, Section 13.3
    """
    try:
        pid = os.fork()
        if pid > 0:
            os.waitpid(pid, 0)
            # The parent returns instead of exiting, to give control back to the caller
            return pid
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno))

    os.setsid()
    # Fork again to make sure the daemon is not a session leader,
    # which prevents it from acquiring a controlling terminal
    try:
        pid = os.fork()
        if pid > 0:  # parent
            os._exit(0)
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno))

    os.chdir("/")

    sys.stdout.flush()
    sys.stderr.flush()

    # We could be called from a python thread with io.StringIO as
    # stdout/stderr or it could be 'real' unix fd forking where we need
    # to physically close the fds to prevent the program launching us from
    # potentially hanging on a pipe. Handle both cases.
    si = open("/dev/null", "r")
    try:
        os.dup2(si.fileno(), sys.stdin.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdin = si
    so = open(logfile, "a+")
    try:
        os.dup2(so.fileno(), sys.stdout.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdout = so
    try:
        os.dup2(so.fileno(), sys.stderr.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stderr = so

    # Clear out all inherited log handlers so we don't invoke event handlers
    # that are not part of the PRserver
    for logger_iter in logging.Logger.manager.loggerDict.keys():
        logging.getLogger(logger_iter).handlers = []

    # Ensure logging makes it to the logfile
    streamhandler = logging.StreamHandler()
    streamhandler.setLevel(logging.DEBUG)
    formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
    streamhandler.setFormatter(formatter)
    logger.addHandler(streamhandler)

    # Write the pidfile
    pid = str(os.getpid())
    with open(pidfile, "w") as pf:
        pf.write("%s\n" % pid)

    func()
    os.remove(pidfile)
    os._exit(0)

def start_daemon(dbfile, host, port, logfile, read_only=False, upstream=None):
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        with open(pidfile) as pf:
            pid = int(pf.readline().strip())
    except IOError:
        pid = None

    if pid:
        sys.stderr.write("pidfile %s already exists. Daemon already running?\n"
                         % pidfile)
        return 1

    dbfile = os.path.abspath(dbfile)
    def daemon_main():
        server = PRServer(dbfile, read_only=read_only, upstream=upstream)
        server.start_tcp_server(ip, port)
        server.serve_forever()

    run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
    return 0
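
# A hedged usage sketch: start_daemon()/stop_daemon() are the entry points a
# standalone launcher (such as the bitbake-prserv command) would be expected
# to call, roughly:
#
#   start_daemon("prserv.sqlite3", "127.0.0.1", 8585, "prserv.log")
#   ...
#   stop_daemon("127.0.0.1", 8585)
#
# The launcher and its options live outside this module; the calls above only
# illustrate the signatures defined here.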

def stop_daemon(host, port):
    import glob
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        with open(pidfile) as pf:
            pid = int(pf.readline().strip())
    except IOError:
        pid = None

    if not pid:
        # When the server is started with port=0 (i.e. localhost:0), it actually
        # binds to another port, so at least tell the user which ports servers
        # are listening on at this address.
        ports = []
        portstr = ""
        for pf in glob.glob(PIDPREFIX % (ip, "*")):
            bn = os.path.basename(pf)
            root, _ = os.path.splitext(bn)
            ports.append(root.split("_")[-1])
        if ports:
            portstr = "Wrong port? Other ports listening at %s: %s" % (host, " ".join(ports))

        sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
                         % (pidfile, portstr))
        return 1

    try:
        if is_running(pid):
            print("Sending SIGTERM to pr-server.")
            os.kill(pid, signal.SIGTERM)
            time.sleep(0.1)

        try:
            os.remove(pidfile)
        except FileNotFoundError:
            # The PID file might have been removed by the exiting process
            pass

    except OSError as e:
        # Ignore "no such process" (the daemon already exited); re-raise anything else
        if e.errno != errno.ESRCH:
            raise

    return 0

def is_running(pid):
    try:
        os.kill(pid, 0)
    except OSError as err:
        if err.errno == errno.ESRCH:
            return False
    return True

def is_local_special(host, port):
    return (host == "localhost" or host == "127.0.0.1") and not port

class PRServiceConfigError(Exception):
    pass
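
# auto_start() is driven by the PRSERV_HOST datastore variable.  Based on the
# parsing below, two typical settings are:
#
#   PRSERV_HOST = "localhost:0"
#       Spawn and manage a local PR server, storing prserv.sqlite3 and
#       prserv.log under PERSISTENT_DIR or CACHE.
#
#   PRSERV_HOST = "prserver.example.com:8585"
#       Use an already-running remote PR server at that address.
#
# PRSERV_UPSTREAM optionally chains the local server to an upstream PR server.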

def auto_start(d):
    global singleton

    host_params = list(filter(None, (d.getVar("PRSERV_HOST") or "").split(":")))
    if not host_params:
        # Shutdown any existing PR Server
        auto_shutdown()
        return None

    if len(host_params) != 2:
        # Shutdown any existing PR Server
        auto_shutdown()
        logger.critical("\n".join(["PRSERV_HOST: incorrect format",
                'Usage: PRSERV_HOST = "<hostname>:<port>"']))
        raise PRServiceConfigError

    host = host_params[0].strip().lower()
    port = int(host_params[1])
    upstream = d.getVar("PRSERV_UPSTREAM") or None

    if is_local_special(host, port):
        import bb.utils
        cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
        if not cachedir:
            logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
            raise PRServiceConfigError

        dbfile = os.path.join(cachedir, "prserv.sqlite3")
        logfile = os.path.join(cachedir, "prserv.log")
        if singleton:
            if singleton.dbfile != dbfile:
                # Shutdown any existing PR Server as it doesn't match the config
                auto_shutdown()
        if not singleton:
            bb.utils.mkdirhier(cachedir)
            singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port, upstream)
            singleton.start()

    if singleton:
        host = singleton.host
        port = singleton.port

    try:
        ping(host, port)
        return str(host) + ":" + str(port)
    except Exception:
        logger.critical("PRservice %s:%d not available" % (host, port))
        raise PRServiceConfigError

def auto_shutdown():
    global singleton
    if singleton and singleton.process:
        singleton.process.terminate()
        singleton.process.join()
        singleton = None

def ping(host, port):
    from . import client

    with client.PRClient() as conn:
        conn.connect_tcp(host, port)
        return conn.ping()

def connect(host, port):
    from . import client

    global singleton

    if host.strip().lower() == "localhost" and not port:
        host = "localhost"
        port = singleton.port

    conn = client.PRClient()
    conn.connect_tcp(host, port)
    return conn
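
# A minimal, hedged client-side sketch built on connect() above.  It assumes
# the prserv.client.PRClient object exposes a getPR(version, pkgarch, checksum)
# helper matching the "get-pr" handler in PRServerClient; check the client
# module for the exact method names before relying on this.
#
#   conn = connect("localhost", 8585)
#   pr = conn.getPR("1.0-r0", "core2-64", "deadbeef")
#   print(pr)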