# client.py
#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#
import logging
import bb.asyncrpc
from . import create_async_client

# Module-level logger registered under the name "BitBake.PRserv"; PRAsyncClient
# passes it to its base-class constructor.
logger = logging.getLogger("BitBake.PRserv")
  10. class PRAsyncClient(bb.asyncrpc.AsyncClient):
  11. def __init__(self):
  12. super().__init__("PRSERVICE", "1.0", logger)
  13. async def getPR(self, version, pkgarch, checksum, history=False):
  14. response = await self.invoke(
  15. {"get-pr": {"version": version, "pkgarch": pkgarch, "checksum": checksum, "history": history}}
  16. )
  17. if response:
  18. return response["value"]
  19. async def test_pr(self, version, pkgarch, checksum, history=False):
  20. response = await self.invoke(
  21. {"test-pr": {"version": version, "pkgarch": pkgarch, "checksum": checksum, "history": history}}
  22. )
  23. if response:
  24. return response["value"]
  25. async def test_package(self, version, pkgarch):
  26. response = await self.invoke(
  27. {"test-package": {"version": version, "pkgarch": pkgarch}}
  28. )
  29. if response:
  30. return response["value"]
  31. async def max_package_pr(self, version, pkgarch):
  32. response = await self.invoke(
  33. {"max-package-pr": {"version": version, "pkgarch": pkgarch}}
  34. )
  35. if response:
  36. return response["value"]
  37. async def importone(self, version, pkgarch, checksum, value):
  38. response = await self.invoke(
  39. {"import-one": {"version": version, "pkgarch": pkgarch, "checksum": checksum, "value": value}}
  40. )
  41. if response:
  42. return response["value"]
  43. async def export(self, version, pkgarch, checksum, colinfo, history=False):
  44. response = await self.invoke(
  45. {"export": {"version": version, "pkgarch": pkgarch, "checksum": checksum, "colinfo": colinfo, "history": history}}
  46. )
  47. if response:
  48. return (response["metainfo"], response["datainfo"])
  49. async def is_readonly(self):
  50. response = await self.invoke(
  51. {"is-readonly": {}}
  52. )
  53. if response:
  54. return response["readonly"]
  55. class PRClient(bb.asyncrpc.Client):
  56. def __init__(self):
  57. super().__init__()
  58. self._add_methods("getPR", "test_pr", "test_package", "max_package_pr", "importone", "export", "is_readonly")
  59. def _get_async_client(self):
  60. return PRAsyncClient()