PyORAm
[iotcloud.git] / PyORAM / src / pyoram / tests / test_encrypted_block_storage.py
diff --git a/PyORAM/src/pyoram/tests/test_encrypted_block_storage.py b/PyORAM/src/pyoram/tests/test_encrypted_block_storage.py
new file mode 100644 (file)
index 0000000..af9a4f0
--- /dev/null
@@ -0,0 +1,814 @@
+import os
+import unittest2
+import tempfile
+import struct
+
+from pyoram.storage.block_storage import \
+    BlockStorageTypeFactory
+from pyoram.encrypted_storage.encrypted_block_storage import \
+    EncryptedBlockStorage
+from pyoram.crypto.aes import AES
+
+from six.moves import xrange
+
+thisdir = os.path.dirname(os.path.abspath(__file__))
+
+class _TestEncryptedBlockStorage(object):
+
+    _type_name = None
+    _aes_mode = None
+    _test_key = None
+    _test_key_size = None
+
+    @classmethod
+    def setUpClass(cls):
+        assert cls._type_name is not None
+        assert cls._aes_mode is not None
+        assert not ((cls._test_key is not None) and \
+                    (cls._test_key_size is not None))
+        fd, cls._dummy_name = tempfile.mkstemp()
+        os.close(fd)
+        try:
+            os.remove(cls._dummy_name)
+        except OSError:                                # pragma: no cover
+            pass                                       # pragma: no cover
+        cls._block_size = 25
+        cls._block_count = 5
+        cls._testfname = cls.__name__ + "_testfile.bin"
+        cls._blocks = []
+        f = EncryptedBlockStorage.setup(
+            cls._testfname,
+            cls._block_size,
+            cls._block_count,
+            key_size=cls._test_key_size,
+            key=cls._test_key,
+            storage_type=cls._type_name,
+            aes_mode=cls._aes_mode,
+            initialize=lambda i: bytes(bytearray([i])*cls._block_size),
+            ignore_existing=True)
+        f.close()
+        cls._key = f.key
+        for i in range(cls._block_count):
+            data = bytearray([i])*cls._block_size
+            cls._blocks.append(data)
+
+    @classmethod
+    def tearDownClass(cls):
+        try:
+            os.remove(cls._testfname)
+        except OSError:                                # pragma: no cover
+            pass                                       # pragma: no cover
+        try:
+            os.remove(cls._dummy_name)
+        except OSError:                                # pragma: no cover
+            pass                                       # pragma: no cover
+
+    def test_setup_fails(self):
+        self.assertEqual(os.path.exists(self._dummy_name), False)
+        with self.assertRaises(IOError):
+            EncryptedBlockStorage.setup(
+                os.path.join(thisdir,
+                             "baselines",
+                             "exists.empty"),
+                block_size=10,
+                block_count=10,
+                key=self._test_key,
+                key_size=self._test_key_size,
+                aes_mode=self._aes_mode,
+                storage_type=self._type_name)
+        self.assertEqual(os.path.exists(self._dummy_name), False)
+        with self.assertRaises(IOError):
+            EncryptedBlockStorage.setup(
+                os.path.join(thisdir,
+                             "baselines",
+                             "exists.empty"),
+                block_size=10,
+                block_count=10,
+                key=self._test_key,
+                key_size=self._test_key_size,
+                storage_type=self._type_name,
+                aes_mode=self._aes_mode,
+                ignore_existing=False)
+        self.assertEqual(os.path.exists(self._dummy_name), False)
+        with self.assertRaises(ValueError):
+            EncryptedBlockStorage.setup(
+                self._dummy_name,
+                block_size=0,
+                block_count=1,
+                key=self._test_key,
+                key_size=self._test_key_size,
+                aes_mode=self._aes_mode,
+                storage_type=self._type_name)
+        self.assertEqual(os.path.exists(self._dummy_name), False)
+        with self.assertRaises(ValueError):
+            EncryptedBlockStorage.setup(
+                self._dummy_name,
+                block_size=1,
+                block_count=0,
+                key=self._test_key,
+                key_size=self._test_key_size,
+                aes_mode=self._aes_mode,
+                storage_type=self._type_name)
+        self.assertEqual(os.path.exists(self._dummy_name), False)
+        with self.assertRaises(TypeError):
+            EncryptedBlockStorage.setup(
+                self._dummy_name,
+                block_size=1,
+                block_count=1,
+                key=self._test_key,
+                key_size=self._test_key_size,
+                aes_mode=self._aes_mode,
+                storage_type=self._type_name,
+                header_data=2)
+        self.assertEqual(os.path.exists(self._dummy_name), False)
+        with self.assertRaises(ValueError):
+            EncryptedBlockStorage.setup(
+                self._dummy_name,
+                block_size=1,
+                block_count=1,
+                key=self._test_key,
+                key_size=self._test_key_size,
+                aes_mode=None,
+                storage_type=self._type_name)
+        self.assertEqual(os.path.exists(self._dummy_name), False)
+        with self.assertRaises(ValueError):
+            EncryptedBlockStorage.setup(
+                self._dummy_name,
+                block_size=1,
+                block_count=1,
+                key_size=-1,
+                aes_mode=self._aes_mode,
+                storage_type=self._type_name)
+        self.assertEqual(os.path.exists(self._dummy_name), False)
+        with self.assertRaises(TypeError):
+            EncryptedBlockStorage.setup(
+                self._dummy_name,
+                block_size=1,
+                block_count=1,
+                key=-1,
+                aes_mode=self._aes_mode,
+                storage_type=self._type_name)
+        self.assertEqual(os.path.exists(self._dummy_name), False)
+        with self.assertRaises(ValueError):
+            EncryptedBlockStorage.setup(
+                self._dummy_name,
+                block_size=1,
+                block_count=1,
+                key=AES.KeyGen(AES.key_sizes[-1]),
+                key_size=AES.key_sizes[-1],
+                aes_mode=self._aes_mode,
+                storage_type=self._type_name)
+        self.assertEqual(os.path.exists(self._dummy_name), False)
+        with self.assertRaises(ValueError):
+            EncryptedBlockStorage.setup(
+                self._dummy_name,
+                block_size=1,
+                block_count=1,
+                key=os.urandom(AES.key_sizes[-1]+100),
+                aes_mode=self._aes_mode,
+                storage_type=self._type_name)
+
+    def test_setup(self):
+        fname = ".".join(self.id().split(".")[1:])
+        fname += ".bin"
+        fname = os.path.join(thisdir, fname)
+        if os.path.exists(fname):
+            os.remove(fname)                           # pragma: no cover
+        bsize = 10
+        bcount = 11
+        fsetup = EncryptedBlockStorage.setup(
+            fname,
+            bsize,
+            bcount,
+            key=self._test_key,
+            key_size=self._test_key_size,
+            aes_mode=self._aes_mode,
+            storage_type=self._type_name)
+        fsetup.close()
+        self.assertEqual(type(fsetup.raw_storage),
+                         BlockStorageTypeFactory(self._type_name))
+        with open(fname, 'rb') as f:
+            flen = len(f.read())
+            self.assertEqual(
+                flen,
+                EncryptedBlockStorage.compute_storage_size(
+                    bsize,
+                    bcount,
+                    aes_mode=self._aes_mode,
+                    storage_type=self._type_name))
+            self.assertEqual(
+                flen >
+                EncryptedBlockStorage.compute_storage_size(
+                    bsize,
+                    bcount,
+                    aes_mode=self._aes_mode,
+                    storage_type=self._type_name,
+                    ignore_header=True),
+                True)
+        with EncryptedBlockStorage(fname,
+                                   key=fsetup.key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.header_data, bytes())
+            self.assertEqual(fsetup.header_data, bytes())
+            self.assertEqual(f.key, fsetup.key)
+            self.assertEqual(f.block_size, bsize)
+            self.assertEqual(fsetup.block_size, bsize)
+            self.assertEqual(f.block_count, bcount)
+            self.assertEqual(fsetup.block_count, bcount)
+            self.assertEqual(f.storage_name, fname)
+            self.assertEqual(fsetup.storage_name, fname)
+        # tamper with the plaintext index
+        with open(fname, 'r+b') as f:
+            f.seek(0)
+            f.write(struct.pack("!L",0))
+        with self.assertRaises(ValueError):
+            with EncryptedBlockStorage(fname,
+                                       key=fsetup.key,
+                                       storage_type=self._type_name) as f:
+                pass                                       # pragma: no cover
+        os.remove(fname)
+
+    def test_setup_withdata(self):
+        fname = ".".join(self.id().split(".")[1:])
+        fname += ".bin"
+        fname = os.path.join(thisdir, fname)
+        if os.path.exists(fname):
+            os.remove(fname)                           # pragma: no cover
+        bsize = 10
+        bcount = 11
+        header_data = bytes(bytearray([0,1,2]))
+        fsetup = EncryptedBlockStorage.setup(
+            fname,
+            block_size=bsize,
+            block_count=bcount,
+            key=self._test_key,
+            key_size=self._test_key_size,
+            aes_mode=self._aes_mode,
+            storage_type=self._type_name,
+            header_data=header_data)
+        fsetup.close()
+        self.assertEqual(type(fsetup.raw_storage),
+                         BlockStorageTypeFactory(self._type_name))
+        with open(fname, 'rb') as f:
+            flen = len(f.read())
+            self.assertEqual(
+                flen,
+                EncryptedBlockStorage.compute_storage_size(
+                    bsize,
+                    bcount,
+                    aes_mode=self._aes_mode,
+                    storage_type=self._type_name,
+                    header_data=header_data))
+            self.assertTrue(len(header_data) > 0)
+            self.assertEqual(
+                EncryptedBlockStorage.compute_storage_size(
+                    bsize,
+                    bcount,
+                    aes_mode=self._aes_mode,
+                    storage_type=self._type_name) <
+                EncryptedBlockStorage.compute_storage_size(
+                    bsize,
+                    bcount,
+                    aes_mode=self._aes_mode,
+                    storage_type=self._type_name,
+                    header_data=header_data),
+                True)
+            self.assertEqual(
+                flen >
+                EncryptedBlockStorage.compute_storage_size(
+                    bsize,
+                    bcount,
+                    aes_mode=self._aes_mode,
+                    storage_type=self._type_name,
+                    header_data=header_data,
+                    ignore_header=True),
+                True)
+        with EncryptedBlockStorage(fname,
+                                   key=fsetup.key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.header_data, header_data)
+            self.assertEqual(fsetup.header_data, header_data)
+            self.assertEqual(f.key, fsetup.key)
+            self.assertEqual(f.block_size, bsize)
+            self.assertEqual(fsetup.block_size, bsize)
+            self.assertEqual(f.block_count, bcount)
+            self.assertEqual(fsetup.block_count, bcount)
+            self.assertEqual(f.storage_name, fname)
+            self.assertEqual(fsetup.storage_name, fname)
+        os.remove(fname)
+
+    def test_init_noexists(self):
+        self.assertEqual(os.path.exists(self._dummy_name), False)
+        with self.assertRaises(IOError):
+            with EncryptedBlockStorage(
+                    self._dummy_name,
+                    key=self._key,
+                    storage_type=self._type_name) as f:
+                pass                                   # pragma: no cover
+
+    def test_init_exists(self):
+        self.assertEqual(os.path.exists(self._testfname), True)
+        with open(self._testfname, 'rb') as f:
+            databefore = f.read()
+        with self.assertRaises(ValueError):
+            with EncryptedBlockStorage(self._testfname,
+                                       storage_type=self._type_name) as f:
+                pass                                   # pragma: no cover
+        with self.assertRaises(ValueError):
+            with BlockStorageTypeFactory(self._type_name)(self._testfname) as fb:
+                with EncryptedBlockStorage(fb,
+                                           key=self._key,
+                                           storage_type=self._type_name) as f:
+                    pass                               # pragma: no cover
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.key, self._key)
+            self.assertEqual(f.block_size, self._block_size)
+            self.assertEqual(f.block_count, self._block_count)
+            self.assertEqual(f.storage_name, self._testfname)
+            self.assertEqual(f.header_data, bytes())
+        self.assertEqual(os.path.exists(self._testfname), True)
+        with open(self._testfname, 'rb') as f:
+            dataafter = f.read()
+        self.assertEqual(databefore, dataafter)
+
+    def test_read_block(self):
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.bytes_sent, 0)
+            self.assertEqual(f.bytes_received, 0)
+
+            for i, data in enumerate(self._blocks):
+                self.assertEqual(list(bytearray(f.read_block(i))),
+                                 list(self._blocks[i]))
+            for i, data in enumerate(self._blocks):
+                self.assertEqual(list(bytearray(f.read_block(i))),
+                                 list(self._blocks[i]))
+            for i, data in reversed(list(enumerate(self._blocks))):
+                self.assertEqual(list(bytearray(f.read_block(i))),
+                                 list(self._blocks[i]))
+            for i, data in reversed(list(enumerate(self._blocks))):
+                self.assertEqual(list(bytearray(f.read_block(i))),
+                                 list(self._blocks[i]))
+
+            self.assertEqual(f.bytes_sent, 0)
+            self.assertEqual(f.bytes_received,
+                             self._block_count*f._storage.block_size*4)
+
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.bytes_sent, 0)
+            self.assertEqual(f.bytes_received, 0)
+
+            self.assertEqual(list(bytearray(f.read_block(0))),
+                             list(self._blocks[0]))
+            self.assertEqual(list(bytearray(f.read_block(self._block_count-1))),
+                             list(self._blocks[-1]))
+
+            self.assertEqual(f.bytes_sent, 0)
+            self.assertEqual(f.bytes_received,
+                             f._storage.block_size*2)
+
+    def test_write_block(self):
+        data = bytearray([self._block_count])*self._block_size
+        self.assertEqual(len(data) > 0, True)
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.bytes_sent, 0)
+            self.assertEqual(f.bytes_received, 0)
+
+            for i in xrange(self._block_count):
+                self.assertNotEqual(list(bytearray(f.read_block(i))),
+                                    list(data))
+            for i in xrange(self._block_count):
+                f.write_block(i, bytes(data))
+            for i in xrange(self._block_count):
+                self.assertEqual(list(bytearray(f.read_block(i))),
+                                 list(data))
+            for i, block in enumerate(self._blocks):
+                f.write_block(i, bytes(block))
+
+            self.assertEqual(f.bytes_sent,
+                             self._block_count*f._storage.block_size*2)
+            self.assertEqual(f.bytes_received,
+                             self._block_count*f._storage.block_size*2)
+
+    def test_read_blocks(self):
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.bytes_sent, 0)
+            self.assertEqual(f.bytes_received, 0)
+
+            data = f.read_blocks(list(xrange(self._block_count)))
+            self.assertEqual(len(data), self._block_count)
+            for i, block in enumerate(data):
+                self.assertEqual(list(bytearray(block)),
+                                 list(self._blocks[i]))
+            data = f.read_blocks([0])
+            self.assertEqual(len(data), 1)
+            self.assertEqual(list(bytearray(data[0])),
+                             list(self._blocks[0]))
+            self.assertEqual(len(self._blocks) > 1, True)
+            data = f.read_blocks(list(xrange(1, self._block_count)) + [0])
+            self.assertEqual(len(data), self._block_count)
+            for i, block in enumerate(data[:-1], 1):
+                self.assertEqual(list(bytearray(block)),
+                                 list(self._blocks[i]))
+            self.assertEqual(list(bytearray(data[-1])),
+                             list(self._blocks[0]))
+
+            self.assertEqual(f.bytes_sent, 0)
+            self.assertEqual(f.bytes_received,
+                             (2*self._block_count+1)*f._storage.block_size)
+
+    def test_yield_blocks(self):
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.bytes_sent, 0)
+            self.assertEqual(f.bytes_received, 0)
+
+            data = list(f.yield_blocks(list(xrange(self._block_count))))
+            self.assertEqual(len(data), self._block_count)
+            for i, block in enumerate(data):
+                self.assertEqual(list(bytearray(block)),
+                                 list(self._blocks[i]))
+            data = list(f.yield_blocks([0]))
+            self.assertEqual(len(data), 1)
+            self.assertEqual(list(bytearray(data[0])),
+                             list(self._blocks[0]))
+            self.assertEqual(len(self._blocks) > 1, True)
+            data = list(f.yield_blocks(list(xrange(1, self._block_count)) + [0]))
+            self.assertEqual(len(data), self._block_count)
+            for i, block in enumerate(data[:-1], 1):
+                self.assertEqual(list(bytearray(block)),
+                                 list(self._blocks[i]))
+            self.assertEqual(list(bytearray(data[-1])),
+                             list(self._blocks[0]))
+
+            self.assertEqual(f.bytes_sent, 0)
+            self.assertEqual(f.bytes_received,
+                             (2*self._block_count+1)*f._storage.block_size)
+
+    def test_write_blocks(self):
+        data = [bytearray([self._block_count])*self._block_size
+                for i in xrange(self._block_count)]
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.bytes_sent, 0)
+            self.assertEqual(f.bytes_received, 0)
+
+            orig = f.read_blocks(list(xrange(self._block_count)))
+            self.assertEqual(len(orig), self._block_count)
+            for i, block in enumerate(orig):
+                self.assertEqual(list(bytearray(block)),
+                                 list(self._blocks[i]))
+            f.write_blocks(list(xrange(self._block_count)),
+                           [bytes(b) for b in data])
+            new = f.read_blocks(list(xrange(self._block_count)))
+            self.assertEqual(len(new), self._block_count)
+            for i, block in enumerate(new):
+                self.assertEqual(list(bytearray(block)),
+                                 list(data[i]))
+            f.write_blocks(list(xrange(self._block_count)),
+                           [bytes(b) for b in self._blocks])
+            orig = f.read_blocks(list(xrange(self._block_count)))
+            self.assertEqual(len(orig), self._block_count)
+            for i, block in enumerate(orig):
+                self.assertEqual(list(bytearray(block)),
+                                 list(self._blocks[i]))
+
+            self.assertEqual(f.bytes_sent,
+                             self._block_count*f._storage.block_size*2)
+            self.assertEqual(f.bytes_received,
+                             self._block_count*f._storage.block_size*3)
+
+    def test_update_header_data(self):
+        fname = ".".join(self.id().split(".")[1:])
+        fname += ".bin"
+        fname = os.path.join(thisdir, fname)
+        if os.path.exists(fname):
+            os.remove(fname)                           # pragma: no cover
+        bsize = 10
+        bcount = 11
+        header_data = bytes(bytearray([0,1,2]))
+        fsetup = EncryptedBlockStorage.setup(
+            fname,
+            block_size=bsize,
+            block_count=bcount,
+            key=self._test_key,
+            key_size=self._test_key_size,
+            header_data=header_data)
+        fsetup.close()
+        new_header_data = bytes(bytearray([1,1,1]))
+        with EncryptedBlockStorage(fname,
+                                   key=fsetup.key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.header_data, header_data)
+            f.update_header_data(new_header_data)
+            self.assertEqual(f.header_data, new_header_data)
+        with EncryptedBlockStorage(fname,
+                                   key=fsetup.key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.header_data, new_header_data)
+        with self.assertRaises(ValueError):
+            with EncryptedBlockStorage(fname,
+                                       key=fsetup.key,
+                                       storage_type=self._type_name) as f:
+                f.update_header_data(bytes(bytearray([1,1])))
+        with self.assertRaises(ValueError):
+            with EncryptedBlockStorage(fname,
+                                       key=fsetup.key,
+                                       storage_type=self._type_name) as f:
+                f.update_header_data(bytes(bytearray([1,1,1,1])))
+        with EncryptedBlockStorage(fname,
+                                   key=fsetup.key,
+                                   storage_type=self._type_name) as f:
+            self.assertEqual(f.header_data, new_header_data)
+        os.remove(fname)
+
+    def test_locked_flag(self):
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as f:
+            with self.assertRaises(IOError):
+                with EncryptedBlockStorage(self._testfname,
+                                           key=self._key,
+                                           storage_type=self._type_name) as f1:
+                    pass                               # pragma: no cover
+            with self.assertRaises(IOError):
+                with EncryptedBlockStorage(self._testfname,
+                                           key=self._key,
+                                           storage_type=self._type_name) as f1:
+                    pass                               # pragma: no cover
+            with EncryptedBlockStorage(self._testfname,
+                                       key=self._key,
+                                       storage_type=self._type_name,
+                                       ignore_lock=True) as f1:
+                pass
+            with self.assertRaises(IOError):
+                with EncryptedBlockStorage(self._testfname,
+                                           key=self._key,
+                                           storage_type=self._type_name) as f1:
+                    pass                               # pragma: no cover
+            with EncryptedBlockStorage(self._testfname,
+                                       key=self._key,
+                                       storage_type=self._type_name,
+                                       ignore_lock=True) as f1:
+                pass
+            with EncryptedBlockStorage(self._testfname,
+                                       key=self._key,
+                                       storage_type=self._type_name,
+                                       ignore_lock=True) as f1:
+                pass
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as f:
+            pass
+
+    def test_read_block_cloned(self):
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as forig:
+            self.assertEqual(forig.bytes_sent, 0)
+            self.assertEqual(forig.bytes_received, 0)
+            with forig.clone_device() as f:
+                self.assertEqual(forig.bytes_sent, 0)
+                self.assertEqual(forig.bytes_received, 0)
+                self.assertEqual(f.bytes_sent, 0)
+                self.assertEqual(f.bytes_received, 0)
+
+                for i, data in enumerate(self._blocks):
+                    self.assertEqual(list(bytearray(f.read_block(i))),
+                                     list(self._blocks[i]))
+                for i, data in enumerate(self._blocks):
+                    self.assertEqual(list(bytearray(f.read_block(i))),
+                                     list(self._blocks[i]))
+                for i, data in reversed(list(enumerate(self._blocks))):
+                    self.assertEqual(list(bytearray(f.read_block(i))),
+                                     list(self._blocks[i]))
+                for i, data in reversed(list(enumerate(self._blocks))):
+                    self.assertEqual(list(bytearray(f.read_block(i))),
+                                     list(self._blocks[i]))
+
+                self.assertEqual(f.bytes_sent, 0)
+                self.assertEqual(f.bytes_received,
+                                 self._block_count*f._storage.block_size*4)
+
+            with forig.clone_device() as f:
+                self.assertEqual(forig.bytes_sent, 0)
+                self.assertEqual(forig.bytes_received, 0)
+                self.assertEqual(f.bytes_sent, 0)
+                self.assertEqual(f.bytes_received, 0)
+
+                self.assertEqual(list(bytearray(f.read_block(0))),
+                                 list(self._blocks[0]))
+                self.assertEqual(list(bytearray(f.read_block(self._block_count-1))),
+                                 list(self._blocks[-1]))
+
+                self.assertEqual(f.bytes_sent, 0)
+                self.assertEqual(f.bytes_received,
+                                 f._storage.block_size*2)
+
+            self.assertEqual(forig.bytes_sent, 0)
+            self.assertEqual(forig.bytes_received, 0)
+
+    def test_write_block_cloned(self):
+        data = bytearray([self._block_count])*self._block_size
+        self.assertEqual(len(data) > 0, True)
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as forig:
+            self.assertEqual(forig.bytes_sent, 0)
+            self.assertEqual(forig.bytes_received, 0)
+            with forig.clone_device() as f:
+                self.assertEqual(forig.bytes_sent, 0)
+                self.assertEqual(forig.bytes_received, 0)
+                self.assertEqual(f.bytes_sent, 0)
+                self.assertEqual(f.bytes_received, 0)
+
+                for i in xrange(self._block_count):
+                    self.assertNotEqual(list(bytearray(f.read_block(i))),
+                                        list(data))
+                for i in xrange(self._block_count):
+                    f.write_block(i, bytes(data))
+                for i in xrange(self._block_count):
+                    self.assertEqual(list(bytearray(f.read_block(i))),
+                                     list(data))
+                for i, block in enumerate(self._blocks):
+                    f.write_block(i, bytes(block))
+
+                self.assertEqual(f.bytes_sent,
+                                 self._block_count*f._storage.block_size*2)
+                self.assertEqual(f.bytes_received,
+                                 self._block_count*f._storage.block_size*2)
+            self.assertEqual(forig.bytes_sent, 0)
+            self.assertEqual(forig.bytes_received, 0)
+
+    def test_read_blocks_cloned(self):
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as forig:
+            self.assertEqual(forig.bytes_sent, 0)
+            self.assertEqual(forig.bytes_received, 0)
+            with forig.clone_device() as f:
+                self.assertEqual(forig.bytes_sent, 0)
+                self.assertEqual(forig.bytes_received, 0)
+                self.assertEqual(f.bytes_sent, 0)
+                self.assertEqual(f.bytes_received, 0)
+
+                data = f.read_blocks(list(xrange(self._block_count)))
+                self.assertEqual(len(data), self._block_count)
+                for i, block in enumerate(data):
+                    self.assertEqual(list(bytearray(block)),
+                                     list(self._blocks[i]))
+                data = f.read_blocks([0])
+                self.assertEqual(len(data), 1)
+                self.assertEqual(list(bytearray(data[0])),
+                                 list(self._blocks[0]))
+                self.assertEqual(len(self._blocks) > 1, True)
+                data = f.read_blocks(list(xrange(1, self._block_count)) + [0])
+                self.assertEqual(len(data), self._block_count)
+                for i, block in enumerate(data[:-1], 1):
+                    self.assertEqual(list(bytearray(block)),
+                                     list(self._blocks[i]))
+                self.assertEqual(list(bytearray(data[-1])),
+                                 list(self._blocks[0]))
+
+                self.assertEqual(f.bytes_sent, 0)
+                self.assertEqual(f.bytes_received,
+                                 (2*self._block_count + 1)*f._storage.block_size)
+            self.assertEqual(forig.bytes_sent, 0)
+            self.assertEqual(forig.bytes_received, 0)
+
+    def test_yield_blocks_cloned(self):
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as forig:
+            self.assertEqual(forig.bytes_sent, 0)
+            self.assertEqual(forig.bytes_received, 0)
+            with forig.clone_device() as f:
+                self.assertEqual(forig.bytes_sent, 0)
+                self.assertEqual(forig.bytes_received, 0)
+                self.assertEqual(f.bytes_sent, 0)
+                self.assertEqual(f.bytes_received, 0)
+
+                data = list(f.yield_blocks(list(xrange(self._block_count))))
+                self.assertEqual(len(data), self._block_count)
+                for i, block in enumerate(data):
+                    self.assertEqual(list(bytearray(block)),
+                                     list(self._blocks[i]))
+                data = list(f.yield_blocks([0]))
+                self.assertEqual(len(data), 1)
+                self.assertEqual(list(bytearray(data[0])),
+                                 list(self._blocks[0]))
+                self.assertEqual(len(self._blocks) > 1, True)
+                data = list(f.yield_blocks(list(xrange(1, self._block_count)) + [0]))
+                self.assertEqual(len(data), self._block_count)
+                for i, block in enumerate(data[:-1], 1):
+                    self.assertEqual(list(bytearray(block)),
+                                     list(self._blocks[i]))
+                self.assertEqual(list(bytearray(data[-1])),
+                                 list(self._blocks[0]))
+
+                self.assertEqual(f.bytes_sent, 0)
+                self.assertEqual(f.bytes_received,
+                                 (2*self._block_count + 1)*f._storage.block_size)
+            self.assertEqual(forig.bytes_sent, 0)
+            self.assertEqual(forig.bytes_received, 0)
+
+    def test_write_blocks_cloned(self):
+        data = [bytearray([self._block_count])*self._block_size
+                for i in xrange(self._block_count)]
+        with EncryptedBlockStorage(self._testfname,
+                                   key=self._key,
+                                   storage_type=self._type_name) as forig:
+            self.assertEqual(forig.bytes_sent, 0)
+            self.assertEqual(forig.bytes_received, 0)
+            with forig.clone_device() as f:
+                self.assertEqual(forig.bytes_sent, 0)
+                self.assertEqual(forig.bytes_received, 0)
+                self.assertEqual(f.bytes_sent, 0)
+                self.assertEqual(f.bytes_received, 0)
+
+                orig = f.read_blocks(list(xrange(self._block_count)))
+                self.assertEqual(len(orig), self._block_count)
+                for i, block in enumerate(orig):
+                    self.assertEqual(list(bytearray(block)),
+                                     list(self._blocks[i]))
+                f.write_blocks(list(xrange(self._block_count)),
+                               [bytes(b) for b in data])
+                new = f.read_blocks(list(xrange(self._block_count)))
+                self.assertEqual(len(new), self._block_count)
+                for i, block in enumerate(new):
+                    self.assertEqual(list(bytearray(block)),
+                                     list(data[i]))
+                f.write_blocks(list(xrange(self._block_count)),
+                               [bytes(b) for b in self._blocks])
+                orig = f.read_blocks(list(xrange(self._block_count)))
+                self.assertEqual(len(orig), self._block_count)
+                for i, block in enumerate(orig):
+                    self.assertEqual(list(bytearray(block)),
+                                     list(self._blocks[i]))
+
+                self.assertEqual(f.bytes_sent,
+                                 self._block_count*f._storage.block_size*2)
+                self.assertEqual(f.bytes_received,
+                                 self._block_count*f._storage.block_size*3)
+            self.assertEqual(forig.bytes_sent, 0)
+            self.assertEqual(forig.bytes_received, 0)
+
+class TestEncryptedBlockStorageFileCTRKey(_TestEncryptedBlockStorage,
+                                          unittest2.TestCase):
+    _type_name = 'file'
+    _aes_mode = 'ctr'
+    _test_key = AES.KeyGen(16)
+
+class TestEncryptedBlockStorageFileCTR32(_TestEncryptedBlockStorage,
+                                         unittest2.TestCase):
+    _type_name = 'file'
+    _aes_mode = 'ctr'
+    _test_key_size = 16
+
+class TestEncryptedBlockStorageFileGCMKey(_TestEncryptedBlockStorage,
+                                          unittest2.TestCase):
+    _type_name = 'file'
+    _aes_mode = 'gcm'
+    _test_key = AES.KeyGen(24)
+
+class TestEncryptedBlockStorageFileGCM32(_TestEncryptedBlockStorage,
+                                         unittest2.TestCase):
+    _type_name = 'file'
+    _aes_mode = 'gcm'
+    _test_key_size = 24
+
+class TestEncryptedBlockStorageMMapFileCTRKey(_TestEncryptedBlockStorage,
+                                              unittest2.TestCase):
+    _type_name = 'mmap'
+    _aes_mode = 'ctr'
+    _test_key = AES.KeyGen(32)
+
+class TestEncryptedBlockStorageMMapFileCTR32(_TestEncryptedBlockStorage,
+                                             unittest2.TestCase):
+    _type_name = 'mmap'
+    _aes_mode = 'ctr'
+    _test_key_size = 32
+
+class TestEncryptedBlockStorageMMapFileGCMKey(_TestEncryptedBlockStorage,
+                                              unittest2.TestCase):
+    _type_name = 'mmap'
+    _aes_mode = 'gcm'
+
+class TestEncryptedBlockStorageMMapFileGCM32(_TestEncryptedBlockStorage,
+                                             unittest2.TestCase):
+    _type_name = 'mmap'
+    _aes_mode = 'gcm'
+
+if __name__ == "__main__":
+    unittest2.main()                                    # pragma: no cover