int32_t getInt();
char get();
void get(Array<char> * array);
+ void position(int32_t newPosition);
Array<char> * array();
private:
};
ByteBuffer * ByteBuffer_wrap(Array<char> * array);
+ByteBuffer * ByteBuffer_allocate(uint size);
#endif
SecureRandom *random;
Array<char> *salt;
Table *table;
- int32_t listeningPort = -1;
- Thread *localServerThread = NULL;
- bool doEnd = false;
- TimingSingleton *timer = NULL;
+ int32_t listeningPort;
+ Thread *localServerThread;
+ bool doEnd;
+ TimingSingleton *timer;
/**
* Generates Key from password.
*/
Array<char> *sendLocalData(Array<char> *sendData, int64_t localSequenceNumber, IoTString *host, int port);
- public void close();
+ void close();
};
#endif
#include "Slot.h"
-
-Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, char *_hmac, int64_t _localSequenceNumber) {
- seqnum = _seqnum;
- machineid = _machineid;
- prevhmac = _prevhmac;
- hmac = _hmac;
- entries = new Vector<Entry *>();
- livecount = 1;
- seqnumlive = true;
- freespace = SLOT_SIZE - getBaseSize();
- table = _table;
- localSequenceNumber = _localSequenceNumber;
+#include "ByteBuffer.h"
+#include "Entry.h"
+#include "Error.h"
+#include "CloudComm.h"
+#include "Table.h"
+#include "LastMessage.h"
+
+Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, Array<char> *_hmac, int64_t _localSequenceNumber) :
+ seqnum(_seqnum),
+ prevhmac(_prevhmac),
+ hmac(_hmac),
+ machineid(_machineid),
+ entries(new Vector<Entry *>()),
+ livecount(1),
+ seqnumlive(true),
+ freespace(SLOT_SIZE - getBaseSize()),
+ table(_table),
+ localSequenceNumber(_localSequenceNumber) {
}
-Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, int64_t _localSequenceNumber) {
- this(_table, _seqnum, _machineid, _prevhmac, NULL, _localSequenceNumber);
-}
+Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, int64_t _localSequenceNumber) :
+ seqnum(_seqnum),
+ prevhmac(_prevhmac),
+ hmac(NULL),
+ machineid(_machineid),
+ entries(new Vector<Entry *>()),
+ livecount(1),
+ seqnumlive(true),
+ freespace(SLOT_SIZE - getBaseSize()),
+ table(_table),
+ localSequenceNumber(_localSequenceNumber) {
+ }
-Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber) {
- this(_table, _seqnum, _machineid, new char[HMAC_SIZE], NULL, _localSequenceNumber);
-}
+Slot::Slot(Table *_table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber) :
+ seqnum(_seqnum),
+ prevhmac(new Array<char>(HMAC_SIZE)),
+ hmac(NULL),
+ machineid(_machineid),
+ entries(new Vector<Entry *>()),
+ livecount(1),
+ seqnumlive(true),
+ freespace(SLOT_SIZE - getBaseSize()),
+ table(_table),
+ localSequenceNumber(_localSequenceNumber) {
+ }
Entry *Slot::addEntry(Entry *e) {
e = e->getCopy(this);
return entries;
}
-Slot *Slotdecode(Table *table, char *array, Mac *mac) {
+Slot *Slotdecode(Table *table, Array<char> *array, Mac *mac) {
mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE);
- char *realmac = mac->doFinal();
+ Array<char> *realmac = mac->doFinal();
ByteBuffer *bb = ByteBuffer_wrap(array);
- char *hmac = new char[HMAC_SIZE];
- char *prevhmac = new char[HMAC_SIZE];
+ Array<char> *hmac = new Array<char>(HMAC_SIZE);
+ Array<char> *prevhmac = new Array<char>(HMAC_SIZE);
bb->get(hmac);
bb->get(prevhmac);
- if (!Arrays.equals(realmac, hmac))
+ if (!realmac->equals(hmac))
throw new Error("Server Error: Invalid HMAC! Potential Attack!");
int64_t seqnum = bb->getLong();
int64_t machineid = bb->getLong();
int numentries = bb->getInt();
- Slot slot = new Slot(table, seqnum, machineid, prevhmac, hmac, -1);
+ Slot *slot = new Slot(table, seqnum, machineid, prevhmac, hmac, -1);
for (int i = 0; i < numentries; i++) {
- slot->addShallowEntry(Entry->decode(slot, bb));
+ slot->addShallowEntry(Entry_decode(slot, bb));
}
return slot;
}
-char *Slot::encode(Mac * mac) {
- char *array = new char[SLOT_SIZE];
+Array<char> *Slot::encode(Mac * mac) {
+ Array<char> *array = new Array<char>(SLOT_SIZE);
ByteBuffer *bb = ByteBuffer_wrap(array);
/* Leave space for the slot HMAC. */
bb->position(HMAC_SIZE);
bb->putLong(seqnum);
bb->putLong(machineid);
bb->putInt(entries->size());
- for (Entry entry : entries) {
+ for (Entry *entry : entries) {
entry->encode(bb);
}
/* Compute our HMAC */
mac->update(array, HMAC_SIZE, array.length - HMAC_SIZE);
- char *realmac = mac->doFinal();
+ Array<char> *realmac = mac->doFinal();
hmac = realmac;
bb->position(0);
bb->put(realmac);
Vector<Entry *> *liveEntries = new Vector<Entry *>();
for (Entry *entry : entries) {
if (entry->isLive()) {
- if (!resize || entry->getType() != Entry->TypeTableStatus)
+ if (!resize || entry->getType() != TypeTableStatus)
liveEntries->add(entry);
}
}
}
}
-char *Slot::getSlotCryptIV() {
- ByteBuffer *buffer = ByteBuffer_allocate(CloudComm.IV_SIZE);
+Array<char> *Slot::getSlotCryptIV() {
+ ByteBuffer *buffer = ByteBuffer_allocate(CloudComm_IV_SIZE);
buffer->putLong(machineid);
int64_t localSequenceNumberShift = localSequenceNumber << 16;
buffer->putLong(localSequenceNumberShift);
#define HMAC_SIZE 32
class Slot : public Liveness {
-private:
+ private:
/** Sequence number of the slot. */
int64_t seqnum;
/** HMAC of previous slot. */
- char *prevhmac;
+ Array<char> *prevhmac;
/** HMAC of this slot. */
- char *hmac;
+ Array<char> *hmac;
/** Machine that sent this slot. */
int64_t machineid;
/** Vector of entries in this slot. */
void addShallowEntry(Entry *e);
public:
- Slot(Table *_table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, char *_hmac, int64_t _localSequenceNumber);
- Slot(Table _table, int64_t _seqnum, int64_t _machineid, char *_prevhmac, int64_t _localSequenceNumber);
- Slot(Table _table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber);
+ Slot(Table *_table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, Array<char> *_hmac, int64_t _localSequenceNumber);
+ Slot(Table * _table, int64_t _seqnum, int64_t _machineid, Array<char> *_prevhmac, int64_t _localSequenceNumber);
+ Slot(Table * _table, int64_t _seqnum, int64_t _machineid, int64_t _localSequenceNumber);
- char *getHMAC() { return hmac; }
- char *getPrevHMAC() { return prevhmac; }
+ Array<char> *getHMAC() { return hmac; }
+ Array<char> *getPrevHMAC() { return prevhmac; }
Entry *addEntry(Entry *e);
void removeEntry(Entry *e);
bool hasSpace(Entry *e);
Vector<Entry *> *getEntries();
- char *encode(Mac *mac);
+ Array<char> *encode(Mac *mac);
int getBaseSize() { return 2 * HMAC_SIZE + 2 * sizeof(int64_t) + sizeof(int); }
Vector<Entry *> *getLiveEntries(bool resize);
int64_t getSequenceNumber() { return seqnum; }
void setDead();
void decrementLiveCount();
bool isLive() { return livecount > 0; }
- char *getSlotCryptIV();
+ Array<char> *getSlotCryptIV();
+ friend Slot *Slotdecode(Table *table, Array<char> *array, Mac *mac);
};
-Slot *Slotdecode(Table *table, char *array, Mac *mac);
+Slot *Slotdecode(Table *table, Array<char> *array, Mac *mac);
#endif
#include "Table.h"
-Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) {
- localMachineId = _localMachineId;
- cloud = new CloudComm(this, baseurl, password, listeningPort);
-
+Table::Table(String baseurl, String password, int64_t _localMachineId, int listeningPort) :
+ buffer(NULL),
+ cloud(new CloudComm(this, baseurl, password, listeningPort)),
+ random(NULL),
+ liveTableStatus(NULL),
+ pendingTransactionBuilder(NULL),
+ lastPendingTransactionSpeculatedOn(NULL),
+ firstPendingTransaction(NULL),
+ numberOfSlots(0),
+ bufferResizeThreshold(0),
+ liveSlotCount(0),
+ oldestLiveSlotSequenceNumber(0),
+ localMachineId(_localMachineId),
+ sequenceNumber(0),
+ localTransactionSequenceNumber(0),
+ lastTransactionSequenceNumberSpeculatedOn(0),
+ oldestTransactionSequenceNumberSpeculatedOn(0),
+ localArbitrationSequenceNumber(0),
+ hadPartialSendToServer(false),
+ attemptedToSendToServer(false),
+ expectedSize(0),
+ didFindTableStatus(false),
+ currMaxSize(0),
+ lastSlotAttemptedToSend(NULL),
+ lastIsNewKey(false),
+ lastNewSize(0),
+ lastTransactionPartsSent(NULL),
+ lastPendingSendArbitrationEntriesToDelete(NULL),
+ lastNewKey(NULL),
+ committedKeyValueTable(NULL),
+ speculatedKeyValueTable(NULL),
+ pendingTransactionSpeculatedKeyValueTable(NULL),
+ liveNewKeyTable(NULL),
+ lastMessageTable(NULL),
+ rejectedMessageWatchVectorTable(NULL),
+ arbitratorTable(NULL),
+ liveAbortTable(NULL),
+ newTransactionParts(NULL),
+ newCommitParts(NULL),
+ lastArbitratedTransactionNumberByArbitratorTable(NULL),
+ liveTransactionBySequenceNumberTable(NULL),
+ liveTransactionByTransactionIdTable(NULL),
+ liveCommitsTable(NULL),
+ liveCommitsByKeyTable(NULL),
+ lastCommitSeenSequenceNumberByArbitratorTable(NULL),
+ rejectedSlotVector(NULL),
+ pendingTransactionQueue(NULL),
+ pendingSendArbitrationRounds(NULL),
+ pendingSendArbitrationEntriesToDelete(NULL),
+ transactionPartsSent(NULL),
+ outstandingTransactionStatus(NULL),
+ liveAbortsGeneratedByLocal(NULL),
+ offlineTransactionsCommittedAndAtServer(NULL),
+ localCommunicationTable(NULL),
+ lastTransactionSeenFromMachineFromServer(NULL),
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
+ lastInsertedNewKey(false),
+ lastSeqNumArbOn(0)
+{
init();
}
-Table::Table(CloudComm _cloud, int64_t _localMachineId) {
- localMachineId = _localMachineId;
- cloud = _cloud;
-
+Table::Table(CloudComm _cloud, int64_t _localMachineId) :
+ buffer(NULL),
+ cloud(_cloud),
+ random(NULL),
+ liveTableStatus(NULL),
+ pendingTransactionBuilder(NULL),
+ lastPendingTransactionSpeculatedOn(NULL),
+ firstPendingTransaction(NULL),
+ numberOfSlots(0),
+ bufferResizeThreshold(0),
+ liveSlotCount(0),
+ oldestLiveSlotSequenceNumber(0),
+ localMachineId(_localMachineId),
+ sequenceNumber(0),
+ localTransactionSequenceNumber(0),
+ lastTransactionSequenceNumberSpeculatedOn(0),
+ oldestTransactionSequenceNumberSpeculatedOn(0),
+ localArbitrationSequenceNumber(0),
+ hadPartialSendToServer(false),
+ attemptedToSendToServer(false),
+ expectedSize(0),
+ didFindTableStatus(false),
+ currMaxSize(0),
+ lastSlotAttemptedToSend(NULL),
+ lastIsNewKey(false),
+ lastNewSize(0),
+ lastTransactionPartsSent(NULL),
+ lastPendingSendArbitrationEntriesToDelete(NULL),
+ lastNewKey(NULL),
+ committedKeyValueTable(NULL),
+ speculatedKeyValueTable(NULL),
+ pendingTransactionSpeculatedKeyValueTable(NULL),
+ liveNewKeyTable(NULL),
+ lastMessageTable(NULL),
+ rejectedMessageWatchVectorTable(NULL),
+ arbitratorTable(NULL),
+ liveAbortTable(NULL),
+ newTransactionParts(NULL),
+ newCommitParts(NULL),
+ lastArbitratedTransactionNumberByArbitratorTable(NULL),
+ liveTransactionBySequenceNumberTable(NULL),
+ liveTransactionByTransactionIdTable(NULL),
+ liveCommitsTable(NULL),
+ liveCommitsByKeyTable(NULL),
+ lastCommitSeenSequenceNumberByArbitratorTable(NULL),
+ rejectedSlotVector(NULL),
+ pendingTransactionQueue(NULL),
+ pendingSendArbitrationRounds(NULL),
+ pendingSendArbitrationEntriesToDelete(NULL),
+ transactionPartsSent(NULL),
+ outstandingTransactionStatus(NULL),
+ liveAbortsGeneratedByLocal(NULL),
+ offlineTransactionsCommittedAndAtServer(NULL),
+ localCommunicationTable(NULL),
+ lastTransactionSeenFromMachineFromServer(NULL),
+ lastArbitrationDataLocalSequenceNumberSeenFromArbitrator(NULL),
+ lastInsertedNewKey(false),
+ lastSeqNumArbOn(0)
+{
init();
}
pendingTransactionSpeculatedKeyValueTable = new Hashtable<IoTString, KeyValue>();
liveNewKeyTable = new Hashtable<IoTString, NewKey>();
lastMessageTable = new Hashtable<int64_t Pair<int64_t Liveness> >();
- rejectedMessageWatchVectorTable = new Hashtable<int64_t HashSet<RejectedMessage> >();
+ rejectedMessageWatchVectorTable = new Hashtable<int64_t Hashset<RejectedMessage> >();
arbitratorTable = new Hashtable<IoTString, Long>();
liveAbortTable = new Hashtable<Pair<int64_t, int64_t>, Abort>();
newTransactionParts = new Hashtable<int64_t Hashtable<Pair<int64_t int32_t>, TransactionPart> >();
transactionPartsSent = new Hashtable<Transaction, Vector<int32_t> >();
outstandingTransactionStatus = new Hashtable<int64_t TransactionStatus>();
liveAbortsGeneratedByLocal = new Hashtable<int64_t Abort>();
- offlineTransactionsCommittedAndAtServer = new HashSet<Pair<int64_t, int64_t> >();
+ offlineTransactionsCommittedAndAtServer = new Hashset<Pair<int64_t, int64_t> >();
localCommunicationTable = new Hashtable<int64_t Pair<String, int32_t> >();
lastTransactionSeenFromMachineFromServer = new Hashtable<int64_t, int64_t>();
pendingSendArbitrationRounds = new Vector<ArbitrationRound>();
int64_t o = buffer.getOldestSeqNum();
int64_t n = buffer.getNewestSeqNum();
- int[] types = new int[10];
+ Array<int> * types = new Array<int>(10);
int num = 0;
localSequenceNumber++;
TableStatus status = new TableStatus(s, numberOfSlots);
s.addEntry(status);
- Slot[] array = cloud.putSlot(s, numberOfSlots);
+ Array<Slot> * array = cloud.putSlot(s, numberOfSlots);
if (array == NULL) {
- array = new Slot[] {s};
+ array = new Array<Slot>(1);
+ array->set(0, s);
// update local block chain
validateAndUpdate(array, true);
} else if (array.length == 1) {
*/
synchronized void Table::rebuild() {
// Just pull the latest slots from the server
- Slot[] newslots = cloud.getSlots(sequenceNumber + 1);
+ Array<Slot> * newslots = cloud.getSlots(sequenceNumber + 1);
validateAndUpdate(newslots, true);
sendToServer(NULL);
updateLiveTransactionsAndStatus();
synchronized bool Table::update() {
try {
- Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+ Array<Slot> * newSlots = cloud.getSlots(sequenceNumber + 1);
validateAndUpdate(newSlots, false);
sendToServer(NULL);
sendToServer(NULL);
} catch (ServerException e) {
- Set<Long> arbitratorTriedAndFailed = new HashSet<Long>();
+ Set<Long> arbitratorTriedAndFailed = new Hashset<Long>();
for (Iterator<Transaction> iter = pendingTransactionQueue.iterator(); iter.hasNext(); ) {
Transaction transaction = iter.next();
try {
if (hadPartialSendToServer) {
- Slot[] newSlots = cloud.getSlots(sequenceNumber + 1);
+ Array<Slot> * newSlots = cloud.getSlots(sequenceNumber + 1);
if (newSlots.length == 0) {
fromRetry = true;
- ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
+ ThreeTuple<bool, bool, Array<Slot>*> sendSlotsReturn = sendSlotsToServer(lastSlotAttemptedToSend, lastNewSize, lastIsNewKey);
if (sendSlotsReturn.getFirst()) {
if (newKey != NULL) {
lastPendingSendArbitrationEntriesToDelete = new Vector<Entry>(pendingSendArbitrationEntriesToDelete);
- ThreeTuple<bool, bool, Slot[]> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
+ ThreeTuple<bool, bool, Array<Slot>*> sendSlotsReturn = sendSlotsToServer(slot, newSize, newKey != NULL);
if (sendSlotsReturn.getFirst()) {
return returnData;
}
-ThreeTuple<bool, bool, Slot[]> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) {
+ThreeTuple<bool, bool, Array<Slot>*> Table::sendSlotsToServer(Slot slot, int newSize, bool isNewKey) {
bool attemptedToSendToServerTmp = attemptedToSendToServer;
attemptedToSendToServer = true;
bool inserted = false;
bool lastTryInserted = false;
- Slot[] array = cloud.putSlot(slot, newSize);
+ Array<Slot>* array = cloud.putSlot(slot, newSize);
if (array == NULL) {
- array = new Slot[] {slot};
+ array = new Array<Slot>();
+ array->set(0, slot);
rejectedSlotVector.clear();
inserted = true;
} else {
}
}
- return new ThreeTuple<bool, bool, Slot[]>(inserted, lastTryInserted, array);
+ return new ThreeTuple<bool, bool, Array<Slot>*>(inserted, lastTryInserted, array);
}
/**
/**
* Checks for malicious activity and updates the local copy of the block chain.
*/
-void Table::validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal) {
+void Table::validateAndUpdate(Array<Slot>* newSlots, bool acceptUpdatesToLocal) {
// The cloud communication layer has checked slot HMACs already before decoding
if (newSlots.length == 0) {
checkHMACChain(indexer, newSlots);
// Set to keep track of messages from clients
- HashSet<Long> machineSet = new HashSet<Long>(lastMessageTable.keySet());
+ Hashset<Long> machineSet = new Hashset<int64_t>(lastMessageTable.keySet());
// Process each slots data
for (Slot slot : newSlots) {
// The last transaction arbitrated on
int64_t lastTransactionCommitted = -1;
- Set<Abort> generatedAborts = new HashSet<Abort>();
+ Set<Abort> generatedAborts = new Hashset<Abort>();
for (Long transactionSequenceNumber : transactionSequenceNumbers) {
Transaction transaction = liveTransactionBySequenceNumberTable.get(transactionSequenceNumber);
newCommit.createCommitParts();
// Append all the commit parts to the end of the pending queue waiting for sending to the server
- ArbitrationRound arbitrationRound = new ArbitrationRound(newCommit, new HashSet<Abort>());
+ ArbitrationRound * arbitrationRound = new ArbitrationRound(newCommit, new Hashset<Abort *>());
pendingSendArbitrationRounds.add(arbitrationRound);
if (compactArbitrationData()) {
status.setStatus(TransactionStatus.StatusAborted);
}
} else {
- Set addAbortSet = new HashSet<Abort>();
+ Hashset<Abort*> addAbortSet = new Hashset<Abort* >();
// Create the abort
// If we got here then this is a brand new commit and needs full processing
// Get what commits should be edited, these are the commits that have live values for their keys
- Set<Commit> commitsToEdit = new HashSet<Commit>();
+ Hashset<Commit*> * commitsToEdit = new Hashset<Commit*>();
for (KeyValue kv : commit.getKeyValueUpdateSet()) {
commitsToEdit.add(liveCommitsByKeyTable.get(kv.getKey()));
}
return false; // did not speculate
}
- Set<Long> incompleteTransactionArbitrator = new HashSet<Long>();
+ Hashset<int64_t> * incompleteTransactionArbitrator = new Hashset<int64_t>();
bool didSkip = true;
for (int i = startIndex; i < transactionSequenceNumbersSorted.size(); i++) {
/**
* Process this slot, entry by entry. Also update the latest message sent by slot
*/
-void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
+void Table::processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset<int64_t> * machineSet) {
// Update the last message seen
updateLastMessage(slot.getMachineID(), slot.getSequenceNumber(), slot, acceptUpdatesToLocal, machineSet);
/**
* Update the last message that was sent for a machine Id
*/
-void Table::processEntry(LastMessage entry, HashSet<Long> machineSet) {
+void Table::processEntry(LastMessage entry, Hashset<int64_t>* machineSet) {
// Update what the last message received by a machine was
updateLastMessage(entry.getMachineID(), entry.getSequenceNumber(), entry, false, machineSet);
}
// Create a list of clients to watch until they see this rejected message entry.
- HashSet<Long> deviceWatchSet = new HashSet<Long>();
+ Hashset<int64_t>* deviceWatchSet = new Hashset<int64_t>();
for (Map.Entry<int64_t Pair<int64_t Liveness> > lastMessageEntry : lastMessageTable.entrySet()) {
// Machine ID for the last message entry
* Check that the last message seen is correct and that there is no mismatch of our own last message or that
* other clients have not had a rollback on the last message.
*/
-void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<Long> machineSet) {
+void Table::updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<intr64_t> * machineSet) {
// We have seen this machine ID
machineSet.remove(machineId);
// Get the set of rejected messages that this machine Id is has not seen yet
- HashSet<RejectedMessage> watchset = rejectedMessageWatchVectorTable.get(machineId);
+ Hashset<RejectedMessage*>* watchset = rejectedMessageWatchVectorTable.get(machineId);
// If there is a rejected message that this machine Id has not seen yet
if (watchset != NULL) {
* rejected message entry and which have not.
*/
void Table::addWatchVector(int64_t machineId, RejectedMessage entry) {
- HashSet<RejectedMessage> entries = rejectedMessageWatchVectorTable.get(machineId);
+ Hashset<RejectedMessage*>* entries = rejectedMessageWatchVectorTable.get(machineId);
if (entries == NULL) {
// There is no set for this machine ID yet so create one
- entries = new HashSet<RejectedMessage>();
+ entries = new Hashset<RejectedMessage*>();
rejectedMessageWatchVectorTable.put(machineId, entries);
}
entries.add(entry);
/**
* Check if the HMAC chain is not violated
*/
-void Table::checkHMACChain(SlotIndexer indexer, Slot[] newSlots) {
+void Table::checkHMACChain(SlotIndexer indexer, Array<Slot> * newSlots) {
for (int i = 0; i < newSlots.length; i++) {
Slot currSlot = newSlots[i];
Slot prevSlot = indexer.getSlot(currSlot.getSequenceNumber() - 1);
#ifndef Table_H
#define Table_H
-
+#include "common.h"
+#include "Pair.h"
+#include "ThreeTuple.h"
/**
* IoTTable data structure. Provides client interface.
* @author Brian Demsky
private:
/* Helper Objects */
SlotBuffer *buffer;
- CloudComm *cloud = NULL;
- Random *random = NULL;
- TableStatus *liveTableStatus = NULL;
- PendingTransaction *pendingTransactionBuilder = NULL; // Pending Transaction used in building a Pending Transaction
- Transaction *lastPendingTransactionSpeculatedOn = NULL; // Last transaction that was speculated on from the pending transaction
- Transaction *firstPendingTransaction = NULL; // first transaction in the pending transaction list
+ CloudComm *cloud;
+ Random *random;
+ TableStatus *liveTableStatus;
+ PendingTransaction *pendingTransactionBuilder; // Pending Transaction used in building a Pending Transaction
+ Transaction *lastPendingTransactionSpeculatedOn; // Last transaction that was speculated on from the pending transaction
+ Transaction *firstPendingTransaction; // first transaction in the pending transaction list
/* Variables */
- int numberOfSlots = 0; // Number of slots stored in buffer
- int bufferResizeThreshold = 0;// Threshold on the number of live slots before a resize is needed
- int64_t liveSlotCount = 0;// Number of currently live slots
- int64_t oldestLiveSlotSequenceNumver = 0; // Smallest sequence number of the slot with a live entry
- int64_t localMachineId = 0; // Machine ID of this client device
- int64_t sequenceNumber = 0; // Largest sequence number a client has received
- int64_t localSequenceNumber = 0;
+ int numberOfSlots; // Number of slots stored in buffer
+ int bufferResizeThreshold;// Threshold on the number of live slots before a resize is needed
+ int64_t liveSlotCount;// Number of currently live slots
+ int64_t oldestLiveSlotSequenceNumver; // Smallest sequence number of the slot with a live entry
+ int64_t localMachineId; // Machine ID of this client device
+ int64_t sequenceNumber; // Largest sequence number a client has received
+ int64_t localSequenceNumber;
// int smallestTableStatusSeen = -1; // Smallest Table Status that was seen in the latest slots sent from the server
// int largestTableStatusSeen = -1; // Largest Table Status that was seen in the latest slots sent from the server
- int64_t localTransactionSequenceNumber = 0; // Local sequence number counter for transactions
- int64_t lastTransactionSequenceNumberSpeculatedOn = -1; // the last transaction that was speculated on
- int64_t oldestTransactionSequenceNumberSpeculatedOn = -1; // the oldest transaction that was speculated on
- int64_t localArbitrationSequenceNumber = 0;
- bool hadPartialSendToServer = false;
- bool attemptedToSendToServer = false;
+ int64_t localTransactionSequenceNumber; // Local sequence number counter for transactions
+ int64_t lastTransactionSequenceNumberSpeculatedOn; // the last transaction that was speculated on
+ int64_t oldestTransactionSequenceNumberSpeculatedOn; // the oldest transaction that was speculated on
+ int64_t localArbitrationSequenceNumber;
+ bool hadPartialSendToServer;
+ bool attemptedToSendToServer;
int64_t expectedsize;
- bool didFindTableStatus = false;
- int64_t currMaxSize = 0;
+ bool didFindTableStatus;
+ int64_t currMaxSize;
- Slot *lastSlotAttemptedToSend = NULL;
- bool lastIsNewKey = false;
- int lastNewSize = 0;
- Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent = NULL;
- Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete = NULL;
- NewKey *lastNewKey = NULL;
+ Slot *lastSlotAttemptedToSend;
+ bool lastIsNewKey;
+ int lastNewSize;
+ Hashtable<Transaction *, Vector<int32_t> *> lastTransactionPartsSent;
+ Vector<Entry *> *lastPendingSendArbitrationEntriesToDelete;
+ NewKey *lastNewKey;
/* Data Structures */
- Hashtable<IoTString *, KeyValue *> *committedKeyValueTable = NULL;// Table of committed key value pairs
- Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value
- Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable = NULL; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
- Hashtable<IoTString *, NewKey *> *liveNewKeyTable = NULL; // Table of live new keys
- Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable = NULL; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
- Hashtable<int64_t, HashSet<RejectedMessage *> *> *rejectedMessageWatchVectorTable = NULL; // Table of machine Ids and the set of rejected messages they have not seen yet
- Hashtable<IoTString *, int64_t> *arbitratorTable = NULL;// Table of keys and their arbitrators
- Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable = NULL;// Table live abort messages
- Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts = NULL; // transaction parts that are seen in this latest round of slots from the server
- Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts = NULL; // commit parts that are seen in this latest round of slots from the server
- Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable = NULL; // Last transaction sequence number that an arbitrator arbitrated on
- Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable = NULL; // live transaction grouped by the sequence number
- Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable = NULL; // live transaction grouped by the transaction ID
- Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable = NULL;
- Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable = NULL;
- Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable = NULL;
- Vector<int64_t> *rejectedSlotVector = NULL; // Vector of rejected slots that have yet to be sent to the server
- Vector<Transaction *> *pendingTransactionQueue = NULL;
- Vector<ArbitrationRound *> *pendingSendArbitrationRounds = NULL;
- Vector<Entry *> *pendingSendArbitrationEntriesToDelete = NULL;
- Hashtable<Transaction *, Vector<int32_t *> *> *transactionPartsSent = NULL;
- Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus = NULL;
- Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal = NULL;
- Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer = NULL;
- Hashtable<int64_t, Pair<String *, int32_t> > *localCommunicationTable = NULL;
- Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer = NULL;
- Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator = NULL;
+ Hashtable<IoTString *, KeyValue *> *committedKeyValueTable;// Table of committed key value pairs
+ Hashtable<IoTString *, KeyValue *> *speculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value
+ Hashtable<IoTString *, KeyValue *> *pendingTransactionSpeculatedKeyValueTable; // Table of speculated key value pairs, if there is a speculative value from the pending transactions
+ Hashtable<IoTString *, NewKey *> *liveNewKeyTable; // Table of live new keys
+ Hashtable<int64_t, Pair<int64_t, Liveness *> *> *lastMessageTable; // Last message sent by a client machine id -> (Seq Num, Slot or LastMessage);
+ Hashtable<int64_t, Hashset<RejectedMessage *> *> *rejectedMessageWatchVectorTable; // Table of machine Ids and the set of rejected messages they have not seen yet
+ Hashtable<IoTString *, int64_t> *arbitratorTable;// Table of keys and their arbitrators
+ Hashtable<Pair<int64_t, int64_t> *, Abort *> *liveAbortTable;// Table live abort messages
+ Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, TransactionPart *> *> *newTransactionParts; // transaction parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, Hashtable<Pair<int64_t, int32_t> *, CommitPart *> *> *newCommitParts; // commit parts that are seen in this latest round of slots from the server
+ Hashtable<int64_t, int64_t> *lastArbitratedTransactionNumberByArbitratorTable; // Last transaction sequence number that an arbitrator arbitrated on
+ Hashtable<int64_t, Transaction *> *liveTransactionBySequenceNumberTable; // live transaction grouped by the sequence number
+ Hashtable<Pair<int64_t, int64_t> *, Transaction *> *liveTransactionByTransactionIdTable; // live transaction grouped by the transaction ID
+ Hashtable<int64_t, Hashtable<int64_t, Commit *> > *liveCommitsTable;
+ Hashtable<IoTString *, Commit *> *liveCommitsByKeyTable;
+ Hashtable<int64_t, int64_t> *lastCommitSeenSequenceNumberByArbitratorTable;
+ Vector<int64_t> *rejectedSlotVector; // Vector of rejected slots that have yet to be sent to the server
+ Vector<Transaction *> *pendingTransactionQueue;
+ Vector<ArbitrationRound *> *pendingSendArbitrationRounds;
+ Vector<Entry *> *pendingSendArbitrationEntriesToDelete;
+ Hashtable<Transaction *, Vector<int32_t *> *> *transactionPartsSent;
+ Hashtable<int64_t, TransactionStatus *> *outstandingTransactionStatus;
+ Hashtable<int64_t, Abort *> *liveAbortsGeneratedByLocal;
+ Hashset<Pair<int64_t, int64_t> *> *offlineTransactionsCommittedAndAtServer;
+ Hashtable<int64_t, Pair<IoTString *, int32_t> > *localCommunicationTable;
+ Hashtable<int64_t, int64_t> *lastTransactionSeenFromMachineFromServer;
+ Hashtable<int64_t, int64_t> *lastArbitrationDataLocalSequenceNumberSeenFromArbitrator;
+ bool lastInsertedNewKey;
+ int64_t lastSeqNumArbOn;
+
+
void init();
/**
* Recalculate the new resize threshold
*/
void setResizeThreshold();
bool sendToServer(NewKey *newKey);
- synchronized bool updateFromLocal(int64_t machineId);
+ bool updateFromLocal(int64_t machineId);
Pair<bool, bool> sendTransactionToLocal(Transaction *transaction);
ThreeTuple<bool, bool, Array<Slot *> *> *sendSlotsToServer(Slot *slot, int newSize, bool isNewKey);
/**
/**
* Checks for malicious activity and updates the local copy of the block chain.
*/
- void validateAndUpdate(Slot[] newSlots, bool acceptUpdatesToLocal);
+ void validateAndUpdate(Array<Slot> * newSlots, bool acceptUpdatesToLocal);
void updateLiveStateFromServer();
*/
void processNewTransactionParts();
- int64_t lastSeqNumArbOn = 0;
+
void arbitrateFromServer();
/**
* Process this slot, entry by entry. Also update the latest message sent by slot
*/
- void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
+ void processSlot(SlotIndexer indexer, Slot slot, bool acceptUpdatesToLocal, Hashset<int64_t> machineSet);
/**
* Update the last message that was sent for a machine Id
*/
- void processEntry(LastMessage entry, HashSet<int64_t> machineSet);
+ void processEntry(LastMessage entry, Hashset<int64_t> machineSet);
/**
* Add the new key to the arbitrators table and update the set of live new keys (in case of a rescued new key message)
* Check that the last message seen is correct and that there is no mismatch of our own last message or that
* other clients have not had a rollback on the last message.
*/
- void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, HashSet<int64_t> machineSet);
+ void updateLastMessage(int64_t machineId, int64_t seqNum, Liveness liveness, bool acceptUpdatesToLocal, Hashset<int64_t> machineSet);
/**
* Add a rejected message entry to the watch set to keep track of which clients have seen that
/**
* Check if the HMAC chain is not violated
*/
- void checkHMACChain(SlotIndexer indexer, Slot[] newSlots);
- bool lastInsertedNewKey = false;
+ void checkHMACChain(SlotIndexer indexer, Array<Slot> * newSlots);
+
public:
- Table(String baseurl, String password, int64_t _localMachineId, int listeningPort);
+ Table(IoTString * baseurl, IoTString * password, int64_t _localMachineId, int listeningPort);
Table(CloudComm _cloud, int64_t _localMachineId);
/**
* Rebuild the table from scratch by pulling the latest block chain from the server.
*/
void rebuild();
- void addLocalCommunication(int64_t arbitrator, String hostName, int portNumber);
+ void addLocalCommunication(int64_t arbitrator, IoTString* hostName, int portNumber);
uint64_t getArbitrator(IoTString *key);
void close();
IoTString *getCommitted(IoTString *key);
ourfree(array);
}
+ bool equals(Array<type> * _array) {
+ if (_array->size != size)
+ return false;
+ int cmp=memcmp(array, _array->array, size * sizeof(type));
+ return cmp == 0;
+ }
+
type get(uint index) const {
return array[index];
}
class SlotIndexer;
class Table;
class TableStatus;
-class ThreeTuple;
class TimingSingleton;
class Transaction;
class TransactionPart;
class TransactionStatus;
-class Mac;
class Error;
-
-
+//Code to write
+class SecretKeySpec;
+class Mac;
+class SecureRandom;
+class Thread;
+class DataInputStream;
+class URL;
+class Random;
#endif
class Vector {
public:
Vector(uint _capacity = VECTOR_DEFCAP) :
- size(0),
+ fldsize(0),
capacity(_capacity),
array((type *) ourmalloc(sizeof(type) * _capacity)) {
}
Vector(uint _capacity, type *_array) :
- size(_capacity),
+ fldsize(_capacity),
capacity(_capacity),
array((type *) ourmalloc(sizeof(type) * _capacity)) {
memcpy(array, _array, capacity * sizeof(type));
}
Vector(Vector<type> *v) :
- size(v->size),
+ fldsize(v->fldsize),
capacity(v->capacity),
array((type *) ourmalloc(sizeof(type) * v->capacity)) {
memcpy(array, v->array, capacity * sizeof(type));
}
void pop() {
- size--;
+ fldsize--;
}
type last() const {
- return array[size - 1];
+ return array[fldsize - 1];
}
void setSize(uint _size) {
- if (_size <= size) {
- size = _size;
+ if (_size <= fldsize) {
+ fldsize = _size;
return;
} else if (_size > capacity) {
array = (type *)ourrealloc(array, _size * sizeof(type));
capacity = _size;
}
- bzero(&array[size], (_size - size) * sizeof(type));
- size = _size;
+ bzero(&array[fldsize], (_size - fldsize) * sizeof(type));
+ fldsize = _size;
}
void addAll(Vector<type> *v) {
- int oldsize = size;
- setSize(size + v->size);
- memcpy(&array[size], v->array, v->size * sizeof(type));
+ int oldsize = fldsize;
+ setSize(fldsize + v->fldsize);
+ memcpy(&array[fldsize], v->array, v->fldsize * sizeof(type));
}
void add(type item) {
- if (size >= capacity) {
+ if (fldsize >= capacity) {
uint newcap = capacity << 1;
array = (type *)ourrealloc(array, newcap * sizeof(type));
capacity = newcap;
}
- array[size++] = item;
+ array[fldsize++] = item;
}
type get(uint index) const {
}
void setExpand(uint index, type item) {
- if (index >= size)
+ if (index >= fldsize)
setSize(index + 1);
set(index, item);
}
array[index] = item;
}
- uint getSize() const {
- return size;
+ uint size() const {
+ return fldsize;
}
bool isEmpty() const {
- return size == 0;
+ return fldsize == 0;
}
~Vector() {
}
void clear() {
- size = 0;
+ fldsize = 0;
}
type *expose() {
}
CMEMALLOC;
private:
- uint size;
+ uint fldsize;
uint capacity;
type *array;
};