[iotcloud.git] / PyORAM / src / pyoram / storage / block_storage_s3.py
__all__ = ('BlockStorageS3',)

import struct
import logging
from multiprocessing.pool import ThreadPool

import pyoram
from pyoram.storage.block_storage import \
    (BlockStorageInterface,
     BlockStorageTypeFactory)
from pyoram.storage.boto3_s3_wrapper import Boto3S3Wrapper

import tqdm
import six
from six.moves import xrange, map

log = logging.getLogger("pyoram")

class BlockStorageS3(BlockStorageInterface):
    """
    A block storage device for Amazon Simple
    Storage Service (S3).
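
    Example (an illustrative sketch; the storage and bucket names are
    placeholders for a device previously created with
    BlockStorageS3.setup):

        f = BlockStorageS3("mystorage", bucket_name="my-bucket")
        data = f.read_block(0)
        f.close()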
    """

    _index_name = "PyORAMBlockStorageS3_index.bin"
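    # Layout of the index object: the struct format below packs, in
    # network byte order, three unsigned 32-bit integers (block size,
    # block count, user header size) followed by a boolean 'locked'
    # flag (13 bytes total); any user header data is appended after it.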
    _index_struct_string = "!LLL?"
    _index_offset = struct.calcsize(_index_struct_string)

    def __init__(self,
                 storage_name,
                 bucket_name=None,
                 aws_access_key_id=None,
                 aws_secret_access_key=None,
                 region_name=None,
                 ignore_lock=False,
                 threadpool_size=None,
                 s3_wrapper=Boto3S3Wrapper):

        self._bytes_sent = 0
        self._bytes_received = 0
        self._storage_name = storage_name
        self._bucket_name = bucket_name
        self._aws_access_key_id = aws_access_key_id
        self._aws_secret_access_key = aws_secret_access_key
        self._region_name = region_name
        self._pool = None
        self._close_pool = True
        self._s3 = None
        self._ignore_lock = ignore_lock
        self._async_write = None
        self._async_write_callback = None

        if bucket_name is None:
            raise ValueError("'bucket_name' keyword is required")

        if threadpool_size != 0:
            self._pool = ThreadPool(threadpool_size)

        self._s3 = s3_wrapper(bucket_name,
                              aws_access_key_id=aws_access_key_id,
                              aws_secret_access_key=aws_secret_access_key,
                              region_name=region_name)
        self._basename = self.storage_name+"/b%d"

        index_data = self._s3.download(
            self._storage_name+"/"+BlockStorageS3._index_name)
        self._block_size, self._block_count, user_header_size, locked = \
            struct.unpack(
                BlockStorageS3._index_struct_string,
                index_data[:BlockStorageS3._index_offset])
        if locked and (not self._ignore_lock):
            raise IOError(
                "Can not open block storage device because it is "
                "locked by another process. To ignore this check, "
                "initialize this class with the keyword 'ignore_lock' "
                "set to True.")
        self._user_header_data = bytes()
        if user_header_size > 0:
            self._user_header_data = \
                index_data[BlockStorageS3._index_offset:
                           (BlockStorageS3._index_offset+user_header_size)]

        if not self._ignore_lock:
            # turn on the locked flag (it is cleared again by close())
            self._s3.upload((self._storage_name+"/"+BlockStorageS3._index_name,
                             struct.pack(BlockStorageS3._index_struct_string,
                                         self.block_size,
                                         self.block_count,
                                         len(self.header_data),
                                         True) + \
                             self.header_data))

    def _check_async(self):
        if self._async_write is not None:
            for i in self._async_write:
                if self._async_write_callback is not None:
                    self._async_write_callback(i)
            self._async_write = None
            self._async_write_callback = None

    def _schedule_async_write(self, arglist, callback=None):
        assert self._async_write is None
        if self._pool is not None:
            self._async_write = \
                self._pool.imap_unordered(self._s3.upload, arglist)
            self._async_write_callback = callback
        else:
            # Note: we are using six.moves.map, which always
            #       behaves like imap
            for i in map(self._s3.upload, arglist):
                if callback is not None:
                    callback(i)

    def _download(self, i):
        return self._s3.download(self._basename % i)

    #
    # Define BlockStorageInterface Methods
    #

    def clone_device(self):
        f = BlockStorageS3(self.storage_name,
                           bucket_name=self._bucket_name,
                           aws_access_key_id=self._aws_access_key_id,
                           aws_secret_access_key=self._aws_secret_access_key,
                           region_name=self._region_name,
                           threadpool_size=0,
                           s3_wrapper=type(self._s3),
                           ignore_lock=True)
        # the clone shares this device's thread pool but must not
        # close it
        f._pool = self._pool
        f._close_pool = False
        return f

    @classmethod
    def compute_storage_size(cls,
                             block_size,
                             block_count,
                             header_data=None,
                             ignore_header=False):
        assert (block_size > 0) and (block_size == int(block_size))
        assert (block_count > 0) and (block_count == int(block_count))
        if header_data is None:
            header_data = bytes()
        if ignore_header:
            return block_size * block_count
        else:
            return BlockStorageS3._index_offset + \
                    len(header_data) + \
                    block_size * block_count

    @classmethod
    def setup(cls,
              storage_name,
              block_size,
              block_count,
              bucket_name=None,
              aws_access_key_id=None,
              aws_secret_access_key=None,
              region_name=None,
              header_data=None,
              initialize=None,
              threadpool_size=None,
              ignore_existing=False,
              s3_wrapper=Boto3S3Wrapper):

        if bucket_name is None:
            raise ValueError("'bucket_name' is required")
        if (block_size <= 0) or (block_size != int(block_size)):
            raise ValueError(
                "Block size (bytes) must be a positive integer: %s"
                % (block_size))
        if (block_count <= 0) or (block_count != int(block_count)):
            raise ValueError(
                "Block count must be a positive integer: %s"
                % (block_count))
        if (header_data is not None) and \
           (type(header_data) is not bytes):
            raise TypeError(
                "'header_data' must be of type bytes. "
                "Invalid type: %s" % (type(header_data)))

        pool = None
        if threadpool_size != 0:
            pool = ThreadPool(threadpool_size)

        s3 = s3_wrapper(bucket_name,
                        aws_access_key_id=aws_access_key_id,
                        aws_secret_access_key=aws_secret_access_key,
                        region_name=region_name)
        exists = s3.exists(storage_name)
        if (not ignore_existing) and exists:
            raise IOError(
                "Storage location already exists in bucket %s: %s"
                % (bucket_name, storage_name))
        if exists:
            log.info("Deleting objects in existing S3 entry: %s/%s"
                     % (bucket_name, storage_name))
            print("Clearing Existing S3 Objects With Prefix %s/%s/"
                  % (bucket_name, storage_name))
            s3.clear(storage_name, threadpool=pool)

        if header_data is None:
            s3.upload((storage_name+"/"+BlockStorageS3._index_name,
                       struct.pack(BlockStorageS3._index_struct_string,
                                   block_size,
                                   block_count,
                                   0,
                                   False)))
        else:
            s3.upload((storage_name+"/"+BlockStorageS3._index_name,
                       struct.pack(BlockStorageS3._index_struct_string,
                                   block_size,
                                   block_count,
                                   len(header_data),
                                   False) + \
                       header_data))

        if initialize is None:
            zeros = bytes(bytearray(block_size))
            initialize = lambda i: zeros
        basename = storage_name+"/b%d"
        # NOTE: We will not be informed when a thread
        #       encounters an exception (e.g., when
        #       calling initialize(i)). We must ensure
        #       that all iterations were processed
        #       by counting the results.
        def init_blocks():
            for i in xrange(block_count):
                yield (basename % i, initialize(i))
        def _do_upload(arg):
            try:
                s3.upload(arg)
            except Exception as e:                     # pragma: no cover
                log.error(                             # pragma: no cover
                    "An exception occurred during S3 " # pragma: no cover
                    "setup when calling the block "    # pragma: no cover
                    "initialization function: %s"      # pragma: no cover
                    % (str(e)))                        # pragma: no cover
                raise                                  # pragma: no cover
        total = None
        progress_bar = tqdm.tqdm(total=block_count*block_size,
                                 desc="Initializing S3 Block Storage Space",
                                 unit="B",
                                 unit_scale=True,
                                 disable=not pyoram.config.SHOW_PROGRESS_BAR)
        if pool is not None:
            try:
                for i,_ in enumerate(
                        pool.imap_unordered(_do_upload, init_blocks())):
                    total = i
                    progress_bar.update(n=block_size)
            except Exception as e:                     # pragma: no cover
                s3.clear(storage_name)                 # pragma: no cover
                raise                                  # pragma: no cover
            finally:
                progress_bar.close()
                pool.close()
                pool.join()
        else:
            try:
                for i,_ in enumerate(
                        map(s3.upload, init_blocks())):
                    total = i
                    progress_bar.update(n=block_size)
            except Exception as e:                     # pragma: no cover
                s3.clear(storage_name)                 # pragma: no cover
                raise                                  # pragma: no cover
            finally:
                progress_bar.close()

        if total != block_count - 1:
            s3.clear(storage_name)                     # pragma: no cover
            if pool is not None:                       # pragma: no cover
                pool.close()                           # pragma: no cover
                pool.join()                            # pragma: no cover
            raise ValueError(                          # pragma: no cover
                "Something went wrong during S3 block" # pragma: no cover
                " initialization. Check the logger "   # pragma: no cover
                "output for more information.")        # pragma: no cover

        return BlockStorageS3(storage_name,
                              bucket_name=bucket_name,
                              aws_access_key_id=aws_access_key_id,
                              aws_secret_access_key=aws_secret_access_key,
                              region_name=region_name,
                              threadpool_size=threadpool_size,
                              s3_wrapper=s3_wrapper)

    @property
    def header_data(self):
        return self._user_header_data

    @property
    def block_count(self):
        return self._block_count

    @property
    def block_size(self):
        return self._block_size

    @property
    def storage_name(self):
        return self._storage_name

    def update_header_data(self, new_header_data):
        self._check_async()
        if len(new_header_data) != len(self.header_data):
            raise ValueError(
                "The size of header data can not change.\n"
                "Original bytes: %s\n"
                "New bytes: %s" % (len(self.header_data),
                                   len(new_header_data)))
        self._user_header_data = new_header_data

        index_data = bytearray(self._s3.download(
            self._storage_name+"/"+BlockStorageS3._index_name))
        lenbefore = len(index_data)
        index_data[BlockStorageS3._index_offset:] = new_header_data
        assert lenbefore == len(index_data)
        self._s3.upload((self._storage_name+"/"+BlockStorageS3._index_name,
                         bytes(index_data)))

    def close(self):
        self._check_async()
        if self._s3 is not None:
            if not self._ignore_lock:
                # turn off the locked flag
                self._s3.upload(
                    (self._storage_name+"/"+BlockStorageS3._index_name,
                     struct.pack(BlockStorageS3._index_struct_string,
                                 self.block_size,
                                 self.block_count,
                                 len(self.header_data),
                                 False) + \
                     self.header_data))
        if self._close_pool and (self._pool is not None):
            self._pool.close()
            self._pool.join()
            self._pool = None

    def read_blocks(self, indices):
        self._check_async()
        # be sure not to exhaust this if it is an iterator
        # or generator
        indices = list(indices)
        assert all(0 <= i < self.block_count for i in indices)
        self._bytes_received += self.block_size * len(indices)
        if self._pool is not None:
            return self._pool.map(self._download, indices)
        else:
            return list(map(self._download, indices))

    def yield_blocks(self, indices):
        self._check_async()
        # be sure not to exhaust this if it is an iterator
        # or generator
        indices = list(indices)
        assert all(0 <= i < self.block_count for i in indices)
        self._bytes_received += self.block_size * len(indices)
        if self._pool is not None:
            return self._pool.imap(self._download, indices)
        else:
            return map(self._download, indices)

    def read_block(self, i):
        self._check_async()
        assert 0 <= i < self.block_count
        self._bytes_received += self.block_size
        return self._download(i)

    def write_blocks(self, indices, blocks, callback=None):
        self._check_async()
        # be sure not to exhaust this if it is an iterator
        # or generator
        indices = list(indices)
        assert all(0 <= i < self.block_count for i in indices)
        self._bytes_sent += self.block_size * len(indices)
        indices = (self._basename % i for i in indices)
        self._schedule_async_write(zip(indices, blocks),
                                   callback=callback)

    def write_block(self, i, block):
        self._check_async()
        assert 0 <= i < self.block_count
        self._bytes_sent += self.block_size
        self._schedule_async_write((((self._basename % i), block),))

    @property
    def bytes_sent(self):
        return self._bytes_sent

    @property
    def bytes_received(self):
        return self._bytes_received

BlockStorageTypeFactory.register_device("s3", BlockStorageS3)
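
# Illustrative usage sketch (comments only, not executed): the storage
# and bucket names below are placeholders, and AWS credentials are
# assumed to be available through the usual boto3 configuration.
#
#   f = BlockStorageS3.setup("mystorage",
#                            block_size=4096,
#                            block_count=8,
#                            bucket_name="my-bucket")
#   f.write_block(0, bytes(bytearray(4096)))
#   data = f.read_block(0)
#   f.close()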