diff options
author | Martin Pitt <martin.pitt@ubuntu.com> | 2014-08-19 11:15:03 +0200 |
---|---|---|
committer | Martin Pitt <martin.pitt@ubuntu.com> | 2014-08-19 11:15:03 +0200 |
commit | c6cb156f6afb1ab5967ca03db1fa79ad12130dc7 (patch) | |
tree | 47ccc32592159d21ae1fac40daf1849148b69658 | |
parent | eb1a56be346e49460b6131b7f546f00360e4a78f (diff) |
integration-test: Fix code formatting
Fix all pyflakes and PEP-8 errors.
-rwxr-xr-x | src/tests/integration-test | 204 |
1 files changed, 100 insertions, 104 deletions
diff --git a/src/tests/integration-test b/src/tests/integration-test index 215e20f..caf3231 100755 --- a/src/tests/integration-test +++ b/src/tests/integration-test @@ -4,10 +4,10 @@ # # Run in udisks built tree to test local built binaries (needs # --localstatedir=/var), or from anywhere else to test system installed -# binaries. +# binaries. # # Usage: -# - Run all tests: +# - Run all tests: # src/tests/integration-test # - Run only a particular class of tests: # src/tests/integration-test Drive @@ -32,7 +32,6 @@ import sys import os -import pwd srcdir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__)))) libdir = os.path.join(srcdir, 'udisks', '.libs') @@ -43,8 +42,8 @@ libdir = os.path.join(srcdir, 'udisks', '.libs') if 'LD_LIBRARY_PATH' not in os.environ and os.path.isdir(libdir): os.environ['LD_LIBRARY_PATH'] = libdir os.environ['GI_TYPELIB_PATH'] = '%s/udisks:%s' % ( - srcdir, - os.environ.get('GI_TYPELIB_PATH', '')) + srcdir, + os.environ.get('GI_TYPELIB_PATH', '')) os.execv(sys.argv[0], sys.argv) assert False, 'not expecting to land here' @@ -64,8 +63,8 @@ from gi.repository import GLib, Gio, UDisks sys.path.insert(0, os.path.dirname(__file__)) import test_polkitd -#GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs -VDEV_SIZE = 300000000 # size of virtual test device +# GI_TYPELIB_PATH=udisks LD_LIBRARY_PATH=udisks/.libs +VDEV_SIZE = 300000000 # size of virtual test device # Those file systems are known to have a broken handling of permissions, in # particular the executable bit @@ -81,11 +80,12 @@ workaround_syncs = False no_options = GLib.Variant('a{sv}', {}) + # ---------------------------------------------------------------------------- class UDisksTestCase(unittest.TestCase): '''Base class for udisks test cases. - + This provides static functions which are useful for all test cases. ''' daemon = None @@ -106,7 +106,7 @@ class UDisksTestCase(unittest.TestCase): # run from local build tree if we are in one, otherwise use system instance klass.daemon_path = os.path.join(srcdir, 'src', 'udisksd') - if (os.access (klass.daemon_path, os.X_OK)): + if (os.access(klass.daemon_path, os.X_OK)): print('Testing binaries from local build tree') klass.check_build_tree_config() else: @@ -144,7 +144,7 @@ class UDisksTestCase(unittest.TestCase): def cleanup(klass): '''stop daemon again and clean up test environment''' - subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed + subprocess.call(['umount', klass.device], stderr=subprocess.PIPE) # if a test failed klass.stop_daemon() @@ -156,9 +156,10 @@ class UDisksTestCase(unittest.TestCase): @classmethod def start_daemon(klass): - assert klass.daemon == None + assert klass.daemon is None klass.daemon = subprocess.Popen([klass.daemon_path, '--replace'], - stdout=klass.daemon_log, stderr=subprocess.STDOUT) + stdout=klass.daemon_log, + stderr=subprocess.STDOUT) assert klass.daemon.pid, 'daemon failed to start' # wait until the daemon has started up @@ -167,7 +168,7 @@ class UDisksTestCase(unittest.TestCase): while klass.manager is None and timeout > 0: time.sleep(0.2) klass.client = UDisks.Client.new_sync(None) - assert klass.client != None + assert klass.client is not None klass.manager = klass.client.get_manager() timeout -= 1 assert klass.manager, 'daemon failed to start' @@ -185,7 +186,7 @@ class UDisksTestCase(unittest.TestCase): @classmethod def sync(klass): '''Wait until pending events finished processing. - + This should only be called for situations where we genuinely have an asynchronous response, like invoking a CLI program and waiting for udev/udisks to catch up on the change events. @@ -206,7 +207,7 @@ class UDisksTestCase(unittest.TestCase): @classmethod def sync_workaround(klass): '''Wait until pending events finished processing (bug workaround). - + This should be called for race conditions in the D-BUS API which cause properties to not be up to date yet when a method call finishes. Those should eventually get fixed properly, but it's unnerving to have the @@ -219,8 +220,8 @@ class UDisksTestCase(unittest.TestCase): @classmethod def zero_device(klass): - subprocess.call(['dd', 'if=/dev/zero', 'of='+klass.device, 'bs=10M'], - stderr=subprocess.PIPE) + subprocess.call(['dd', 'if=/dev/zero', 'of=' + klass.device, 'bs=10M'], + stderr=subprocess.PIPE) time.sleep(0.5) klass.sync() @@ -258,7 +259,7 @@ class UDisksTestCase(unittest.TestCase): @classmethod def udisks_filesystem(klass, partition=None, cd=False): '''Get UDisksFilesystem object for test device or partition - + Return None if there is no file system on that device. If cd is True, return the CD device, otherwise the hard disk device. @@ -297,24 +298,21 @@ class UDisksTestCase(unittest.TestCase): # signature (mailed kzak about it) if type == 'swap': subprocess.check_call(['wipefs', '-a', klass.devname(partition)], - stdout=subprocess.PIPE) - - mkcmd = { 'swap': 'mkswap', - 'ntfs': 'mkntfs', - } - label_opt = { 'vfat': '-n', - 'exfat': '-n', - 'reiserfs': '-l', - } - extra_opt = { 'vfat': [ '-I', '-F', '32'], - 'swap': ['-f'], - 'xfs': ['-f'], # XFS complains if there's an existing FS, so force - 'ext2': ['-F'], # ext* complains about using entire device, so force - 'ext3': ['-F'], - 'ext4': ['-F'], - 'ntfs': ['-F'], - 'reiserfs': ['-ff'], - } + stdout=subprocess.PIPE) + + mkcmd = {'swap': 'mkswap', + 'ntfs': 'mkntfs'} + label_opt = {'vfat': '-n', + 'exfat': '-n', + 'reiserfs': '-l'} + extra_opt = {'vfat': ['-I', '-F', '32'], + 'swap': ['-f'], + 'xfs': ['-f'], # XFS complains if there's an existing FS, so force + 'ext2': ['-F'], # ext* complains about using entire device, so force + 'ext3': ['-F'], + 'ext4': ['-F'], + 'ntfs': ['-F'], + 'reiserfs': ['-ff']} cmd = [mkcmd.get(type, 'mkfs.' + type)] + extra_opt.get(type, []) if label: @@ -327,7 +325,7 @@ class UDisksTestCase(unittest.TestCase): # tell us when they are done; so do a little kludge here to know how # long we need to wait subprocess.call(['udevadm', 'trigger', '--action=change', - '--sysname-match=' + os.path.basename(klass.devname(partition))]) + '--sysname-match=' + os.path.basename(klass.devname(partition))]) klass.sync() @classmethod @@ -347,7 +345,7 @@ class UDisksTestCase(unittest.TestCase): try: return fn(*args) except GLib.GError as e: - if not 'UDisks2.Error.DeviceBusy' in e.message: + if 'UDisks2.Error.DeviceBusy' not in e.message: raise sys.stderr.write('[busy] ') time.sleep(0.3) @@ -360,8 +358,7 @@ class UDisksTestCase(unittest.TestCase): # read make variables make_vars = {} var_re = re.compile('^([a-zA-Z_]+) = (.*)$') - make = subprocess.Popen(['make', '-p', '/dev/null'], - stdout=subprocess.PIPE) + make = subprocess.Popen(['make', '-p', '/dev/null'], stdout=subprocess.PIPE) for l in make.stdout: l = l.decode('UTF-8') m = var_re.match(l) @@ -382,15 +379,15 @@ class UDisksTestCase(unittest.TestCase): # check localstatedir for d in (os.path.join(make_vars['localstatedir'], 'run', 'udisks2'), - os.path.join(make_vars['localstatedir'], 'lib', 'udisks2')): + os.path.join(make_vars['localstatedir'], 'lib', 'udisks2')): if not os.path.exists(d): sys.stderr.write('The directory %s does not exist; please create it before running these tests.\n' % d) sys.exit(0) - + @classmethod def setup_vdev(klass): '''create virtual test devices - + It is zeroed out initially. Return a pair (writable HD device path, readonly CD device path). @@ -414,7 +411,7 @@ class UDisksTestCase(unittest.TestCase): # craete a fake SCSI hard drive assert subprocess.call(['modprobe', 'scsi_debug', 'dev_size_mb=%i' % ( - VDEV_SIZE/1048576)]) == 0, 'Failure to modprobe scsi_debug' + VDEV_SIZE / 1048576)]) == 0, 'Failure to modprobe scsi_debug' # wait until drive got created rw_dirs = [] @@ -460,8 +457,7 @@ class UDisksTestCase(unittest.TestCase): '''release and remove virtual test device''' klass.remove_device(device) - assert subprocess.call(['rmmod', 'scsi_debug']) == 0, \ - 'Failure to rmmod scsi_debug' + assert subprocess.call(['rmmod', 'scsi_debug']) == 0, 'Failure to rmmod scsi_debug' @classmethod def remove_device(klass, device): @@ -475,7 +471,7 @@ class UDisksTestCase(unittest.TestCase): while os.path.exists(device): time.sleep(0.1) klass.sync() - time.sleep(0.5) # TODO + time.sleep(0.5) # TODO @classmethod def readd_devices(klass): @@ -490,6 +486,7 @@ class UDisksTestCase(unittest.TestCase): time.sleep(0.5) klass.sync() + # ---------------------------------------------------------------------------- class Manager(UDisksTestCase): @@ -508,7 +505,7 @@ class Manager(UDisksTestCase): fd_list = Gio.UnixFDList.new_from_array([f.fileno()]) (path, out_fd_list) = self.manager.call_loop_setup_sync( - GLib.Variant('h', 0), # fd index + GLib.Variant('h', 0), # fd index no_options, fd_list, None) @@ -543,7 +540,7 @@ class Manager(UDisksTestCase): fd_list = Gio.UnixFDList.new_from_array([f.fileno()]) (path, out_fd_list) = self.manager.call_loop_setup_sync( - GLib.Variant('h', 0), # fd index + GLib.Variant('h', 0), # fd index GLib.Variant('a{sv}', {'read-only': GLib.Variant('b', True)}), fd_list, None) @@ -560,8 +557,7 @@ class Manager(UDisksTestCase): self.assertEqual(loop.get_property('backing-file'), f.name) # can't format due to permission error - self.assertRaises(GLib.GError, block.call_format_sync, 'ext2', - no_options, None) + self.assertRaises(GLib.GError, block.call_format_sync, 'ext2', no_options, None) self.assertEqual(block.get_property('id-label'), '') self.assertEqual(block.get_property('id-usage'), '') @@ -570,6 +566,7 @@ class Manager(UDisksTestCase): self.client.settle() loop.call_delete_sync(no_options, None) + # ---------------------------------------------------------------------------- class Drive(UDisksTestCase): @@ -584,13 +581,14 @@ class Drive(UDisksTestCase): self.assertEqual(self.drive.get_property('model'), 'scsi_debug') self.assertEqual(self.drive.get_property('vendor'), 'Linux') - self.assertAlmostEqual(self.drive.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0) + self.assertAlmostEqual(self.drive.get_property('size') / 1.e6, VDEV_SIZE / 1.e6, 0) self.assertEqual(self.drive.get_property('media-available'), True) self.assertEqual(self.drive.get_property('optical'), False) self.assertNotEqual(len(self.drive.get_property('serial')), 0) self.assertNotEqual(len(self.drive.get_property('revision')), 0) + # ---------------------------------------------------------------------------- class FS(UDisksTestCase): @@ -604,7 +602,7 @@ class FS(UDisksTestCase): def tearDown(self): if subprocess.call(['umount', self.device], stderr=subprocess.PIPE) == 0: sys.stderr.write('[cleanup unmount] ') - shutil.rmtree (self.workdir) + shutil.rmtree(self.workdir) def test_zero(self): '''properties of zeroed out device''' @@ -617,7 +615,7 @@ class FS(UDisksTestCase): self.assertEqual(self.block.get_property('id-usage'), '') self.assertEqual(self.block.get_property('id-type'), '') self.assertEqual(self.block.get_property('id-uuid'), '') - self.assertAlmostEqual(self.block.get_property('size')/1.e6, VDEV_SIZE/1.e6, 0) + self.assertAlmostEqual(self.block.get_property('size') / 1.e6, VDEV_SIZE / 1.e6, 0) obj = self.client.get_object(self.block.get_object_path()) self.assertEqual(obj.get_property('filesystem'), None) self.assertEqual(obj.get_property('partition'), None) @@ -807,7 +805,7 @@ class FS(UDisksTestCase): mkfs = 'mkfs.' + type if type != 'swap' and subprocess.call(['which', mkfs], - stdout=subprocess.PIPE) != 0: + stdout=subprocess.PIPE) != 0: sys.stderr.write('[no %s, skip] ' % mkfs) # check correct D-Bus exception @@ -875,7 +873,7 @@ class FS(UDisksTestCase): # mount it if type == 'ntfs' and subprocess.call(['which', 'mount.ntfs-3g'], - stdout=subprocess.PIPE) == 0: + stdout=subprocess.PIPE) == 0: # prefer mount.ntfs-3g if we have it (on Debian; Ubuntu # defaults to ntfs-3g if installed); TODO: check other distros mount_prog = 'mount.ntfs-3g' @@ -900,7 +898,7 @@ class FS(UDisksTestCase): def _do_udisks_check(self, type, label=None): '''udisks API correctly changes file system''' - # create fs + # create fs if label is not None: options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', label)}) else: @@ -961,23 +959,22 @@ class FS(UDisksTestCase): self.assertEqual(fs.get_property('mount-points'), [mount_path]) # create fs with taking ownership (daemon:mail == 1:8) - #if supports_unix_owners: - # options.append('take_ownership_uid=1') - # options.append('take_ownership_gid=8') - # self.fs_create(None, type, options) - # mount_path = iface.FilesystemMount('', []) - # st = os.stat(mount_path) - # self.assertEqual((st.st_uid, st.st_gid), (1, 8)) - # self.retry_busy(self.partition_iface().FilesystemUnmount, []) - # self.assertFalse(os.path.exists(mount_path), 'mount point was not removed') + # if supports_unix_owners: + # options.append('take_ownership_uid=1') + # options.append('take_ownership_gid=8') + # self.fs_create(None, type, options) + # mount_path = iface.FilesystemMount('', []) + # st = os.stat(mount_path) + # self.assertEqual((st.st_uid, st.st_gid), (1, 8)) + # self.retry_busy(self.partition_iface().FilesystemUnmount, []) + # self.assertFalse(os.path.exists(mount_path), 'mount point was not removed') # change label supported = True l = 'n"a\m\\"e' + type if type == 'vfat': # VFAT does not support some characters - self.assertRaises(GLib.GError, fs.call_set_label_sync, l, - no_options, None) + self.assertRaises(GLib.GError, fs.call_set_label_sync, l, no_options, None) l = "n@a$me" try: fs.call_set_label_sync(l, no_options, None) @@ -992,7 +989,7 @@ class FS(UDisksTestCase): if supported: block = self.udisks_block() blkid_label = self.blkid().get('ID_FS_LABEL_ENC', '').replace('\\x22', '"').replace( - '\\x5c', '\\').replace('\\x24', '$') + '\\x5c', '\\').replace('\\x24', '$') self.sync_workaround() if type == 'vfat': # EXFAIL: often (but not always) the label appears in all upper case @@ -1009,7 +1006,7 @@ class FS(UDisksTestCase): self.assertEqual(block.get_property('id-label'), '') # check fs - Not implemented in udisks yet - #self.assertEqual(iface.FilesystemCheck([]), True) + # self.assertEqual(iface.FilesystemCheck([]), True) # check mounting of a read-only device # this is known-broken for reiserfs and xfs right now: @@ -1019,7 +1016,7 @@ class FS(UDisksTestCase): # the scsi_debug CD drive content is the same as for the HD drive, but # udev does not know about this; so give it a nudge to re-probe subprocess.call(['udevadm', 'trigger', '--action=change', - '--sysname-match=' + os.path.basename(self.cd_device)]) + '--sysname-match=' + os.path.basename(self.cd_device)]) self.sync() self.sync() cd_fs = self.udisks_filesystem(cd=True) @@ -1071,14 +1068,15 @@ class FS(UDisksTestCase): self.assertTrue(os.access(f, os.W_OK)) self.assertTrue(os.access(f, os.X_OK)) -## ---------------------------------------------------------------------------- + +# ---------------------------------------------------------------------------- class Smart(UDisksTestCase): '''Check SMART operation.''' def test_sda(self): '''SMART status of first internal hard disk - + This is a best-effort readonly test. ''' hd = '/dev/sda' @@ -1088,13 +1086,14 @@ class Smart(UDisksTestCase): return has_smart = subprocess.call(['skdump', '--can-smart', hd], - stdout=subprocess.PIPE, stderr=subprocess.STDOUT) == 0 + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) == 0 block = self.client.get_block_for_dev(os.stat(hd).st_rdev) self.assertNotEqual(block, None) drive = self.client.get_drive_for_block(block) ata = self.client.get_object(drive.get_object_path()).get_property('drive-ata') - self.assertEqual(ata != None, has_smart) + self.assertEqual(ata is not None, has_smart) if has_smart: sys.stderr.write('[avail] ') @@ -1138,8 +1137,7 @@ class Luks(UDisksTestCase): self.fs_create(None, 'ext4', GLib.Variant('a{sv}', { 'encrypt.passphrase': GLib.Variant('s', 's3kr1t'), - 'label': GLib.Variant('s', 'treasure'), - })) + 'label': GLib.Variant('s', 'treasure')})) try: block = self.udisks_block() @@ -1158,15 +1156,15 @@ class Luks(UDisksTestCase): # check whether we can lock/unlock; we also need this to get the # cleartext device encrypted.call_lock_sync(no_options, None) - self.assertRaises(GLib.GError, encrypted.call_lock_sync, - no_options, None) - + self.assertRaises(GLib.GError, encrypted.call_lock_sync, + no_options, None) + # wrong password - self.assertRaises(GLib.GError, encrypted.call_unlock_sync, - 'h4ckpassword', no_options, None) + self.assertRaises(GLib.GError, encrypted.call_unlock_sync, + 'h4ckpassword', no_options, None) # right password - clear_path = encrypted.call_unlock_sync('s3kr1t', - no_options, None) + clear_path = encrypted.call_unlock_sync('s3kr1t', + no_options, None) # check cleartext device info clear_obj = self.client.get_object(clear_path) @@ -1179,14 +1177,14 @@ class Luks(UDisksTestCase): clear_dev = clear_block.get_property('device') self.assertNotEqual(clear_dev, None) self.assertEqual(clear_block.get_property('id-uuid'), - self.blkid(device=clear_dev)['ID_FS_UUID']) + self.blkid(device=clear_dev)['ID_FS_UUID']) clear_fs = clear_obj.get_property('filesystem') self.assertEqual(clear_fs.get_property('mount-points'), []) # check that we do not leak key information udev_dump = subprocess.Popen(['udevadm', 'info', '--export-db'], - stdout=subprocess.PIPE) + stdout=subprocess.PIPE) out = udev_dump.communicate()[0] self.assertFalse(b's3kr1t' in out, 'password in udev properties') self.assertFalse(b'essiv:sha' in out, 'key information in udev properties') @@ -1202,8 +1200,7 @@ class Luks(UDisksTestCase): crypt_obj = self.client.get_object(self.udisks_block().get_object_path()) encrypted = crypt_obj.get_property('encrypted') - path = encrypted.call_unlock_sync('s3kr1t', - no_options, None) + path = encrypted.call_unlock_sync('s3kr1t', no_options, None) self.client.settle() obj = self.client.get_object(path) fs = obj.get_property('filesystem') @@ -1242,8 +1239,8 @@ class Luks(UDisksTestCase): # unlock and mount it crypt_obj = self.client.get_object(self.udisks_block().get_object_path()) - path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', - no_options, None) + path = crypt_obj.get_property('encrypted').call_unlock_sync( + 's3kr1t', no_options, None) try: fs = self.client.get_object(path).get_property('filesystem') mount_path = fs.call_mount_sync(no_options, None) @@ -1270,8 +1267,8 @@ class Luks(UDisksTestCase): # after putting it back, it should be mountable again crypt_obj = self.client.get_object(self.udisks_block().get_object_path()) - path = crypt_obj.get_property('encrypted').call_unlock_sync('s3kr1t', - no_options, None) + path = crypt_obj.get_property('encrypted').call_unlock_sync( + 's3kr1t', no_options, None) self.client.settle() fs = self.client.get_object(path).get_property('filesystem') mount_path = fs.call_mount_sync(no_options, None) @@ -1286,10 +1283,11 @@ class Luks(UDisksTestCase): finally: # lock crypt_obj.get_property('encrypted').call_lock_sync( - no_options, None) + no_options, None) self.client.settle() self.assertEqual(self.client.get_object(path), None) + # ---------------------------------------------------------------------------- class Polkit(UDisksTestCase, test_polkitd.PolkitTestCase): @@ -1304,7 +1302,7 @@ class Polkit(UDisksTestCase, test_polkitd.PolkitTestCase): with self.assertRaises(GLib.GError) as cm: self.fs_create(None, 'ext4', options) self.assertTrue('UDisks2.Error.NotAuthorized' in cm.exception.message, - cm.exception.message) + cm.exception.message) # did not actually do anything block = self.udisks_block() @@ -1314,7 +1312,7 @@ class Polkit(UDisksTestCase, test_polkitd.PolkitTestCase): '''Create FS on internal drive (allowed)''' self.start_polkitd(['org.freedesktop.udisks2.modify-device-system', - 'org.freedesktop.udisks2.modify-device']) + 'org.freedesktop.udisks2.modify-device']) options = GLib.Variant('a{sv}', {'label': GLib.Variant('s', 'polkityes')}) self.fs_create(None, 'ext4', options) @@ -1332,7 +1330,7 @@ class Polkit(UDisksTestCase, test_polkitd.PolkitTestCase): # the scsi_debug CD drive content is the same as for the HD drive, but # udev does not know about this; so give it a nudge to re-probe subprocess.call(['udevadm', 'trigger', '--action=change', - '--sysname-match=' + os.path.basename(self.cd_device)]) + '--sysname-match=' + os.path.basename(self.cd_device)]) self.sync() self.sync() @@ -1349,24 +1347,22 @@ class Polkit(UDisksTestCase, test_polkitd.PolkitTestCase): if __name__ == '__main__': argparser = argparse.ArgumentParser(description='udisks2 integration test suite') argparser.add_argument('-l', '--log-file', dest='logfile', - help='write daemon log to a file') - argparser.add_argument('-w', '--no-workarounds', - action="store_true", default=False, - help='Disable workarounds for race conditions in the D-BUS API') + help='write daemon log to a file') + argparser.add_argument('-w', '--no-workarounds', action="store_true", default=False, + help='Disable workarounds for race conditions in the D-BUS API') argparser.add_argument('testname', nargs='*', - help='name of test class or method (e. g. "Drive", "FS.test_ext2")') + help='name of test class or method (e. g. "Drive", "FS.test_ext2")') args = argparser.parse_args() workaround_syncs = not args.no_workarounds UDisksTestCase.init(logfile=args.logfile) if args.testname: - tests = unittest.TestLoader().loadTestsFromNames(args.testname, - __import__('__main__')) + tests = unittest.TestLoader().loadTestsFromNames( + args.testname, __import__('__main__')) else: tests = unittest.TestLoader().loadTestsFromName('__main__') if unittest.TextTestRunner(verbosity=2).run(tests).wasSuccessful(): sys.exit(0) else: sys.exit(1) - |