#! /usr/bin/env python3
#
# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#

import argparse
import hashlib
import logging
import os
import pprint
import sys
import threading
import time
import warnings
import netrc
import json
import statistics
import textwrap

warnings.simplefilter("default")

try:
    import tqdm
    ProgressBar = tqdm.tqdm
except ImportError:
    class ProgressBar(object):
        def __init__(self, *args, **kwargs):
            pass

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwargs):
            pass

        def update(self):
            pass

sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(__file__)), 'lib'))

import hashserv
import bb.asyncrpc
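
# Fallback server address and the synthetic method name used by the 'stress'
# subcommand when querying and reporting test hashes.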
DEFAULT_ADDRESS = 'unix://./hashserve.sock'
METHOD = 'stress.test.method'
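
# Example invocations (the addresses are illustrative; point --address at your
# own hash equivalence server):
#   bitbake-hashclient --address ws://localhost:8686 stats
#   bitbake-hashclient --address unix://./hashserve.sock get <method> <taskhash>
#   bitbake-hashclient ping -n 5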


def print_user(u):
    print(f"Username: {u['username']}")
    if "permissions" in u:
        print("Permissions: " + " ".join(u["permissions"]))
    if "token" in u:
        print(f"Token: {u['token']}")
def main():
    def handle_get(args, client):
        result = client.get_taskhash(args.method, args.taskhash, all_properties=True)
        if not result:
            return 0

        print(json.dumps(result, sort_keys=True, indent=4))
        return 0

    def handle_get_outhash(args, client):
        result = client.get_outhash(args.method, args.outhash, args.taskhash)
        if not result:
            return 0

        print(json.dumps(result, sort_keys=True, indent=4))
        return 0

    def handle_stats(args, client):
        if args.reset:
            s = client.reset_stats()
        else:
            s = client.get_stats()

        print(json.dumps(s, sort_keys=True, indent=4))
        return 0
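
    # The stress test spawns a number of worker threads, each with its own
    # connection to the server, issuing get_unihash() requests for
    # deterministic, seeded task hashes; per-request latencies are collected
    # under a shared lock and summarised once all threads have finished.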
    def handle_stress(args, client):
        def thread_main(pbar, lock):
            nonlocal found_hashes
            nonlocal missed_hashes
            nonlocal max_time
            nonlocal times

            with hashserv.create_client(args.address) as client:
                for i in range(args.requests):
                    taskhash = hashlib.sha256()
                    taskhash.update(args.taskhash_seed.encode('utf-8'))
                    taskhash.update(str(i).encode('utf-8'))

                    start_time = time.perf_counter()
                    l = client.get_unihash(METHOD, taskhash.hexdigest())
                    elapsed = time.perf_counter() - start_time

                    with lock:
                        if l:
                            found_hashes += 1
                        else:
                            missed_hashes += 1

                        times.append(elapsed)
                        pbar.update()

        max_time = 0
        found_hashes = 0
        missed_hashes = 0
        lock = threading.Lock()
        times = []

        start_time = time.perf_counter()
        with ProgressBar(total=args.clients * args.requests) as pbar:
            threads = [threading.Thread(target=thread_main, args=(pbar, lock), daemon=False) for _ in range(args.clients)]
            for t in threads:
                t.start()

            for t in threads:
                t.join()
        total_elapsed = time.perf_counter() - start_time

        with lock:
            mean = statistics.mean(times)
            median = statistics.median(times)
            stddev = statistics.pstdev(times)

            print(f"Number of clients: {args.clients}")
            print(f"Requests per client: {args.requests}")
            print(f"Number of requests: {len(times)}")
            print(f"Total elapsed time: {total_elapsed:.3f}s")
            print(f"Total request rate: {len(times)/total_elapsed:.3f} req/s")
            print(f"Average request time: {mean:.3f}s")
            print(f"Median request time: {median:.3f}s")
            print(f"Request time std dev: {stddev:.3f}s")
            print(f"Maximum request time: {max(times):.3f}s")
            print(f"Minimum request time: {min(times):.3f}s")
            print(f"Hashes found: {found_hashes}")
            print(f"Hashes missed: {missed_hashes}")

        if args.report:
            with ProgressBar(total=args.requests) as pbar:
                for i in range(args.requests):
                    taskhash = hashlib.sha256()
                    taskhash.update(args.taskhash_seed.encode('utf-8'))
                    taskhash.update(str(i).encode('utf-8'))

                    outhash = hashlib.sha256()
                    outhash.update(args.outhash_seed.encode('utf-8'))
                    outhash.update(str(i).encode('utf-8'))

                    client.report_unihash(taskhash.hexdigest(), METHOD, outhash.hexdigest(), taskhash.hexdigest())

                    with lock:
                        pbar.update()
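
    # 'remove' deletes rows matching the column/value pairs given with --where;
    # it refuses to run without a query rather than silently emptying the
    # database.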
    def handle_remove(args, client):
        where = {k: v for k, v in args.where}
        if where:
            result = client.remove(where)
            print("Removed %d row(s)" % (result["count"]))
        else:
            print("No query specified")

    def handle_clean_unused(args, client):
        result = client.clean_unused(args.max_age)
        print("Removed %d rows" % (result["count"]))
        return 0

    def handle_refresh_token(args, client):
        r = client.refresh_token(args.username)
        print_user(r)

    def handle_set_user_permissions(args, client):
        r = client.set_user_perms(args.username, args.permissions)
        print_user(r)

    def handle_get_user(args, client):
        r = client.get_user(args.username)
        print_user(r)

    def handle_get_all_users(args, client):
        users = client.get_all_users()

        print("{username:20}| {permissions}".format(username="Username", permissions="Permissions"))
        print(("-" * 20) + "+" + ("-" * 20))
        for u in users:
            print("{username:20}| {permissions}".format(username=u["username"], permissions=" ".join(u["permissions"])))

    def handle_new_user(args, client):
        r = client.new_user(args.username, args.permissions)
        print_user(r)

    def handle_delete_user(args, client):
        r = client.delete_user(args.username)
        print_user(r)

    def handle_get_db_usage(args, client):
        usage = client.get_db_usage()

        print(usage)
        tables = sorted(usage.keys())
        print("{name:20}| {rows:20}".format(name="Table name", rows="Rows"))
        print(("-" * 20) + "+" + ("-" * 20))
        for t in tables:
            print("{name:20}| {rows:<20}".format(name=t, rows=usage[t]["rows"]))
        print()

        total_rows = sum(t["rows"] for t in usage.values())
        print(f"Total rows: {total_rows}")

    def handle_get_db_query_columns(args, client):
        columns = client.get_db_query_columns()
        print("\n".join(sorted(columns)))
    def handle_gc_status(args, client):
        result = client.gc_status()
        if not result["mark"]:
            print("No Garbage collection in progress")
            return 0

        print("Current Mark: %s" % result["mark"])
        print("Total hashes to keep: %d" % result["keep"])
        print("Total hashes to remove: %s" % result["remove"])
        return 0

    def handle_gc_mark(args, client):
        where = {k: v for k, v in args.where}
        result = client.gc_mark(args.mark, where)
        print("New hashes marked: %d" % result["count"])
        return 0

    def handle_gc_mark_stream(args, client):
        stdin = (l.strip() for l in sys.stdin)
        marked_hashes = 0

        try:
            result = client.gc_mark_stream(args.mark, stdin)
            marked_hashes = result["count"]
        except ConnectionError:
            logger.warning(
                "Server doesn't seem to support `gc-mark-stream`. Sending "
                "hashes sequentially using `gc-mark` API."
            )
            for line in stdin:
                pairs = line.split()
                condition = dict(zip(pairs[::2], pairs[1::2]))
                result = client.gc_mark(args.mark, condition)
                marked_hashes += result["count"]

        print("New hashes marked: %d" % marked_hashes)
        return 0

    def handle_gc_sweep(args, client):
        result = client.gc_sweep(args.mark)
        print("Removed %d rows" % result["count"])
        return 0

    def handle_unihash_exists(args, client):
        result = client.unihash_exists(args.unihash)
        if args.quiet:
            return 0 if result else 1

        print("true" if result else "false")
        return 0
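
    # 'ping' measures server round-trip time over the selected transport and
    # prints simple summary statistics.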
    def handle_ping(args, client):
        times = []
        for i in range(1, args.count + 1):
            if not args.quiet:
                print(f"Ping {i} of {args.count}... ", end="")
            start_time = time.perf_counter()
            client.ping()
            elapsed = time.perf_counter() - start_time
            times.append(elapsed)
            if not args.quiet:
                print(f"{elapsed:.3f}s")

        mean = statistics.mean(times)
        median = statistics.median(times)
        std_dev = statistics.pstdev(times)

        if not args.quiet:
            print("------------------------")

        print(f"Number of pings: {len(times)}")
        print(f"Average round trip time: {mean:.3f}s")
        print(f"Median round trip time: {median:.3f}s")
        print(f"Round trip time std dev: {std_dev:.3f}s")
        print(f"Min time is: {min(times):.3f}s")
        print(f"Max time is: {max(times):.3f}s")
        return 0
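
    # Command line definition: the global options (server address, credentials,
    # logging) apply to every subcommand, and each subcommand registers its
    # handler via set_defaults(func=...).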
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description='Hash Equivalence Client',
        epilog=textwrap.dedent(
            """
            Possible ADDRESS options are:
                unix://PATH         Connect to UNIX domain socket at PATH
                ws://HOST[:PORT]    Connect to websocket at HOST:PORT (default port is 80)
                wss://HOST[:PORT]   Connect to secure websocket at HOST:PORT (default port is 443)
                HOST:PORT           Connect to TCP server at HOST:PORT
            """
        ),
    )
    parser.add_argument('--address', default=DEFAULT_ADDRESS, help='Server address (default "%(default)s")')
    parser.add_argument('--log', default='WARNING', help='Set logging level')
    parser.add_argument('--login', '-l', metavar="USERNAME", help="Authenticate as USERNAME")
    parser.add_argument('--password', '-p', metavar="TOKEN", help="Authenticate using token TOKEN")
    parser.add_argument('--become', '-b', metavar="USERNAME", help="Impersonate user USERNAME (if allowed) when performing actions")
    parser.add_argument('--no-netrc', '-n', action="store_false", dest="netrc", help="Do not use .netrc")
    subparsers = parser.add_subparsers()

    get_parser = subparsers.add_parser('get', help="Get the unihash for a taskhash")
    get_parser.add_argument("method", help="Method to query")
    get_parser.add_argument("taskhash", help="Task hash to query")
    get_parser.set_defaults(func=handle_get)

    get_outhash_parser = subparsers.add_parser('get-outhash', help="Get output hash information")
    get_outhash_parser.add_argument("method", help="Method to query")
    get_outhash_parser.add_argument("outhash", help="Output hash to query")
    get_outhash_parser.add_argument("taskhash", help="Task hash to query")
    get_outhash_parser.set_defaults(func=handle_get_outhash)

    stats_parser = subparsers.add_parser('stats', help='Show server stats')
    stats_parser.add_argument('--reset', action='store_true',
                              help='Reset server stats')
    stats_parser.set_defaults(func=handle_stats)

    stress_parser = subparsers.add_parser('stress', help='Run stress test')
    stress_parser.add_argument('--clients', type=int, default=10,
                               help='Number of simultaneous clients')
    stress_parser.add_argument('--requests', type=int, default=1000,
                               help='Number of requests each client will perform')
    stress_parser.add_argument('--report', action='store_true',
                               help='Report new hashes')
    stress_parser.add_argument('--taskhash-seed', default='',
                               help='Include string in taskhash')
    stress_parser.add_argument('--outhash-seed', default='',
                               help='Include string in outhash')
    stress_parser.set_defaults(func=handle_stress)

    remove_parser = subparsers.add_parser('remove', help="Remove hash entries")
    remove_parser.add_argument("--where", "-w", metavar="KEY VALUE", nargs=2, action="append", default=[],
                               help="Remove entries from table where KEY == VALUE")
    remove_parser.set_defaults(func=handle_remove)

    clean_unused_parser = subparsers.add_parser('clean-unused', help="Remove unused database entries")
    clean_unused_parser.add_argument("max_age", metavar="SECONDS", type=int, help="Remove unused entries older than SECONDS old")
    clean_unused_parser.set_defaults(func=handle_clean_unused)

    refresh_token_parser = subparsers.add_parser('refresh-token', help="Refresh auth token")
    refresh_token_parser.add_argument("--username", "-u", help="Refresh the token for another user (if authorized)")
    refresh_token_parser.set_defaults(func=handle_refresh_token)

    set_user_perms_parser = subparsers.add_parser('set-user-perms', help="Set new permissions for user")
    set_user_perms_parser.add_argument("--username", "-u", help="Username", required=True)
    set_user_perms_parser.add_argument("permissions", metavar="PERM", nargs="*", default=[], help="New permissions")
    set_user_perms_parser.set_defaults(func=handle_set_user_permissions)

    get_user_parser = subparsers.add_parser('get-user', help="Get user")
    get_user_parser.add_argument("--username", "-u", help="Username")
    get_user_parser.set_defaults(func=handle_get_user)

    get_all_users_parser = subparsers.add_parser('get-all-users', help="List all users")
    get_all_users_parser.set_defaults(func=handle_get_all_users)

    new_user_parser = subparsers.add_parser('new-user', help="Create new user")
    new_user_parser.add_argument("--username", "-u", help="Username", required=True)
    new_user_parser.add_argument("permissions", metavar="PERM", nargs="*", default=[], help="New permissions")
    new_user_parser.set_defaults(func=handle_new_user)

    delete_user_parser = subparsers.add_parser('delete-user', help="Delete user")
    delete_user_parser.add_argument("--username", "-u", help="Username", required=True)
    delete_user_parser.set_defaults(func=handle_delete_user)

    db_usage_parser = subparsers.add_parser('get-db-usage', help="Database Usage")
    db_usage_parser.set_defaults(func=handle_get_db_usage)

    db_query_columns_parser = subparsers.add_parser('get-db-query-columns', help="Show columns that can be used in database queries")
    db_query_columns_parser.set_defaults(func=handle_get_db_query_columns)

    gc_status_parser = subparsers.add_parser("gc-status", help="Show garbage collection status")
    gc_status_parser.set_defaults(func=handle_gc_status)

    gc_mark_parser = subparsers.add_parser('gc-mark', help="Mark hashes to be kept for garbage collection")
    gc_mark_parser.add_argument("mark", help="Mark for this garbage collection operation")
    gc_mark_parser.add_argument("--where", "-w", metavar="KEY VALUE", nargs=2, action="append", default=[],
                                help="Keep entries in table where KEY == VALUE")
    gc_mark_parser.set_defaults(func=handle_gc_mark)

    gc_mark_parser_stream = subparsers.add_parser(
        'gc-mark-stream',
        help=(
            "Mark multiple hashes to be retained for garbage collection. Input should be provided via stdin, "
            "with each line formatted as key-value pairs separated by spaces, for example 'column1 foo column2 bar'."
        )
    )
    gc_mark_parser_stream.add_argument("mark", help="Mark for this garbage collection operation")
    gc_mark_parser_stream.set_defaults(func=handle_gc_mark_stream)

    gc_sweep_parser = subparsers.add_parser('gc-sweep', help="Perform garbage collection and delete any entries that are not marked")
    gc_sweep_parser.add_argument("mark", help="Mark for this garbage collection operation")
    gc_sweep_parser.set_defaults(func=handle_gc_sweep)

    unihash_exists_parser = subparsers.add_parser('unihash-exists', help="Check if a unihash is known to the server")
    unihash_exists_parser.add_argument("--quiet", action="store_true", help="Don't print status. Instead, exit with 0 if unihash exists and 1 if it does not")
    unihash_exists_parser.add_argument("unihash", help="Unihash to check")
    unihash_exists_parser.set_defaults(func=handle_unihash_exists)

    ping_parser = subparsers.add_parser('ping', help="Ping server")
    ping_parser.add_argument("-n", "--count", type=int, help="Number of pings. Default is %(default)s", default=10)
    ping_parser.add_argument("-q", "--quiet", action="store_true", help="Don't print each ping; only print results")
    ping_parser.set_defaults(func=handle_ping)
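
    # Parse arguments, configure logging, then resolve credentials: explicit
    # --login/--password win, otherwise ~/.netrc is consulted for an entry
    # matching the server address (unless --no-netrc was given).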
    args = parser.parse_args()

    logger = logging.getLogger('hashserv')

    level = getattr(logging, args.log.upper(), None)
    if not isinstance(level, int):
        raise ValueError('Invalid log level: %s' % args.log)

    logger.setLevel(level)
    console = logging.StreamHandler()
    console.setLevel(level)
    logger.addHandler(console)

    login = args.login
    password = args.password

    if login is None and args.netrc:
        try:
            n = netrc.netrc()
            auth = n.authenticators(args.address)
            if auth is not None:
                login, _, password = auth
        except FileNotFoundError:
            pass
        except netrc.NetrcParseError as e:
            sys.stderr.write(f"Error parsing {e.filename}:{e.lineno}: {e.msg}\n")

    func = getattr(args, 'func', None)
    if func:
        try:
            with hashserv.create_client(args.address, login, password) as client:
                if args.become:
                    client.become_user(args.become)
                return func(args, client)
        except bb.asyncrpc.InvokeError as e:
            print(f"ERROR: {e}")
            return 1

    return 0
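

# Report unexpected exceptions with a traceback and exit non-zero.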
if __name__ == '__main__':
    try:
        ret = main()
    except Exception:
        ret = 1
        import traceback
        traceback.print_exc()

    sys.exit(ret)