浏览代码

bitbake: hashserv: server: Add support for SO_REUSEPORT

SO_REUSEPORT is a socket option that allows multiple servers to listen
on the same TCP port, and the kernel will automatically load balance the
connections between them. This is particularly helpful for the hash
server since it runs in a single thread. To take advantage of a
multi-core server, multiple servers can be started in parallel with this
option (up to 1 per CPU) and the kernel will load balance between them.

(Bitbake rev: d72d5a7decb489e2af0ebc43cfea0ca3e4353e9b)

Signed-off-by: Joshua Watt <JPEWhacker@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
Joshua Watt 1 年之前
父节点
当前提交
e16d690e77
共有 3 个文件被更改,包括 40 次插入10 次删除
  1. 9 1
      bitbake/bin/bitbake-hashserv
  2. 27 7
      bitbake/lib/bb/asyncrpc/serv.py
  3. 4 2
      bitbake/lib/hashserv/__init__.py

+ 9 - 1
bitbake/bin/bitbake-hashserv

@@ -125,6 +125,11 @@ The following permissions are supported by the server:
         default=os.environ.get("HASHSERVER_ADMIN_PASSWORD", None),
         default=os.environ.get("HASHSERVER_ADMIN_PASSWORD", None),
         help="Create default admin user with password ADMIN_PASSWORD ($HASHSERVER_ADMIN_PASSWORD)",
         help="Create default admin user with password ADMIN_PASSWORD ($HASHSERVER_ADMIN_PASSWORD)",
     )
     )
+    parser.add_argument(
+        "--reuseport",
+        action="store_true",
+        help="Enable SO_REUSEPORT, allowing multiple servers to bind to the same port for load balancing",
+    )
 
 
     args = parser.parse_args()
     args = parser.parse_args()
 
 
@@ -132,7 +137,9 @@ The following permissions are supported by the server:
 
 
     level = getattr(logging, args.log.upper(), None)
     level = getattr(logging, args.log.upper(), None)
     if not isinstance(level, int):
     if not isinstance(level, int):
-        raise ValueError("Invalid log level: %s (Try ERROR/WARNING/INFO/DEBUG)" % args.log)
+        raise ValueError(
+            "Invalid log level: %s (Try ERROR/WARNING/INFO/DEBUG)" % args.log
+        )
 
 
     logger.setLevel(level)
     logger.setLevel(level)
     console = logging.StreamHandler()
     console = logging.StreamHandler()
@@ -155,6 +162,7 @@ The following permissions are supported by the server:
         anon_perms=anon_perms,
         anon_perms=anon_perms,
         admin_username=args.admin_user,
         admin_username=args.admin_user,
         admin_password=args.admin_password,
         admin_password=args.admin_password,
+        reuseport=args.reuseport,
     )
     )
     server.serve_forever()
     server.serve_forever()
     return 0
     return 0

+ 27 - 7
bitbake/lib/bb/asyncrpc/serv.py

@@ -138,14 +138,20 @@ class StreamServer(object):
 
 
 
 
 class TCPStreamServer(StreamServer):
 class TCPStreamServer(StreamServer):
-    def __init__(self, host, port, handler, logger):
+    def __init__(self, host, port, handler, logger, *, reuseport=False):
         super().__init__(handler, logger)
         super().__init__(handler, logger)
         self.host = host
         self.host = host
         self.port = port
         self.port = port
+        self.reuseport = reuseport
 
 
     def start(self, loop):
     def start(self, loop):
         self.server = loop.run_until_complete(
         self.server = loop.run_until_complete(
-            asyncio.start_server(self.handle_stream_client, self.host, self.port)
+            asyncio.start_server(
+                self.handle_stream_client,
+                self.host,
+                self.port,
+                reuse_port=self.reuseport,
+            )
         )
         )
 
 
         for s in self.server.sockets:
         for s in self.server.sockets:
@@ -209,11 +215,12 @@ class UnixStreamServer(StreamServer):
 
 
 
 
 class WebsocketsServer(object):
 class WebsocketsServer(object):
-    def __init__(self, host, port, handler, logger):
+    def __init__(self, host, port, handler, logger, *, reuseport=False):
         self.host = host
         self.host = host
         self.port = port
         self.port = port
         self.handler = handler
         self.handler = handler
         self.logger = logger
         self.logger = logger
+        self.reuseport = reuseport
 
 
     def start(self, loop):
     def start(self, loop):
         import websockets.server
         import websockets.server
@@ -224,6 +231,7 @@ class WebsocketsServer(object):
                 self.host,
                 self.host,
                 self.port,
                 self.port,
                 ping_interval=None,
                 ping_interval=None,
+                reuse_port=self.reuseport,
             )
             )
         )
         )
 
 
@@ -262,14 +270,26 @@ class AsyncServer(object):
         self.loop = None
         self.loop = None
         self.run_tasks = []
         self.run_tasks = []
 
 
-    def start_tcp_server(self, host, port):
-        self.server = TCPStreamServer(host, port, self._client_handler, self.logger)
+    def start_tcp_server(self, host, port, *, reuseport=False):
+        self.server = TCPStreamServer(
+            host,
+            port,
+            self._client_handler,
+            self.logger,
+            reuseport=reuseport,
+        )
 
 
     def start_unix_server(self, path):
     def start_unix_server(self, path):
         self.server = UnixStreamServer(path, self._client_handler, self.logger)
         self.server = UnixStreamServer(path, self._client_handler, self.logger)
 
 
-    def start_websocket_server(self, host, port):
-        self.server = WebsocketsServer(host, port, self._client_handler, self.logger)
+    def start_websocket_server(self, host, port, reuseport=False):
+        self.server = WebsocketsServer(
+            host,
+            port,
+            self._client_handler,
+            self.logger,
+            reuseport=reuseport,
+        )
 
 
     async def _client_handler(self, socket):
     async def _client_handler(self, socket):
         address = socket.address
         address = socket.address

+ 4 - 2
bitbake/lib/hashserv/__init__.py

@@ -13,6 +13,7 @@ from bb.asyncrpc.client import parse_address, ADDR_TYPE_UNIX, ADDR_TYPE_WS
 
 
 User = namedtuple("User", ("username", "permissions"))
 User = namedtuple("User", ("username", "permissions"))
 
 
+
 def create_server(
 def create_server(
     addr,
     addr,
     dbname,
     dbname,
@@ -25,6 +26,7 @@ def create_server(
     anon_perms=None,
     anon_perms=None,
     admin_username=None,
     admin_username=None,
     admin_password=None,
     admin_password=None,
+    reuseport=False,
 ):
 ):
     def sqlite_engine():
     def sqlite_engine():
         from .sqlite import DatabaseEngine
         from .sqlite import DatabaseEngine
@@ -60,9 +62,9 @@ def create_server(
         s.start_unix_server(*a)
         s.start_unix_server(*a)
     elif typ == ADDR_TYPE_WS:
     elif typ == ADDR_TYPE_WS:
         url = urlparse(a[0])
         url = urlparse(a[0])
-        s.start_websocket_server(url.hostname, url.port)
+        s.start_websocket_server(url.hostname, url.port, reuseport=reuseport)
     else:
     else:
-        s.start_tcp_server(*a)
+        s.start_tcp_server(*a, reuseport=reuseport)
 
 
     return s
     return s