summaryrefslogtreecommitdiff
path: root/frontend/tko/rpc_interface_unittest.py
diff options
context:
space:
mode:
Diffstat (limited to 'frontend/tko/rpc_interface_unittest.py')
-rw-r--r--frontend/tko/rpc_interface_unittest.py576
1 files changed, 576 insertions, 0 deletions
diff --git a/frontend/tko/rpc_interface_unittest.py b/frontend/tko/rpc_interface_unittest.py
new file mode 100644
index 00000000..4cc2eb12
--- /dev/null
+++ b/frontend/tko/rpc_interface_unittest.py
@@ -0,0 +1,576 @@
+#!/usr/bin/python
+
+import re, unittest
+import common
+from autotest_lib.frontend import setup_django_environment
+from autotest_lib.frontend import setup_test_environment
+from autotest_lib.client.common_lib.test_utils import mock
+from django.db import connection
+from autotest_lib.frontend.tko import models, rpc_interface
+
+# this will need to be updated when the view changes for the test to be
+# consistent with reality
+_CREATE_TEST_VIEW = """
+CREATE VIEW tko_test_view_2 AS
+SELECT tko_tests.test_idx AS test_idx,
+ tko_tests.job_idx AS job_idx,
+ tko_tests.test AS test_name,
+ tko_tests.subdir AS subdir,
+ tko_tests.kernel_idx AS kernel_idx,
+ tko_tests.status AS status_idx,
+ tko_tests.reason AS reason,
+ tko_tests.machine_idx AS machine_idx,
+ tko_tests.started_time AS test_started_time,
+ tko_tests.finished_time AS test_finished_time,
+ tko_jobs.tag AS job_tag,
+ tko_jobs.label AS job_name,
+ tko_jobs.username AS job_owner,
+ tko_jobs.queued_time AS job_queued_time,
+ tko_jobs.started_time AS job_started_time,
+ tko_jobs.finished_time AS job_finished_time,
+ tko_jobs.afe_job_id AS afe_job_id,
+ tko_machines.hostname AS hostname,
+ tko_machines.machine_group AS platform,
+ tko_machines.owner AS machine_owner,
+ tko_kernels.kernel_hash AS kernel_hash,
+ tko_kernels.base AS kernel_base,
+ tko_kernels.printable AS kernel,
+ tko_status.word AS status
+FROM tko_tests
+INNER JOIN tko_jobs ON tko_jobs.job_idx = tko_tests.job_idx
+INNER JOIN tko_machines ON tko_machines.machine_idx = tko_jobs.machine_idx
+INNER JOIN tko_kernels ON tko_kernels.kernel_idx = tko_tests.kernel_idx
+INNER JOIN tko_status ON tko_status.status_idx = tko_tests.status;
+"""
+
+# this will need to be updated if the table schemas change (or removed if we
+# add proper primary keys)
+_CREATE_ITERATION_ATTRIBUTES = """
+CREATE TABLE "tko_iteration_attributes" (
+ "test_idx" integer NOT NULL REFERENCES "tko_tests" ("test_idx"),
+ "iteration" integer NOT NULL,
+ "attribute" varchar(90) NOT NULL,
+ "value" varchar(300) NOT NULL
+);
+"""
+
+_CREATE_ITERATION_RESULTS = """
+CREATE TABLE "tko_iteration_result" (
+ "test_idx" integer NOT NULL REFERENCES "tko_tests" ("test_idx"),
+ "iteration" integer NOT NULL,
+ "attribute" varchar(90) NOT NULL,
+ "value" numeric(12, 31) NULL
+);
+"""
+
+
+def setup_test_view():
+ """
+ Django has no way to actually represent a view; we simply create a model for
+ TestView. This means when we syncdb, Django will create a table for it.
+ So manually remove that table and replace it with a view.
+ """
+ cursor = connection.cursor()
+ cursor.execute('DROP TABLE tko_test_view_2')
+ cursor.execute(_CREATE_TEST_VIEW)
+
+
+def fix_iteration_tables():
+ """
+ Since iteration tables don't have any real primary key, we "fake" one in the
+ Django models. So fix up the generated schema to match the real schema.
+ """
+ cursor = connection.cursor()
+ cursor.execute('DROP TABLE tko_iteration_attributes')
+ cursor.execute(_CREATE_ITERATION_ATTRIBUTES)
+ cursor.execute('DROP TABLE tko_iteration_result')
+ cursor.execute(_CREATE_ITERATION_RESULTS)
+
+
+class RpcInterfaceTest(unittest.TestCase):
+ def setUp(self):
+ self._god = mock.mock_god()
+ self._god.stub_with(models.TempManager, '_get_column_names',
+ self._get_column_names_for_sqlite3)
+ self._god.stub_with(models.TempManager, '_cursor_rowcount',
+ self._cursor_rowcount_for_sqlite3)
+
+ # add some functions to SQLite for MySQL compatibility
+ connection.cursor() # ensure connection is alive
+ connection.connection.create_function('if', 3, self._sqlite_if)
+ connection.connection.create_function('find_in_set', 2,
+ self._sqlite_find_in_set)
+
+ setup_test_environment.set_up()
+ fix_iteration_tables()
+ setup_test_view()
+ self._create_initial_data()
+
+
+ def _cursor_rowcount_for_sqlite3(self, cursor):
+ return len(cursor.fetchall())
+
+
+ def _sqlite_find_in_set(self, needle, haystack):
+ return needle in haystack.split(',')
+
+
+ def _sqlite_if(self, condition, true_result, false_result):
+ if condition:
+ return true_result
+ return false_result
+
+
+ # sqlite takes any columns that don't have aliases and names them
+ # "table_name"."column_name". we map these to just column_name.
+ _SQLITE_AUTO_COLUMN_ALIAS_RE = re.compile(r'".+"\."(.+)"')
+
+
+ def _get_column_names_for_sqlite3(self, cursor):
+ names = [column_info[0] for column_info in cursor.description]
+
+ # replace all "table_name"."column_name" constructs with just
+ # column_name
+ for i, name in enumerate(names):
+ match = self._SQLITE_AUTO_COLUMN_ALIAS_RE.match(name)
+ if match:
+ names[i] = match.group(1)
+
+ return names
+
+
+ def tearDown(self):
+ setup_test_environment.tear_down()
+ self._god.unstub_all()
+
+
+ def _create_initial_data(self):
+ machine = models.Machine.objects.create(hostname='myhost')
+
+ # create basic objects
+ kernel_name = 'mykernel1'
+ kernel1 = models.Kernel.objects.create(kernel_hash=kernel_name,
+ base=kernel_name,
+ printable=kernel_name)
+
+ kernel_name = 'mykernel2'
+ kernel2 = models.Kernel.objects.create(kernel_hash=kernel_name,
+ base=kernel_name,
+ printable=kernel_name)
+
+ good_status = models.Status.objects.create(word='GOOD')
+ failed_status = models.Status.objects.create(word='FAILED')
+
+ job1 = models.Job.objects.create(tag='1-myjobtag1', label='myjob1',
+ username='myuser', machine=machine)
+ job2 = models.Job.objects.create(tag='2-myjobtag2', label='myjob2',
+ username='myuser', machine=machine)
+
+ job1_test1 = models.Test.objects.create(job=job1, test='mytest1',
+ kernel=kernel1,
+ status=good_status,
+ machine=machine)
+ self.first_test = job1_test1
+ job1_test2 = models.Test.objects.create(job=job1, test='mytest2',
+ kernel=kernel1,
+ status=failed_status,
+ machine=machine)
+ job2_test1 = models.Test.objects.create(job=job2, test='kernbench',
+ kernel=kernel2,
+ status=good_status,
+ machine=machine)
+
+ # create test attributes, test labels, and iterations
+ # like Noah's Ark, include two of each...just in case there's a bug with
+ # multiple related items
+ models.TestAttribute.objects.create(test=job1_test1, attribute='myattr',
+ value='myval')
+ models.TestAttribute.objects.create(test=job1_test1,
+ attribute='myattr2', value='myval2')
+
+ self._add_iteration_keyval('tko_iteration_attributes', test=job1_test1,
+ iteration=1, attribute='iattr',
+ value='ival')
+ self._add_iteration_keyval('tko_iteration_attributes', test=job1_test1,
+ iteration=1, attribute='iattr2',
+ value='ival2')
+ self._add_iteration_keyval('tko_iteration_result', test=job1_test1,
+ iteration=1, attribute='iresult', value=1)
+ self._add_iteration_keyval('tko_iteration_result', test=job1_test1,
+ iteration=1, attribute='iresult2', value=2)
+ self._add_iteration_keyval('tko_iteration_result', test=job1_test1,
+ iteration=2, attribute='iresult', value=3)
+ self._add_iteration_keyval('tko_iteration_result', test=job1_test1,
+ iteration=2, attribute='iresult2', value=4)
+
+ label1 = models.TestLabel.objects.create(name='testlabel1')
+ label2 = models.TestLabel.objects.create(name='testlabel2')
+
+ label1.tests.add(job1_test1)
+ label2.tests.add(job1_test1)
+
+
+ def _add_iteration_keyval(self, table, test, iteration, attribute, value):
+ cursor = connection.cursor()
+ cursor.execute('INSERT INTO %s ' 'VALUES (%%s, %%s, %%s, %%s)' % table,
+ (test.test_idx, iteration, attribute, value))
+
+
+ def _check_for_get_test_views(self, test):
+ self.assertEquals(test['test_name'], 'mytest1')
+ self.assertEquals(test['job_tag'], '1-myjobtag1')
+ self.assertEquals(test['job_name'], 'myjob1')
+ self.assertEquals(test['job_owner'], 'myuser')
+ self.assertEquals(test['status'], 'GOOD')
+ self.assertEquals(test['hostname'], 'myhost')
+ self.assertEquals(test['kernel'], 'mykernel1')
+
+
+ def test_get_detailed_test_views(self):
+ test = rpc_interface.get_detailed_test_views()[0]
+
+ self._check_for_get_test_views(test)
+
+ self.assertEquals(test['attributes'], {'myattr': 'myval',
+ 'myattr2': 'myval2'})
+ self.assertEquals(test['iterations'], [{'attr': {'iattr': 'ival',
+ 'iattr2': 'ival2'},
+ 'perf': {'iresult': 1,
+ 'iresult2': 2}},
+ {'attr': {},
+ 'perf': {'iresult': 3,
+ 'iresult2': 4}}])
+ self.assertEquals(test['labels'], ['testlabel1', 'testlabel2'])
+
+
+ def test_test_attributes(self):
+ rpc_interface.set_test_attribute('foo', 'bar', test_name='mytest1')
+ test = rpc_interface.get_detailed_test_views()[0]
+ self.assertEquals(test['attributes'], {'foo': 'bar',
+ 'myattr': 'myval',
+ 'myattr2': 'myval2'})
+
+ rpc_interface.set_test_attribute('foo', 'goo', test_name='mytest1')
+ test = rpc_interface.get_detailed_test_views()[0]
+ self.assertEquals(test['attributes'], {'foo': 'goo',
+ 'myattr': 'myval',
+ 'myattr2': 'myval2'})
+
+ rpc_interface.set_test_attribute('foo', None, test_name='mytest1')
+ test = rpc_interface.get_detailed_test_views()[0]
+ self.assertEquals(test['attributes'], {'myattr': 'myval',
+ 'myattr2': 'myval2'})
+
+
+ def test_immutable_attributes(self):
+ self.assertRaises(ValueError, rpc_interface.set_test_attribute,
+ 'myattr', 'foo', test_name='mytest1')
+
+
+ def test_get_test_views(self):
+ tests = rpc_interface.get_test_views()
+
+ self.assertEquals(len(tests), 3)
+ test = rpc_interface.get_test_views(
+ job_name='myjob1', test_name='mytest1')[0]
+ self.assertEquals(tests[0], test)
+
+ self._check_for_get_test_views(test)
+
+ self.assertEquals(
+ [], rpc_interface.get_test_views(hostname='fakehost'))
+
+
+ def _check_test_names(self, tests, expected_names):
+ self.assertEquals(set(test['test_name'] for test in tests),
+ set(expected_names))
+
+
+ def test_get_test_views_filter_on_labels(self):
+ tests = rpc_interface.get_test_views(include_labels=['testlabel1'])
+ self._check_test_names(tests, ['mytest1'])
+
+ tests = rpc_interface.get_test_views(exclude_labels=['testlabel1'])
+ self._check_test_names(tests, ['mytest2', 'kernbench'])
+
+
+ def test_get_test_views_filter_on_attributes(self):
+ tests = rpc_interface.get_test_views(
+ include_attributes_where='attribute = "myattr" '
+ 'and value = "myval"')
+ self._check_test_names(tests, ['mytest1'])
+
+ tests = rpc_interface.get_test_views(
+ exclude_attributes_where='attribute="myattr2"')
+ self._check_test_names(tests, ['mytest2', 'kernbench'])
+
+
+ def test_get_num_test_views(self):
+ self.assertEquals(rpc_interface.get_num_test_views(), 3)
+ self.assertEquals(rpc_interface.get_num_test_views(
+ job_name='myjob1', test_name='mytest1'), 1)
+
+
+ def test_get_group_counts(self):
+ self.assertEquals(rpc_interface.get_num_groups(['job_name']), 2)
+
+ counts = rpc_interface.get_group_counts(['job_name'])
+ groups = counts['groups']
+ self.assertEquals(len(groups), 2)
+ group1 = groups[0]
+ group2 = groups[1]
+
+ self.assertEquals(group1['group_count'], 2)
+ self.assertEquals(group1['job_name'], 'myjob1')
+ self.assertEquals(group2['group_count'], 1)
+ self.assertEquals(group2['job_name'], 'myjob2')
+
+ extra = {'extra' : 'kernel_hash'}
+ counts = rpc_interface.get_group_counts(['job_name'],
+ header_groups=[('job_name',)],
+ extra_select_fields=extra)
+ groups = counts['groups']
+ self.assertEquals(len(groups), 2)
+ group1 = groups[0]
+ group2 = groups[1]
+
+ self.assertEquals(group1['group_count'], 2)
+ self.assertEquals(group1['header_indices'], [0])
+ self.assertEquals(group1['extra'], 'mykernel1')
+ self.assertEquals(group2['group_count'], 1)
+ self.assertEquals(group2['header_indices'], [1])
+ self.assertEquals(group2['extra'], 'mykernel2')
+
+
+ def test_get_status_counts(self):
+ """\
+ This method cannot be tested with a sqlite3 test framework. The method
+ relies on the IF function, which is not present in sqlite3.
+ """
+
+
+ def test_get_latest_tests(self):
+ """\
+ This method cannot be tested with a sqlite3 test framework. The method
+ relies on the IF function, which is not present in sqlite3.
+ """
+
+
+ def test_get_job_ids(self):
+ self.assertEquals([1,2], rpc_interface.get_job_ids())
+ self.assertEquals([1], rpc_interface.get_job_ids(test_name='mytest2'))
+
+
+ def test_get_hosts_and_tests(self):
+ host_info = rpc_interface.get_hosts_and_tests()
+ self.assertEquals(len(host_info), 1)
+ info = host_info['myhost']
+
+ self.assertEquals(info['tests'], ['kernbench'])
+ self.assertEquals(info['id'], 1)
+
+
+ def _check_for_get_test_labels(self, label, label_num):
+ self.assertEquals(label['id'], label_num)
+ self.assertEquals(label['description'], '')
+ self.assertEquals(label['name'], 'testlabel%d' % label_num)
+
+
+ def test_test_labels(self):
+ labels = rpc_interface.get_test_labels_for_tests(test_name='mytest1')
+ self.assertEquals(len(labels), 2)
+ label1 = labels[0]
+ label2 = labels[1]
+
+ self._check_for_get_test_labels(label1, 1)
+ self._check_for_get_test_labels(label2, 2)
+
+ rpc_interface.test_label_remove_tests(label1['id'], test_name='mytest1')
+
+ labels = rpc_interface.get_test_labels_for_tests(test_name='mytest1')
+ self.assertEquals(len(labels), 1)
+ label = labels[0]
+
+ self._check_for_get_test_labels(label, 2)
+
+ rpc_interface.test_label_add_tests(label1['id'], test_name='mytest1')
+
+ labels = rpc_interface.get_test_labels_for_tests(test_name='mytest1')
+ self.assertEquals(len(labels), 2)
+ label1 = labels[0]
+ label2 = labels[1]
+
+ self._check_for_get_test_labels(label1, 1)
+ self._check_for_get_test_labels(label2, 2)
+
+
+ def test_get_test_attribute_fields(self):
+ tests = rpc_interface.get_test_views(
+ test_attribute_fields=['myattr', 'myattr2'])
+ self.assertEquals(len(tests), 3)
+
+ self.assertEquals(tests[0]['attribute_myattr'], 'myval')
+ self.assertEquals(tests[0]['attribute_myattr2'], 'myval2')
+
+ for index in (1, 2):
+ self.assertEquals(tests[index]['attribute_myattr'], None)
+ self.assertEquals(tests[index]['attribute_myattr2'], None)
+
+
+ def test_filtering_on_test_attribute_fields(self):
+ tests = rpc_interface.get_test_views(
+ extra_where='attribute_myattr.value = "myval"',
+ test_attribute_fields=['myattr'])
+ self.assertEquals(len(tests), 1)
+
+
+ def test_grouping_with_test_attribute_fields(self):
+ num_groups = rpc_interface.get_num_groups(
+ ['attribute_myattr'], test_attribute_fields=['myattr'])
+ self.assertEquals(num_groups, 2)
+
+ counts = rpc_interface.get_group_counts(
+ ['attribute_myattr'], test_attribute_fields=['myattr'])
+ groups = counts['groups']
+ self.assertEquals(len(groups), num_groups)
+ self.assertEquals(groups[0]['attribute_myattr'], None)
+ self.assertEquals(groups[0]['group_count'], 2)
+ self.assertEquals(groups[1]['attribute_myattr'], 'myval')
+ self.assertEquals(groups[1]['group_count'], 1)
+
+
+ def test_get_test_label_fields(self):
+ tests = rpc_interface.get_test_views(
+ test_label_fields=['testlabel1', 'testlabel2'])
+ self.assertEquals(len(tests), 3)
+
+ self.assertEquals(tests[0]['label_testlabel1'], 'testlabel1')
+ self.assert_(tests[0]['label_testlabel2'], 'testlabel2')
+
+ for index in (1, 2):
+ self.assertEquals(tests[index]['label_testlabel1'], None)
+ self.assertEquals(tests[index]['label_testlabel2'], None)
+
+
+ def test_filtering_on_test_label_fields(self):
+ tests = rpc_interface.get_test_views(
+ extra_where='label_testlabel1 = "testlabel1"',
+ test_label_fields=['testlabel1'])
+ self.assertEquals(len(tests), 1)
+
+
+ def test_grouping_on_test_label_fields(self):
+ num_groups = rpc_interface.get_num_groups(
+ ['label_testlabel1'], test_label_fields=['testlabel1'])
+ self.assertEquals(num_groups, 2)
+
+ counts = rpc_interface.get_group_counts(
+ ['label_testlabel1'], test_label_fields=['testlabel1'])
+ groups = counts['groups']
+ self.assertEquals(len(groups), 2)
+ self.assertEquals(groups[0]['label_testlabel1'], None)
+ self.assertEquals(groups[0]['group_count'], 2)
+ self.assertEquals(groups[1]['label_testlabel1'], 'testlabel1')
+ self.assertEquals(groups[1]['group_count'], 1)
+
+
+ def test_get_iteration_fields(self):
+ num_iterations = rpc_interface.get_num_test_views(
+ iteration_fields=['iresult', 'iresult2'])
+ self.assertEquals(num_iterations, 2)
+
+ iterations = rpc_interface.get_test_views(
+ iteration_fields=['iresult', 'iresult2'])
+ self.assertEquals(len(iterations), 2)
+
+ for index in (0, 1):
+ self.assertEquals(iterations[index]['test_idx'], 1)
+
+ self.assertEquals(iterations[0]['iteration_index'], 1)
+ self.assertEquals(iterations[0]['iteration_iresult'], 1)
+ self.assertEquals(iterations[0]['iteration_iresult2'], 2)
+
+ self.assertEquals(iterations[1]['iteration_index'], 2)
+ self.assertEquals(iterations[1]['iteration_iresult'], 3)
+ self.assertEquals(iterations[1]['iteration_iresult2'], 4)
+
+
+ def test_filtering_on_iteration_fields(self):
+ iterations = rpc_interface.get_test_views(
+ extra_where='iteration_iresult.value = 1',
+ iteration_fields=['iresult'])
+ self.assertEquals(len(iterations), 1)
+
+
+ def test_grouping_with_iteration_fields(self):
+ num_groups = rpc_interface.get_num_groups(['iteration_iresult'],
+ iteration_fields=['iresult'])
+ self.assertEquals(num_groups, 2)
+
+ counts = rpc_interface.get_group_counts(['iteration_iresult'],
+ iteration_fields=['iresult'])
+ groups = counts['groups']
+ self.assertEquals(len(groups), 2)
+ self.assertEquals(groups[0]['iteration_iresult'], 1)
+ self.assertEquals(groups[0]['group_count'], 1)
+ self.assertEquals(groups[1]['iteration_iresult'], 3)
+ self.assertEquals(groups[1]['group_count'], 1)
+
+
+ def _setup_machine_labels(self):
+ models.TestAttribute.objects.create(test=self.first_test,
+ attribute='host-labels',
+ value='label1,label2')
+
+
+ def test_get_machine_label_fields(self):
+ self._setup_machine_labels()
+
+ tests = rpc_interface.get_test_views(
+ machine_label_fields=['label1', 'otherlabel'])
+ self.assertEquals(len(tests), 3)
+
+ self.assertEquals(tests[0]['machine_label_label1'], 'label1')
+ self.assertEquals(tests[0]['machine_label_otherlabel'], None)
+
+ for index in (1, 2):
+ self.assertEquals(tests[index]['machine_label_label1'], None)
+ self.assertEquals(tests[index]['machine_label_otherlabel'], None)
+
+
+ def test_grouping_with_machine_label_fields(self):
+ self._setup_machine_labels()
+
+ counts = rpc_interface.get_group_counts(['machine_label_label1'],
+ machine_label_fields=['label1'])
+ groups = counts['groups']
+ self.assertEquals(len(groups), 2)
+ self.assertEquals(groups[0]['machine_label_label1'], None)
+ self.assertEquals(groups[0]['group_count'], 2)
+ self.assertEquals(groups[1]['machine_label_label1'], 'label1')
+ self.assertEquals(groups[1]['group_count'], 1)
+
+
+ def test_filtering_on_machine_label_fields(self):
+ self._setup_machine_labels()
+
+ tests = rpc_interface.get_test_views(
+ extra_where='machine_label_label1 = "label1"',
+ machine_label_fields=['label1'])
+ self.assertEquals(len(tests), 1)
+
+
+ def test_quoting_fields(self):
+ # ensure fields with special characters are properly quoted throughout
+ rpc_interface.add_test_label('hyphen-label')
+ rpc_interface.get_group_counts(
+ ['attribute_hyphen-attr', 'label_hyphen-label',
+ 'machine_label_hyphen-label', 'iteration_hyphen-result'],
+ test_attribute_fields=['hyphen-attr'],
+ test_label_fields=['hyphen-label'],
+ machine_label_fields=['hyphen-label'],
+ iteration_fields=['hyphen-result'])
+
+
+if __name__ == '__main__':
+ unittest.main()