2 import java.util.HashMap;
3 import java.util.Arrays;
4 import javax.crypto.spec.*;
9 HashMap<IoTString, KeyValue> table=new HashMap<IoTString, KeyValue>();
10 HashMap<Long, Pair<Long, Liveness>> lastmessagetable=new HashMap<Long, Pair<Long, Liveness>>();
17 public Table(String baseurl, String password, long _localmachineid) {
18 localmachineid=_localmachineid;
19 buffer = new SlotBuffer();
21 initCloud(baseurl, password);
24 private void initCloud(String baseurl, String password) {
26 SecretKeySpec secret=getKey(password);
27 Cipher encryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
28 encryptCipher.init(Cipher.ENCRYPT_MODE, secret);
29 Cipher decryptCipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
30 decryptCipher.init(Cipher.DECRYPT_MODE, secret);
31 hmac = Mac.getInstance("HmacSHA256");
33 cloud=new CloudComm(baseurl, encryptCipher, decryptCipher, hmac);
34 } catch (Exception e) {
35 throw new Error("Failed To Initialize Ciphers");
39 private SecretKeySpec getKey(String password) {
41 PBEKeySpec keyspec = new PBEKeySpec(password.toCharArray());
42 SecretKey key = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256").generateSecret(keyspec);
43 SecretKeySpec secret = new SecretKeySpec(key.getEncoded(), "AES");
45 } catch (Exception e) {
46 throw new Error("Failed generating key.");
50 public void update() {
51 Slot[] newslots=cloud.getSlots(sequencenumber);
52 validateandupdate(newslots);
55 public IoTString get(IoTString key) {
56 KeyValue kv=table.get(key);
63 public IoTString put(IoTString key, IoTString value) {
67 void validateandupdate(Slot[] newslots) {
68 //The cloud communication layer has checked slot HMACs already
70 if (newslots.length==0)
73 long firstseqnum=newslots[0].getSequenceNumber();
74 if (firstseqnum < sequencenumber)
75 throw new Error("Server Error: Sent older slots!");
77 SlotIndexer indexer = new SlotIndexer(newslots, buffer);
78 checkHMACChain(indexer, newslots);
79 for(Slot slot: newslots) {
80 processSlot(indexer, slot);
85 void processEntry(KeyValue entry, SlotIndexer indexer) {
86 IoTString key=entry.getKey();
87 KeyValue oldvalue=table.get(key);
88 if (oldvalue != null) {
89 oldvalue.decrementLiveCount();
91 table.put(key, entry);
94 void processEntry(LastMessage entry, SlotIndexer indexer) {
95 updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry);
98 void processEntry(RejectedMessage entry, SlotIndexer indexer) {
99 long oldseqnum=entry.getOldSeqNum();
100 long newseqnum=entry.getNewSeqNum();
101 boolean isequal=entry.getEqual();
102 long machineid=entry.getMachineID();
103 for(long seqnum=oldseqnum;seqnum<=newseqnum;seqnum++) {
104 Slot slot=indexer.getSlot(seqnum);
106 long slotmachineid=slot.getMachineID();
107 if (isequal!=(slotmachineid==machineid)) {
108 throw new Error("Server Error: Trying to insert rejected message for slot "+seqnum);
114 void processEntry(TableStatus entry, SlotIndexer indexer, Slot slot) {
118 void updateLastMessage(long machineid, long seqnum, Liveness liveness) {
119 Pair<Long, Liveness> lastmsgentry = lastmessagetable.put(machineid, new Pair<Long, Liveness>(seqnum, liveness));
120 if (lastmsgentry == null)
123 long lastmsgseqnum = lastmsgentry.getFirst();
124 Liveness lastentry = lastmsgentry.getSecond();
125 if (lastentry instanceof LastMessage) {
126 ((LastMessage)lastentry).decrementLiveCount();
127 } else if (lastentry instanceof Slot) {
128 ((Slot)lastentry).decrementLiveCount();
130 throw new Error("Unrecognized type");
133 //Check that nothing funny happened
134 if (machineid == localmachineid) {
135 if (lastmsgseqnum != seqnum)
136 throw new Error("Server Error: Mismatch on local machine sequence number");
138 if (lastmsgseqnum > seqnum)
139 throw new Error("Server Error: Rolback on remote machine sequence number");
143 void processSlot(SlotIndexer indexer, Slot slot) {
144 updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot);
146 for(Entry entry : slot.getEntries()) {
147 switch(entry.getType()) {
148 case Entry.TypeKeyValue:
149 processEntry((KeyValue)entry, indexer);
151 case Entry.TypeLastMessage:
152 processEntry((LastMessage)entry, indexer);
154 case Entry.TypeRejectedMessage:
155 processEntry((RejectedMessage)entry, indexer);
157 case Entry.TypeTableStatus:
158 processEntry((TableStatus)entry, indexer, slot);
161 throw new Error("Unrecognized type: "+entry.getType());
166 void checkHMACChain(SlotIndexer indexer, Slot[] newslots) {
167 for(int i=0; i < newslots.length; i++) {
168 Slot currslot=newslots[i];
169 Slot prevslot=indexer.getSlot(currslot.getSequenceNumber()-1);
170 if (prevslot != null &&
171 !Arrays.equals(prevslot.getHMAC(), currslot.getPrevHMAC()))
172 throw new Error("Server Error: Invalid HMAC Chain");