2 import java.util.HashMap;
3 import java.util.Arrays;
4 import java.util.Vector;
5 import javax.crypto.spec.*;
// Client-side table of key/value pairs. State is rebuilt from slots that are
// fetched from / pushed to a CloudComm instance and cached in a SlotBuffer.
// NOTE(review): numslots and hmac are used by the methods below but their
// declarations are not visible among these fields — confirm they exist.
8 final public class Table {
// Most recent KeyValue entry stored for each key (see processEntry(KeyValue)).
10 private HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
// Per machine id: the last sequence number seen and the Liveness object
// (a Slot or a LastMessage entry) that reported it (see updateLastMessage).
11 private HashMap<Long, Pair<Long, Liveness> > lastmessagetable=new HashMap<Long, Pair<Long, Liveness> >();
// Local buffer of slots; validated slots are committed here.
12 private SlotBuffer buffer;
// Communication layer used to fetch (getSlots) and upload (putSlot) slots.
13 private CloudComm cloud;
// Sequence number of the last slot that was validated and committed locally.
15 private long sequencenumber;
// Id of this machine; used when creating slots and checking last-message entries.
16 private long localmachineid;
// Most recently processed TableStatus entry; the previous one is marked dead when replaced.
17 private TableStatus lastTableStatus;
// tryput starts carrying live entries forward when fewer than this many slots are free,
// and examines this many of the oldest slots when doing so.
18 static final int FREE_SLOTS = 10;
// Amount added to numslots when tryput requests a larger table.
19 static final int FORCED_RESIZE_INCREMENT = 20;
/**
 * Creates a table for the given machine and sets up cloud communication
 * from a base URL and a password.
 *
 * @param baseurl forwarded to initCloud, which hands it to the CloudComm constructor
 * @param password forwarded to initCloud, which derives the secret key from it
 * @param _localmachineid id stored as this table's local machine id
 */
21 public Table(String baseurl, String password, long _localmachineid) {
22 localmachineid=_localmachineid;
23 buffer = new SlotBuffer();
// The initial slot count is whatever capacity a fresh SlotBuffer reports.
24 numslots = buffer.capacity();
26 initCloud(baseurl, password);
/**
 * Creates a table for the given machine using an already constructed
 * communication layer.
 *
 * @param _cloud communication layer to use
 *        NOTE(review): the statement storing _cloud into the cloud field is not visible here — confirm
 * @param _localmachineid id stored as this table's local machine id
 */
29 public Table(CloudComm _cloud, long _localmachineid) {
30 localmachineid=_localmachineid;
31 buffer = new SlotBuffer();
// The initial slot count is whatever capacity a fresh SlotBuffer reports.
32 numslots = buffer.capacity();
/**
 * Builds the encryption cipher, decryption cipher and HMAC from the password
 * and creates the CloudComm instance that uses them.
 *
 * @param baseurl passed to the CloudComm constructor
 * @param password passed to getKey to derive the secret key
 * @throws Error with message "Failed To Initialize Ciphers" if any exception is caught
 */
37 private void initCloud(String baseurl, String password) {
39 SecretKeySpec secret=getKey(password);
// Both ciphers use AES in CBC mode with PKCS5 padding and the same derived key.
40 Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
41 encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
42 Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
// NOTE(review): no IvParameterSpec is passed here; CBC decryption normally
// requires the IV to be supplied at init time — confirm how the IV is provided.
43 decryptCipher.init(Cipher.DECRYPT_MODE, secret);
// NOTE(review): initialization of hmac with the key is not visible here — confirm.
44 hmac = Mac.getInstance("HmacSHA256");
46 cloud=new CloudComm(baseurl, encryptCipher, decryptCipher, hmac);
47 } catch (Exception e) {
// NOTE(review): the caught exception is not attached as the cause, so the
// underlying failure reason is lost.
48 throw new Error("Failed To Initialize Ciphers");
/**
 * Derives a secret key from the password and wraps its encoded bytes as an AES key.
 *
 * @param password password whose characters are fed to the key specification
 * @return presumably the SecretKeySpec built below (the return statement is not visible here — confirm)
 * @throws Error with message "Failed generating key." if any exception is caught
 */
52 private SecretKeySpec getKey(String password) {
// NOTE(review): the key spec carries only the password — no salt, iteration
// count or key length. PBKDF2 factories generally need those; verify this
// call succeeds on the target JDK.
54 PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray());
55 SecretKey key = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
// Re-label the derived key material as an AES key.
56 SecretKeySpec secret = new SecretKeySpec(key.getEncoded(), "AES");
58 } catch (Exception e) {
// NOTE(review): the caught exception is not attached as the cause.
59 throw new Error("Failed generating key.");
63 public void update() {
64 Slot[] newslots=cloud.getSlots(sequencenumber+1);
65 validateandupdate(newslots, false);
/**
 * Looks up the entry currently stored for a key in the local table.
 *
 * @param key key to look up
 * @return NOTE(review): the return path is not visible here; presumably the
 *         value held by the KeyValue found, if any — confirm
 */
68 public IoTString get(IoTString key) {
69 KeyValue kv=table.get(key);
/**
 * Creates the first slot (sequence number 1) for this machine, uploads it and
 * applies it to the local state.
 *
 * NOTE(review): the conditions that choose between the success path and the
 * Error below are not visible here — confirm.
 *
 * @throws Error with message "Error on initialization"
 */
76 public void initTable() {
77 Slot s=new Slot(1, localmachineid);
// Table status entry built for slot s with the current slot count.
78 TableStatus status=new TableStatus(s, numslots);
80 Slot[] array=cloud.putSlot(s, numslots);
// Replace the returned array with one holding only our own slot.
82 array = new Slot[] {s};
83 validateandupdate(array, true); // update data structure
85 throw new Error("Error on initialization");
/**
 * Stores a key/value pair through tryput and returns the value previously
 * held for the key.
 *
 * @param key key to store
 * @param value value to associate with the key
 * @return the value of the entry that was in the table before the put
 *         NOTE(review): table.get returns null for an unknown key and no null
 *         check on oldvalue is visible before getValue() — confirm
 */
89 public IoTString put(IoTString key, IoTString value) {
// Capture the previous entry before the table is updated.
91 KeyValue oldvalue=table.get(key);
// First attempt is made without a forced resize.
92 if (tryput(key, value, false)) {
96 return oldvalue.getValue();
/**
 * Builds a new slot for the given key/value pair, uploads it through the cloud
 * layer and applies the result locally. The new slot is also used to carry
 * forward live entries from older slots held in the buffer.
 *
 * @param key key of the pair to store
 * @param value value of the pair to store
 * @param forcedresize when false, the method may call itself again with true
 *        if a live entry from an old slot cannot be fitted into the new slot
 * @return NOTE(review): the return statements are not visible here; put()
 *         branches on the result — confirm its meaning
 */
101 private boolean tryput(IoTString key, IoTString value, boolean forcedresize) {
// The new slot takes the next sequence number and is chained to the HMAC of
// the slot at the current sequence number.
102 Slot s=new Slot(sequencenumber+1, localmachineid, buffer.getSlot(sequencenumber).getHMAC());
103 long seqn = buffer.getOldestSeqNum();
// Table status announcing a size enlarged by FORCED_RESIZE_INCREMENT.
// NOTE(review): the condition guarding this (presumably forcedresize) is not visible — confirm.
106 TableStatus status=new TableStatus(s, FORCED_RESIZE_INCREMENT + numslots);
// Fewer than FREE_SLOTS free slots remain in the buffer.
110 if ((numslots - buffer.size()) < FREE_SLOTS) {
111 //have to check whether we have enough free slots
// Sequence number of the first slot in a window of numslots ending at the newest slot.
112 long fullfirstseqn = buffer.getNewestSeqNum() + 1 - numslots;
// Clamp to 1 so seqn never goes below the first sequence number.
113 seqn = fullfirstseqn < 1 ? 1: fullfirstseqn;
// Walk FREE_SLOTS consecutive slots starting at seqn.
114 for(int i=0; i < FREE_SLOTS; i++, seqn++) {
115 Slot prevslot=buffer.getSlot(seqn);
116 if (!prevslot.isLive())
118 Vector<Entry> liveentries = prevslot.getLiveEntries();
119 for(Entry liveentry:liveentries) {
120 if (redundant(liveentry))
122 if (s.hasSpace(liveentry))
123 s.addEntry(liveentry);
125 if (s.canFit(liveentry))
126 s.addEntry(liveentry);
// The entry could not be added: retry the whole put once with a forced resize.
127 else if (!forcedresize) {
128 return tryput(key, value, true);
// Entry for the pair being stored, owned by the new slot.
134 KeyValue kv=new KeyValue(s, key, value);
135 boolean insertedkv=false;
136 if (s.hasSpace(kv)) {
141 long newestseqnum=buffer.getNewestSeqNum();
// Continue from seqn up to the newest slot, carrying live entries forward
// while the new slot reports space for them.
143 for(;seqn<=newestseqnum;seqn++) {
144 Slot prevslot=buffer.getSlot(seqn);
145 if (!prevslot.isLive())
147 Vector<Entry> liveentries = prevslot.getLiveEntries();
148 for(Entry liveentry:liveentries) {
149 if (redundant(liveentry))
151 if (s.hasSpace(liveentry))
152 s.addEntry(liveentry);
// NOTE(review): the declaration of max and the condition around this
// assignment are not visible here — confirm.
160 max = numslots + FORCED_RESIZE_INCREMENT;
161 Slot[] array=cloud.putSlot(s, max);
// Replace the returned array with one holding only our own slot.
163 array = new Slot[] {s};
167 validateandupdate(array, true); // update data structure
/**
 * Tells tryput whether a live entry can be treated as redundant.
 * A LastMessage entry is redundant when it refers to the local machine.
 *
 * @param liveentry live entry taken from an older slot
 * @return for LastMessage entries, true when the entry's machine id equals
 *         localmachineid; the result for other entry types is not visible
 *         here (presumably false — confirm)
 */
172 boolean redundant(Entry liveentry) {
173 if (liveentry.getType()==Entry.TypeLastMessage) {
174 LastMessage lastmsg=(LastMessage) liveentry;
175 return lastmsg.getMachineID() == localmachineid;
/**
 * Validates a batch of slots and applies them to the local state: checks that
 * they are newer than what is already known, checks the HMAC chain, processes
 * every slot, commits the slots to the buffer and advances sequencenumber.
 *
 * @param newslots slots to apply; the first element's sequence number is taken as the batch start
 * @param isput forwarded to processSlot; true when the slots come from a local put
 * @throws Error with message "Server Error: Sent older slots!" if the first
 *         slot's sequence number is not greater than sequencenumber
 */
181 private void validateandupdate(Slot[] newslots, boolean isput) {
182 //The cloud communication layer has checked slot HMACs already
// Empty batch. NOTE(review): the action taken is not visible here (presumably an early return) — confirm.
184 if (newslots.length==0)
187 long firstseqnum=newslots[0].getSequenceNumber();
188 if (firstseqnum <= sequencenumber)
189 throw new Error("Server Error: Sent older slots!");
// The indexer resolves sequence numbers across the new slots and the buffer.
191 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
192 checkHMACChain(indexer, newslots);
195 for(Slot slot: newslots) {
196 updateExpectedSize();
197 processSlot(indexer, slot, isput);
200 //If there is a gap, check to see if the server sent us everything
201 if (firstseqnum != (sequencenumber+1))
202 checkNumSlots(newslots.length);
206 //commit new to slots
207 for(Slot slot:newslots) {
208 buffer.putSlot(slot);
// The last slot in the batch becomes the newest known sequence number.
210 sequencenumber = newslots[newslots.length - 1].getSequenceNumber();
// expectedsize: number of slots a batch is expected to contain, compared in checkNumSlots.
// currmaxsize: slot count in effect while a batch is processed; updated by
// TableStatus entries and copied into numslots by commitNewMaxSize.
213 private int expectedsize, currmaxsize;
215 private void checkNumSlots(int numslots) {
216 if (numslots != expectedsize)
217 throw new Error("Server Error: Server did not send all slots. Expected: "+expectedsize+" Received:"+numslots);
220 private void initExpectedSize() {
221 long prevslots = sequencenumber;
222 expectedsize = (prevslots < ((long) numslots)) ? (int) prevslots : numslots;
223 currmaxsize = numslots;
/**
 * Called once per slot while a batch is processed; keeps expectedsize from
 * exceeding currmaxsize.
 * NOTE(review): a statement before the cap (presumably an increment of
 * expectedsize) is not visible here — confirm.
 */
226 private void updateExpectedSize() {
// Cap the expected batch size at the current maximum slot count.
228 if (expectedsize > currmaxsize)
229 expectedsize = currmaxsize;
232 private void updateCurrMaxSize(int newmaxsize) {
233 currmaxsize=newmaxsize;
/**
 * Makes the slot count gathered during batch processing permanent: resizes the
 * buffer when the count changed and stores the new count in numslots.
 */
236 private void commitNewMaxSize() {
// Only touch the buffer when the size actually changed.
237 if (numslots != currmaxsize)
238 buffer.resize(currmaxsize);
240 numslots=currmaxsize;
/**
 * Applies a KeyValue entry: the entry becomes the table's current mapping for
 * its key.
 *
 * @param entry key/value entry read from a slot
 * @param indexer not used by the visible lines of this method
 */
243 private void processEntry(KeyValue entry, SlotIndexer indexer) {
244 IoTString key=entry.getKey();
245 KeyValue oldvalue=table.get(key);
// NOTE(review): the handling of a previous entry is not visible here
// (presumably it is marked dead) — confirm.
246 if (oldvalue != null) {
249 table.put(key, entry);
252 private void processEntry(LastMessage entry, SlotIndexer indexer) {
253 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false);
/**
 * Checks a RejectedMessage entry against the slots in its sequence-number
 * range (old to new, inclusive). For each slot, whether its machine id matches
 * the entry's machine id must agree with the entry's equal flag.
 *
 * @param entry rejected-message entry read from a slot
 * @param indexer used to look up the slot for each sequence number in the range
 * @throws Error if a slot's machine id contradicts the entry's equal flag
 */
256 private void processEntry(RejectedMessage entry, SlotIndexer indexer) {
257 long oldseqnum=entry.getOldSeqNum();
258 long newseqnum=entry.getNewSeqNum();
259 boolean isequal=entry.getEqual();
260 long machineid=entry.getMachineID();
261 for(long seqnum=oldseqnum; seqnum<=newseqnum; seqnum++) {
262 Slot slot=indexer.getSlot(seqnum);
// NOTE(review): a line is missing here (presumably a null check on slot) — confirm.
264 long slotmachineid=slot.getMachineID();
// isequal == true requires the slot to come from machineid; false requires it not to.
265 if (isequal!=(slotmachineid==machineid)) {
266 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
272 private void processEntry(TableStatus entry, SlotIndexer indexer) {
273 int newnumslots=entry.getMaxSlots();
274 updateCurrMaxSize(newnumslots);
275 if (lastTableStatus != null)
276 lastTableStatus.setDead();
277 lastTableStatus = entry;
/**
 * Records the latest sequence number reported for a machine, marks the object
 * that carried the previous record dead, and checks the new number against the
 * previous one.
 *
 * @param machineid machine the record belongs to
 * @param seqnum sequence number being reported
 * @param liveness object carrying the report (a Slot or a LastMessage entry)
 * @param isput when true, a differing sequence number for the local machine is accepted
 * @throws Error on an unrecognized previous liveness type, on a local sequence
 *         number mismatch (when not a put), or on a remote sequence number rollback
 */
280 private void updateLastMessage(long machineid, long seqnum, Liveness liveness, boolean isput) {
// put returns the previous record for this machine, or null if none existed.
281 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
// NOTE(review): the action for a first record is not visible here (presumably an early return) — confirm.
282 if (lastmsgentry == null)
285 long lastmsgseqnum = lastmsgentry.getFirst();
286 Liveness lastentry = lastmsgentry.getSecond();
// The previous carrier is superseded, so mark it dead.
287 if (lastentry instanceof LastMessage) {
288 ((LastMessage)lastentry).setDead();
289 } else if (lastentry instanceof Slot) {
290 ((Slot)lastentry).setDead();
292 throw new Error("Unrecognized type");
295 if (machineid == localmachineid) {
// For the local machine the number must be unchanged unless this is our own put.
296 if (lastmsgseqnum != seqnum && !isput)
297 throw new Error("Server Error: Mismatch on local machine sequence number");
// Sequence numbers must never go backwards.
// NOTE(review): the branch structure around this check is not visible;
// the error message indicates it applies to remote machines — confirm.
299 if (lastmsgseqnum > seqnum)
300 throw new Error("Server Error: Rollback on remote machine sequence number");
/**
 * Applies one slot: records the slot itself as the latest message from its
 * machine, then dispatches each entry in the slot to the processEntry overload
 * for its type.
 *
 * @param indexer forwarded to every processEntry call
 * @param slot slot whose entries are applied
 * @param isput forwarded to updateLastMessage
 * @throws Error with message "Unrecognized type: " plus the type value
 *         (NOTE(review): presumably from the default switch arm, which is not visible — confirm)
 */
304 private void processSlot(SlotIndexer indexer, Slot slot, boolean isput) {
305 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, isput);
307 for(Entry entry : slot.getEntries()) {
// Dispatch on the entry's type tag; the cast selects the matching overload.
308 switch(entry.getType()) {
309 case Entry.TypeKeyValue:
310 processEntry((KeyValue)entry, indexer);
313 case Entry.TypeLastMessage:
314 processEntry((LastMessage)entry, indexer);
317 case Entry.TypeRejectedMessage:
318 processEntry((RejectedMessage)entry, indexer);
321 case Entry.TypeTableStatus:
322 processEntry((TableStatus)entry, indexer);
326 throw new Error("Unrecognized type: "+entry.getType());
331 private void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
332 for(int i=0; i < newslots.length; i++) {
333 Slot currslot=newslots[i];
334 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
335 if (prevslot != null &&
336 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
337 throw new Error("Server Error: Invalid HMAC Chain");