summaryrefslogtreecommitdiff
path: root/poky/bitbake/lib/hashserv/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'poky/bitbake/lib/hashserv/tests.py')
-rw-r--r--poky/bitbake/lib/hashserv/tests.py736
1 files changed, 712 insertions, 24 deletions
diff --git a/poky/bitbake/lib/hashserv/tests.py b/poky/bitbake/lib/hashserv/tests.py
index f343c586b5..a9e6fdf9ff 100644
--- a/poky/bitbake/lib/hashserv/tests.py
+++ b/poky/bitbake/lib/hashserv/tests.py
@@ -6,6 +6,8 @@
#
from . import create_server, create_client
+from .server import DEFAULT_ANON_PERMS, ALL_PERMISSIONS
+from bb.asyncrpc import InvokeError
import hashlib
import logging
import multiprocessing
@@ -17,6 +19,14 @@ import unittest
import socket
import time
import signal
+import subprocess
+import json
+import re
+from pathlib import Path
+
+
+THIS_DIR = Path(__file__).parent
+BIN_DIR = THIS_DIR.parent.parent / "bin"
def server_prefunc(server, idx):
logging.basicConfig(level=logging.DEBUG, filename='bbhashserv-%d.log' % idx, filemode='w',
@@ -29,11 +39,12 @@ class HashEquivalenceTestSetup(object):
METHOD = 'TestMethod'
server_index = 0
+ client_index = 0
- def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc):
+ def start_server(self, dbpath=None, upstream=None, read_only=False, prefunc=server_prefunc, anon_perms=DEFAULT_ANON_PERMS, admin_username=None, admin_password=None):
self.server_index += 1
if dbpath is None:
- dbpath = os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index)
+ dbpath = self.make_dbpath()
def cleanup_server(server):
if server.process.exitcode is not None:
@@ -45,19 +56,41 @@ class HashEquivalenceTestSetup(object):
server = create_server(self.get_server_addr(self.server_index),
dbpath,
upstream=upstream,
- read_only=read_only)
+ read_only=read_only,
+ anon_perms=anon_perms,
+ admin_username=admin_username,
+ admin_password=admin_password)
server.dbpath = dbpath
server.serve_as_process(prefunc=prefunc, args=(self.server_index,))
self.addCleanup(cleanup_server, server)
+ return server
+
+ def make_dbpath(self):
+ return os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index)
+
+ def start_client(self, server_address, username=None, password=None):
def cleanup_client(client):
client.close()
- client = create_client(server.address)
+ client = create_client(server_address, username=username, password=password)
self.addCleanup(cleanup_client, client)
- return (client, server)
+ return client
+
+ def start_test_server(self):
+ self.server = self.start_server()
+ return self.server.address
+
+ def start_auth_server(self):
+ auth_server = self.start_server(self.server.dbpath, anon_perms=[], admin_username="admin", admin_password="password")
+ self.auth_server_address = auth_server.address
+ self.admin_client = self.start_client(auth_server.address, username="admin", password="password")
+ return self.admin_client
+
+ def auth_client(self, user):
+ return self.start_client(self.auth_server_address, user["username"], user["token"])
def setUp(self):
if sys.version_info < (3, 5, 0):
@@ -66,26 +99,83 @@ class HashEquivalenceTestSetup(object):
self.temp_dir = tempfile.TemporaryDirectory(prefix='bb-hashserv')
self.addCleanup(self.temp_dir.cleanup)
- (self.client, self.server) = self.start_server()
+ self.server_address = self.start_test_server()
+
+ self.client = self.start_client(self.server_address)
def assertClientGetHash(self, client, taskhash, unihash):
result = client.get_unihash(self.METHOD, taskhash)
self.assertEqual(result, unihash)
+ def assertUserPerms(self, user, permissions):
+ with self.auth_client(user) as client:
+ info = client.get_user()
+ self.assertEqual(info, {
+ "username": user["username"],
+ "permissions": permissions,
+ })
-class HashEquivalenceCommonTests(object):
- def test_create_hash(self):
+ def assertUserCanAuth(self, user):
+ with self.start_client(self.auth_server_address) as client:
+ client.auth(user["username"], user["token"])
+
+ def assertUserCannotAuth(self, user):
+ with self.start_client(self.auth_server_address) as client, self.assertRaises(InvokeError):
+ client.auth(user["username"], user["token"])
+
+ def create_test_hash(self, client):
# Simple test that hashes can be created
taskhash = '35788efcb8dfb0a02659d81cf2bfd695fb30faf9'
outhash = '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f'
unihash = 'f46d3fbb439bd9b921095da657a4de906510d2cd'
- self.assertClientGetHash(self.client, taskhash, None)
+ self.assertClientGetHash(client, taskhash, None)
- result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
+ result = client.report_unihash(taskhash, self.METHOD, outhash, unihash)
self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
return taskhash, outhash, unihash
+ def run_hashclient(self, args, **kwargs):
+ try:
+ p = subprocess.run(
+ [BIN_DIR / "bitbake-hashclient"] + args,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ encoding="utf-8",
+ **kwargs
+ )
+ except subprocess.CalledProcessError as e:
+ print(e.output)
+ raise e
+
+ print(p.stdout)
+ return p
+
+
+class HashEquivalenceCommonTests(object):
+ def auth_perms(self, *permissions):
+ self.client_index += 1
+ user = self.create_user(f"user-{self.client_index}", permissions)
+ return self.auth_client(user)
+
+ def create_user(self, username, permissions, *, client=None):
+ def remove_user(username):
+ try:
+ self.admin_client.delete_user(username)
+ except bb.asyncrpc.InvokeError:
+ pass
+
+ if client is None:
+ client = self.admin_client
+
+ user = client.new_user(username, permissions)
+ self.addCleanup(remove_user, username)
+
+ return user
+
+ def test_create_hash(self):
+ return self.create_test_hash(self.client)
+
def test_create_equivalent(self):
# Tests that a second reported task with the same outhash will be
# assigned the same unihash
@@ -127,7 +217,7 @@ class HashEquivalenceCommonTests(object):
self.assertClientGetHash(self.client, taskhash, unihash)
def test_remove_taskhash(self):
- taskhash, outhash, unihash = self.test_create_hash()
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
result = self.client.remove({"taskhash": taskhash})
self.assertGreater(result["count"], 0)
self.assertClientGetHash(self.client, taskhash, None)
@@ -136,13 +226,13 @@ class HashEquivalenceCommonTests(object):
self.assertIsNone(result_outhash)
def test_remove_unihash(self):
- taskhash, outhash, unihash = self.test_create_hash()
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
result = self.client.remove({"unihash": unihash})
self.assertGreater(result["count"], 0)
self.assertClientGetHash(self.client, taskhash, None)
def test_remove_outhash(self):
- taskhash, outhash, unihash = self.test_create_hash()
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
result = self.client.remove({"outhash": outhash})
self.assertGreater(result["count"], 0)
@@ -150,7 +240,7 @@ class HashEquivalenceCommonTests(object):
self.assertIsNone(result_outhash)
def test_remove_method(self):
- taskhash, outhash, unihash = self.test_create_hash()
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
result = self.client.remove({"method": self.METHOD})
self.assertGreater(result["count"], 0)
self.assertClientGetHash(self.client, taskhash, None)
@@ -159,7 +249,7 @@ class HashEquivalenceCommonTests(object):
self.assertIsNone(result_outhash)
def test_clean_unused(self):
- taskhash, outhash, unihash = self.test_create_hash()
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
# Clean the database, which should not remove anything because all hashes an in-use
result = self.client.clean_unused(0)
@@ -206,7 +296,7 @@ class HashEquivalenceCommonTests(object):
def test_stress(self):
def query_server(failures):
- client = Client(self.server.address)
+ client = Client(self.server_address)
try:
for i in range(1000):
taskhash = hashlib.sha256()
@@ -245,8 +335,10 @@ class HashEquivalenceCommonTests(object):
# the side client. It also verifies that the results are pulled into
# the downstream database by checking that the downstream and side servers
# match after the downstream is done waiting for all backfill tasks
- (down_client, down_server) = self.start_server(upstream=self.server.address)
- (side_client, side_server) = self.start_server(dbpath=down_server.dbpath)
+ down_server = self.start_server(upstream=self.server_address)
+ down_client = self.start_client(down_server.address)
+ side_server = self.start_server(dbpath=down_server.dbpath)
+ side_client = self.start_client(side_server.address)
def check_hash(taskhash, unihash, old_sidehash):
nonlocal down_client
@@ -351,14 +443,18 @@ class HashEquivalenceCommonTests(object):
self.assertEqual(result['method'], self.METHOD)
def test_ro_server(self):
- (ro_client, ro_server) = self.start_server(dbpath=self.server.dbpath, read_only=True)
+ rw_server = self.start_server()
+ rw_client = self.start_client(rw_server.address)
+
+ ro_server = self.start_server(dbpath=rw_server.dbpath, read_only=True)
+ ro_client = self.start_client(ro_server.address)
# Report a hash via the read-write server
taskhash = '35788efcb8dfb0a02659d81cf2bfd695fb30faf9'
outhash = '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f'
unihash = 'f46d3fbb439bd9b921095da657a4de906510d2cd'
- result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
+ result = rw_client.report_unihash(taskhash, self.METHOD, outhash, unihash)
self.assertEqual(result['unihash'], unihash, 'Server returned bad unihash')
# Check the hash via the read-only server
@@ -369,11 +465,11 @@ class HashEquivalenceCommonTests(object):
outhash2 = '3c979c3db45c569f51ab7626a4651074be3a9d11a84b1db076f5b14f7d39db44'
unihash2 = '90e9bc1d1f094c51824adca7f8ea79a048d68824'
- with self.assertRaises(ConnectionError):
- ro_client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
+ result = ro_client.report_unihash(taskhash2, self.METHOD, outhash2, unihash2)
+ self.assertEqual(result['unihash'], unihash2)
# Ensure that the database was not modified
- self.assertClientGetHash(self.client, taskhash2, None)
+ self.assertClientGetHash(rw_client, taskhash2, None)
def test_slow_server_start(self):
@@ -393,7 +489,7 @@ class HashEquivalenceCommonTests(object):
old_signal = signal.signal(signal.SIGTERM, do_nothing)
self.addCleanup(signal.signal, signal.SIGTERM, old_signal)
- _, server = self.start_server(prefunc=prefunc)
+ server = self.start_server(prefunc=prefunc)
server.process.terminate()
time.sleep(30)
event.set()
@@ -453,6 +549,524 @@ class HashEquivalenceCommonTests(object):
# shares a taskhash with Task 2
self.assertClientGetHash(self.client, taskhash2, unihash2)
+ def test_auth_read_perms(self):
+ admin_client = self.start_auth_server()
+
+ # Create hashes with non-authenticated server
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
+
+ # Validate hash can be retrieved using authenticated client
+ with self.auth_perms("@read") as client:
+ self.assertClientGetHash(client, taskhash, unihash)
+
+ with self.auth_perms() as client, self.assertRaises(InvokeError):
+ self.assertClientGetHash(client, taskhash, unihash)
+
+ def test_auth_report_perms(self):
+ admin_client = self.start_auth_server()
+
+ # Without read permission, the user is completely denied
+ with self.auth_perms() as client, self.assertRaises(InvokeError):
+ self.create_test_hash(client)
+
+ # Read permission allows the call to succeed, but it doesn't record
+ # anythin in the database
+ with self.auth_perms("@read") as client:
+ taskhash, outhash, unihash = self.create_test_hash(client)
+ self.assertClientGetHash(client, taskhash, None)
+
+ # Report permission alone is insufficient
+ with self.auth_perms("@report") as client, self.assertRaises(InvokeError):
+ self.create_test_hash(client)
+
+ # Read and report permission actually modify the database
+ with self.auth_perms("@read", "@report") as client:
+ taskhash, outhash, unihash = self.create_test_hash(client)
+ self.assertClientGetHash(client, taskhash, unihash)
+
+ def test_auth_no_token_refresh_from_anon_user(self):
+ self.start_auth_server()
+
+ with self.start_client(self.auth_server_address) as client, self.assertRaises(InvokeError):
+ client.refresh_token()
+
+ def test_auth_self_token_refresh(self):
+ admin_client = self.start_auth_server()
+
+ # Create a new user with no permissions
+ user = self.create_user("test-user", [])
+
+ with self.auth_client(user) as client:
+ new_user = client.refresh_token()
+
+ self.assertEqual(user["username"], new_user["username"])
+ self.assertNotEqual(user["token"], new_user["token"])
+ self.assertUserCanAuth(new_user)
+ self.assertUserCannotAuth(user)
+
+ # Explicitly specifying with your own username is fine also
+ with self.auth_client(new_user) as client:
+ new_user2 = client.refresh_token(user["username"])
+
+ self.assertEqual(user["username"], new_user2["username"])
+ self.assertNotEqual(user["token"], new_user2["token"])
+ self.assertUserCanAuth(new_user2)
+ self.assertUserCannotAuth(new_user)
+ self.assertUserCannotAuth(user)
+
+ def test_auth_token_refresh(self):
+ admin_client = self.start_auth_server()
+
+ user = self.create_user("test-user", [])
+
+ with self.auth_perms() as client, self.assertRaises(InvokeError):
+ client.refresh_token(user["username"])
+
+ with self.auth_perms("@user-admin") as client:
+ new_user = client.refresh_token(user["username"])
+
+ self.assertEqual(user["username"], new_user["username"])
+ self.assertNotEqual(user["token"], new_user["token"])
+ self.assertUserCanAuth(new_user)
+ self.assertUserCannotAuth(user)
+
+ def test_auth_self_get_user(self):
+ admin_client = self.start_auth_server()
+
+ user = self.create_user("test-user", [])
+ user_info = user.copy()
+ del user_info["token"]
+
+ with self.auth_client(user) as client:
+ info = client.get_user()
+ self.assertEqual(info, user_info)
+
+ # Explicitly asking for your own username is fine also
+ info = client.get_user(user["username"])
+ self.assertEqual(info, user_info)
+
+ def test_auth_get_user(self):
+ admin_client = self.start_auth_server()
+
+ user = self.create_user("test-user", [])
+ user_info = user.copy()
+ del user_info["token"]
+
+ with self.auth_perms() as client, self.assertRaises(InvokeError):
+ client.get_user(user["username"])
+
+ with self.auth_perms("@user-admin") as client:
+ info = client.get_user(user["username"])
+ self.assertEqual(info, user_info)
+
+ info = client.get_user("nonexist-user")
+ self.assertIsNone(info)
+
+ def test_auth_reconnect(self):
+ admin_client = self.start_auth_server()
+
+ user = self.create_user("test-user", [])
+ user_info = user.copy()
+ del user_info["token"]
+
+ with self.auth_client(user) as client:
+ info = client.get_user()
+ self.assertEqual(info, user_info)
+
+ client.disconnect()
+
+ info = client.get_user()
+ self.assertEqual(info, user_info)
+
+ def test_auth_delete_user(self):
+ admin_client = self.start_auth_server()
+
+ user = self.create_user("test-user", [])
+
+ # self service
+ with self.auth_client(user) as client:
+ client.delete_user(user["username"])
+
+ self.assertIsNone(admin_client.get_user(user["username"]))
+ user = self.create_user("test-user", [])
+
+ with self.auth_perms() as client, self.assertRaises(InvokeError):
+ client.delete_user(user["username"])
+
+ with self.auth_perms("@user-admin") as client:
+ client.delete_user(user["username"])
+
+ # User doesn't exist, so even though the permission is correct, it's an
+ # error
+ with self.auth_perms("@user-admin") as client, self.assertRaises(InvokeError):
+ client.delete_user(user["username"])
+
+ def test_auth_set_user_perms(self):
+ admin_client = self.start_auth_server()
+
+ user = self.create_user("test-user", [])
+
+ self.assertUserPerms(user, [])
+
+ # No self service to change permissions
+ with self.auth_client(user) as client, self.assertRaises(InvokeError):
+ client.set_user_perms(user["username"], ["@all"])
+ self.assertUserPerms(user, [])
+
+ with self.auth_perms() as client, self.assertRaises(InvokeError):
+ client.set_user_perms(user["username"], ["@all"])
+ self.assertUserPerms(user, [])
+
+ with self.auth_perms("@user-admin") as client:
+ client.set_user_perms(user["username"], ["@all"])
+ self.assertUserPerms(user, sorted(list(ALL_PERMISSIONS)))
+
+ # Bad permissions
+ with self.auth_perms("@user-admin") as client, self.assertRaises(InvokeError):
+ client.set_user_perms(user["username"], ["@this-is-not-a-permission"])
+ self.assertUserPerms(user, sorted(list(ALL_PERMISSIONS)))
+
+ def test_auth_get_all_users(self):
+ admin_client = self.start_auth_server()
+
+ user = self.create_user("test-user", [])
+
+ with self.auth_client(user) as client, self.assertRaises(InvokeError):
+ client.get_all_users()
+
+ # Give the test user the correct permission
+ admin_client.set_user_perms(user["username"], ["@user-admin"])
+
+ with self.auth_client(user) as client:
+ all_users = client.get_all_users()
+
+ # Convert to a dictionary for easier comparison
+ all_users = {u["username"]: u for u in all_users}
+
+ self.assertEqual(all_users,
+ {
+ "admin": {
+ "username": "admin",
+ "permissions": sorted(list(ALL_PERMISSIONS)),
+ },
+ "test-user": {
+ "username": "test-user",
+ "permissions": ["@user-admin"],
+ }
+ }
+ )
+
+ def test_auth_new_user(self):
+ self.start_auth_server()
+
+ permissions = ["@read", "@report", "@db-admin", "@user-admin"]
+ permissions.sort()
+
+ with self.auth_perms() as client, self.assertRaises(InvokeError):
+ self.create_user("test-user", permissions, client=client)
+
+ with self.auth_perms("@user-admin") as client:
+ user = self.create_user("test-user", permissions, client=client)
+ self.assertIn("token", user)
+ self.assertEqual(user["username"], "test-user")
+ self.assertEqual(user["permissions"], permissions)
+
+ def test_auth_become_user(self):
+ admin_client = self.start_auth_server()
+
+ user = self.create_user("test-user", ["@read", "@report"])
+ user_info = user.copy()
+ del user_info["token"]
+
+ with self.auth_perms() as client, self.assertRaises(InvokeError):
+ client.become_user(user["username"])
+
+ with self.auth_perms("@user-admin") as client:
+ become = client.become_user(user["username"])
+ self.assertEqual(become, user_info)
+
+ info = client.get_user()
+ self.assertEqual(info, user_info)
+
+ # Verify become user is preserved across disconnect
+ client.disconnect()
+
+ info = client.get_user()
+ self.assertEqual(info, user_info)
+
+ # test-user doesn't have become_user permissions, so this should
+ # not work
+ with self.assertRaises(InvokeError):
+ client.become_user(user["username"])
+
+ # No self-service of become
+ with self.auth_client(user) as client, self.assertRaises(InvokeError):
+ client.become_user(user["username"])
+
+ # Give test user permissions to become
+ admin_client.set_user_perms(user["username"], ["@user-admin"])
+
+ # It's possible to become yourself (effectively a noop)
+ with self.auth_perms("@user-admin") as client:
+ become = client.become_user(client.username)
+
+ def test_get_db_usage(self):
+ usage = self.client.get_db_usage()
+
+ self.assertTrue(isinstance(usage, dict))
+ for name in usage.keys():
+ self.assertTrue(isinstance(usage[name], dict))
+ self.assertIn("rows", usage[name])
+ self.assertTrue(isinstance(usage[name]["rows"], int))
+
+ def test_get_db_query_columns(self):
+ columns = self.client.get_db_query_columns()
+
+ self.assertTrue(isinstance(columns, list))
+ self.assertTrue(len(columns) > 0)
+
+ for col in columns:
+ self.client.remove({col: ""})
+
+ def test_auth_is_owner(self):
+ admin_client = self.start_auth_server()
+
+ user = self.create_user("test-user", ["@read", "@report"])
+ with self.auth_client(user) as client:
+ taskhash, outhash, unihash = self.create_test_hash(client)
+ data = client.get_taskhash(self.METHOD, taskhash, True)
+ self.assertEqual(data["owner"], user["username"])
+
+
+class TestHashEquivalenceClient(HashEquivalenceTestSetup, unittest.TestCase):
+ def get_server_addr(self, server_idx):
+ return "unix://" + os.path.join(self.temp_dir.name, 'sock%d' % server_idx)
+
+ def test_stats(self):
+ p = self.run_hashclient(["--address", self.server_address, "stats"], check=True)
+ json.loads(p.stdout)
+
+ def test_stress(self):
+ self.run_hashclient(["--address", self.server_address, "stress"], check=True)
+
+ def test_remove_taskhash(self):
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
+ self.run_hashclient([
+ "--address", self.server_address,
+ "remove",
+ "--where", "taskhash", taskhash,
+ ], check=True)
+ self.assertClientGetHash(self.client, taskhash, None)
+
+ result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash)
+ self.assertIsNone(result_outhash)
+
+ def test_remove_unihash(self):
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
+ self.run_hashclient([
+ "--address", self.server_address,
+ "remove",
+ "--where", "unihash", unihash,
+ ], check=True)
+ self.assertClientGetHash(self.client, taskhash, None)
+
+ def test_remove_outhash(self):
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
+ self.run_hashclient([
+ "--address", self.server_address,
+ "remove",
+ "--where", "outhash", outhash,
+ ], check=True)
+
+ result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash)
+ self.assertIsNone(result_outhash)
+
+ def test_remove_method(self):
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
+ self.run_hashclient([
+ "--address", self.server_address,
+ "remove",
+ "--where", "method", self.METHOD,
+ ], check=True)
+ self.assertClientGetHash(self.client, taskhash, None)
+
+ result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash)
+ self.assertIsNone(result_outhash)
+
+ def test_clean_unused(self):
+ taskhash, outhash, unihash = self.create_test_hash(self.client)
+
+ # Clean the database, which should not remove anything because all hashes an in-use
+ self.run_hashclient([
+ "--address", self.server_address,
+ "clean-unused", "0",
+ ], check=True)
+ self.assertClientGetHash(self.client, taskhash, unihash)
+
+ # Remove the unihash. The row in the outhash table should still be present
+ self.run_hashclient([
+ "--address", self.server_address,
+ "remove",
+ "--where", "unihash", unihash,
+ ], check=True)
+ result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash, False)
+ self.assertIsNotNone(result_outhash)
+
+ # Now clean with no minimum age which will remove the outhash
+ self.run_hashclient([
+ "--address", self.server_address,
+ "clean-unused", "0",
+ ], check=True)
+ result_outhash = self.client.get_outhash(self.METHOD, outhash, taskhash, False)
+ self.assertIsNone(result_outhash)
+
+ def test_refresh_token(self):
+ admin_client = self.start_auth_server()
+
+ user = admin_client.new_user("test-user", ["@read", "@report"])
+
+ p = self.run_hashclient([
+ "--address", self.auth_server_address,
+ "--login", user["username"],
+ "--password", user["token"],
+ "refresh-token"
+ ], check=True)
+
+ new_token = None
+ for l in p.stdout.splitlines():
+ l = l.rstrip()
+ m = re.match(r'Token: +(.*)$', l)
+ if m is not None:
+ new_token = m.group(1)
+
+ self.assertTrue(new_token)
+
+ print("New token is %r" % new_token)
+
+ self.run_hashclient([
+ "--address", self.auth_server_address,
+ "--login", user["username"],
+ "--password", new_token,
+ "get-user"
+ ], check=True)
+
+ def test_set_user_perms(self):
+ admin_client = self.start_auth_server()
+
+ user = admin_client.new_user("test-user", ["@read"])
+
+ self.run_hashclient([
+ "--address", self.auth_server_address,
+ "--login", admin_client.username,
+ "--password", admin_client.password,
+ "set-user-perms",
+ "-u", user["username"],
+ "@read", "@report",
+ ], check=True)
+
+ new_user = admin_client.get_user(user["username"])
+
+ self.assertEqual(set(new_user["permissions"]), {"@read", "@report"})
+
+ def test_get_user(self):
+ admin_client = self.start_auth_server()
+
+ user = admin_client.new_user("test-user", ["@read"])
+
+ p = self.run_hashclient([
+ "--address", self.auth_server_address,
+ "--login", admin_client.username,
+ "--password", admin_client.password,
+ "get-user",
+ "-u", user["username"],
+ ], check=True)
+
+ self.assertIn("Username:", p.stdout)
+ self.assertIn("Permissions:", p.stdout)
+
+ p = self.run_hashclient([
+ "--address", self.auth_server_address,
+ "--login", user["username"],
+ "--password", user["token"],
+ "get-user",
+ ], check=True)
+
+ self.assertIn("Username:", p.stdout)
+ self.assertIn("Permissions:", p.stdout)
+
+ def test_get_all_users(self):
+ admin_client = self.start_auth_server()
+
+ admin_client.new_user("test-user1", ["@read"])
+ admin_client.new_user("test-user2", ["@read"])
+
+ p = self.run_hashclient([
+ "--address", self.auth_server_address,
+ "--login", admin_client.username,
+ "--password", admin_client.password,
+ "get-all-users",
+ ], check=True)
+
+ self.assertIn("admin", p.stdout)
+ self.assertIn("test-user1", p.stdout)
+ self.assertIn("test-user2", p.stdout)
+
+ def test_new_user(self):
+ admin_client = self.start_auth_server()
+
+ p = self.run_hashclient([
+ "--address", self.auth_server_address,
+ "--login", admin_client.username,
+ "--password", admin_client.password,
+ "new-user",
+ "-u", "test-user",
+ "@read", "@report",
+ ], check=True)
+
+ new_token = None
+ for l in p.stdout.splitlines():
+ l = l.rstrip()
+ m = re.match(r'Token: +(.*)$', l)
+ if m is not None:
+ new_token = m.group(1)
+
+ self.assertTrue(new_token)
+
+ user = {
+ "username": "test-user",
+ "token": new_token,
+ }
+
+ self.assertUserPerms(user, ["@read", "@report"])
+
+ def test_delete_user(self):
+ admin_client = self.start_auth_server()
+
+ user = admin_client.new_user("test-user", ["@read"])
+
+ p = self.run_hashclient([
+ "--address", self.auth_server_address,
+ "--login", admin_client.username,
+ "--password", admin_client.password,
+ "delete-user",
+ "-u", user["username"],
+ ], check=True)
+
+ self.assertIsNone(admin_client.get_user(user["username"]))
+
+ def test_get_db_usage(self):
+ p = self.run_hashclient([
+ "--address", self.server_address,
+ "get-db-usage",
+ ], check=True)
+
+ def test_get_db_query_columns(self):
+ p = self.run_hashclient([
+ "--address", self.server_address,
+ "get-db-query-columns",
+ ], check=True)
+
+
class TestHashEquivalenceUnixServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
def get_server_addr(self, server_idx):
return "unix://" + os.path.join(self.temp_dir.name, 'sock%d' % server_idx)
@@ -483,3 +1097,77 @@ class TestHashEquivalenceTCPServer(HashEquivalenceTestSetup, HashEquivalenceComm
# If IPv6 is enabled, it should be safe to use localhost directly, in general
# case it is more reliable to resolve the IP address explicitly.
return socket.gethostbyname("localhost") + ":0"
+
+
+class TestHashEquivalenceWebsocketServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
+ def setUp(self):
+ try:
+ import websockets
+ except ImportError as e:
+ self.skipTest(str(e))
+
+ super().setUp()
+
+ def get_server_addr(self, server_idx):
+ # Some hosts cause asyncio module to misbehave, when IPv6 is not enabled.
+ # If IPv6 is enabled, it should be safe to use localhost directly, in general
+ # case it is more reliable to resolve the IP address explicitly.
+ host = socket.gethostbyname("localhost")
+ return "ws://%s:0" % host
+
+
+class TestHashEquivalenceWebsocketsSQLAlchemyServer(TestHashEquivalenceWebsocketServer):
+ def setUp(self):
+ try:
+ import sqlalchemy
+ import aiosqlite
+ except ImportError as e:
+ self.skipTest(str(e))
+
+ super().setUp()
+
+ def make_dbpath(self):
+ return "sqlite+aiosqlite:///%s" % os.path.join(self.temp_dir.name, "db%d.sqlite" % self.server_index)
+
+
+class TestHashEquivalenceExternalServer(HashEquivalenceTestSetup, HashEquivalenceCommonTests, unittest.TestCase):
+ def get_env(self, name):
+ v = os.environ.get(name)
+ if not v:
+ self.skipTest(f'{name} not defined to test an external server')
+ return v
+
+ def start_test_server(self):
+ return self.get_env('BB_TEST_HASHSERV')
+
+ def start_server(self, *args, **kwargs):
+ self.skipTest('Cannot start local server when testing external servers')
+
+ def start_auth_server(self):
+
+ self.auth_server_address = self.server_address
+ self.admin_client = self.start_client(
+ self.server_address,
+ username=self.get_env('BB_TEST_HASHSERV_USERNAME'),
+ password=self.get_env('BB_TEST_HASHSERV_PASSWORD'),
+ )
+ return self.admin_client
+
+ def setUp(self):
+ super().setUp()
+ if "BB_TEST_HASHSERV_USERNAME" in os.environ:
+ self.client = self.start_client(
+ self.server_address,
+ username=os.environ["BB_TEST_HASHSERV_USERNAME"],
+ password=os.environ["BB_TEST_HASHSERV_PASSWORD"],
+ )
+ self.client.remove({"method": self.METHOD})
+
+ def tearDown(self):
+ self.client.remove({"method": self.METHOD})
+ super().tearDown()
+
+
+ def test_auth_get_all_users(self):
+ self.skipTest("Cannot test all users with external server")
+