}
delete trit;
} else {
- bool isInserted = false;
- for (uint si = 0; si < newSlots->length(); si++) {
- Slot *s = newSlots->get(si);
- if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
- isInserted = true;
- break;
- }
- }
-
- for (uint si = 0; si < newSlots->length(); si++) {
- Slot *s = newSlots->get(si);
- if (isInserted) {
- break;
- }
-
- // Process each entry in the slot
- Vector<Entry *> *ventries = s->getEntries();
- uint vesize = ventries->size();
- for (uint vei = 0; vei < vesize; vei++) {
- Entry *entry = ventries->get(vei);
- if (entry->getType() == TypeLastMessage) {
- LastMessage *lastMessage = (LastMessage *)entry;
- if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
- isInserted = true;
- break;
- }
- }
- }
- }
-
- if (isInserted) {
+ if (checkSend(newSlots, lastSlotAttemptedToSend)) {
if (newKey != NULL) {
if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
newKey = NULL;
}
// continue;
} else {
- bool isInserted = false;
- for (uint si = 0; si < newSlots->length(); si++) {
- Slot *s = newSlots->get(si);
- if ((s->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
- isInserted = true;
- break;
- }
- }
-
- for (uint si = 0; si < newSlots->length(); si++) {
- Slot *s = newSlots->get(si);
- if (isInserted) {
- break;
- }
-
- // Process each entry in the slot
- Vector<Entry *> *entries = s->getEntries();
- uint eSize = entries->size();
- for (uint ei = 0; ei < eSize; ei++) {
- Entry *entry = entries->get(ei);
-
- if (entry->getType() == TypeLastMessage) {
- LastMessage *lastMessage = (LastMessage *)entry;
- if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == lastSlotAttemptedToSend->getSequenceNumber())) {
- isInserted = true;
- break;
- }
- }
- }
- }
-
- if (isInserted) {
+ if (checkSend(newSlots, lastSlotAttemptedToSend)) {
if (newKey != NULL) {
if (lastInsertedNewKey && (lastNewKey->getKey() == newKey->getKey()) && (lastNewKey->getMachineID() == newKey->getMachineID())) {
newKey = NULL;
localSequenceNumber++;
// Try to fill the slot with data
- ThreeTuple<bool, int32_t, bool> fillSlotsReturn = fillSlot(slot, false, newKey);
- bool needsResize = fillSlotsReturn.getFirst();
- int newSize = fillSlotsReturn.getSecond();
- bool insertedNewKey = fillSlotsReturn.getThird();
+ int newSize = 0;
+ bool insertedNewKey = false;
+ bool needsResize = fillSlot(slot, false, newKey, newSize, insertedNewKey);
if (needsResize) {
// Reset which transaction to send
transactionPartsSent->clear();
// We needed a resize so try again
- fillSlot(slot, true, newKey);
+ fillSlot(slot, true, newKey, newSize, insertedNewKey);
}
lastSlotAttemptedToSend = slot;
if (sendSlotsReturn) {
// Did insert into the block chain
-
if (insertedNewKey) {
// This slot was what was inserted not a previous slot
-
// New Key was successfully inserted into the block chain so dont want to insert it again
newKey = NULL;
}
round->removeParts(pendingSendArbitrationEntriesToDelete);
if (!round->isDoneSending()) {
- // Sent all the parts
+ //Add part back in
pendingSendArbitrationRounds->set(oldcount++,
pendingSendArbitrationRounds->get(i));
}
validateAndUpdate(newSlots, true);
}
}
-
} catch (ServerException *e) {
if (e->getType() != ServerException_TypeInputTimeout) {
// Nothing was able to be sent to the server so just clear these data structures
return returnData;
}
+/** Checks whether a given slot was sent using new slots in
+ array. Returns true if sent and false otherwise. */
+
+bool Table::checkSend(Array<Slot *> * array, Slot *checkSlot) {
+ uint size = array->length();
+ for (uint i = 0; i < size; i++) {
+ Slot *s = array->get(i);
+ if ((s->getSequenceNumber() == checkSlot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
+ return true;
+ }
+ }
+
+ //Also need to see if other machines acknowledged our message
+ for (uint i = 0; i < size; i++) {
+ Slot *s = array->get(i);
+
+ // Process each entry in the slot
+ Vector<Entry *> *entries = s->getEntries();
+ uint eSize = entries->size();
+ for (uint ei = 0; ei < eSize; ei++) {
+ Entry *entry = entries->get(ei);
+
+ if (entry->getType() == TypeLastMessage) {
+ LastMessage *lastMessage = (LastMessage *)entry;
+
+ if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == checkSlot->getSequenceNumber())) {
+ return true;
+ }
+ }
+ }
+ }
+ //Not found
+ return false;
+}
/** Method tries to send slot to server. Returns status in tuple.
isInserted returns whether last un-acked send (if any) was
}
if (hadPartialSendToServer) {
- *isInserted = false;
- uint size = (*array)->length();
- for (uint i = 0; i < size; i++) {
- Slot *s = (*array)->get(i);
- if ((s->getSequenceNumber() == slot->getSequenceNumber()) && (s->getMachineID() == localMachineId)) {
- *isInserted = true;
- break;
- }
- }
+ *isInserted = checkSend(*array, slot);
- //Also need to see if other machines acknowledged our message
- if (!(*isInserted)) {
- for (uint i = 0; i < size; i++) {
- Slot *s = (*array)->get(i);
-
- // Process each entry in the slot
- Vector<Entry *> *entries = s->getEntries();
- uint eSize = entries->size();
- for (uint ei = 0; ei < eSize; ei++) {
- Entry *entry = entries->get(ei);
-
- if (entry->getType() == TypeLastMessage) {
- LastMessage *lastMessage = (LastMessage *)entry;
-
- if ((lastMessage->getMachineID() == localMachineId) && (lastMessage->getSequenceNumber() == slot->getSequenceNumber())) {
- *isInserted = true;
- goto done;
- }
- }
- }
- }
- }
- done:
if (!(*isInserted)) {
rejectedSlotVector->add(slot->getSequenceNumber());
}
}
/**
- * Returns false if a resize was needed
+ * Returns true if a resize was needed but not done.
*/
-ThreeTuple<bool, int32_t, bool> Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry) {
- int newSize = 0;
+bool Table::fillSlot(Slot *slot, bool resize, NewKey *newKeyEntry, int & newSize, bool & insertedKey) {
+ newSize = 0;//special value to indicate no resize
if (liveSlotCount > bufferResizeThreshold) {
resize = true;//Resize is forced
}
int64_t currentRescueSequenceNumber = mandatoryRescueReturn.getThird();
if (needsResize && !resize) {
- // We need to resize but we are not resizing so return false
- return ThreeTuple<bool, int32_t, bool>(true, NULL, NULL);
+ // We need to resize but we are not resizing so return true to force on retry
+ return true;
}
- bool inserted = false;
+ insertedKey = false;
if (newKeyEntry != NULL) {
newKeyEntry->setSlot(slot);
if (slot->hasSpace(newKeyEntry)) {
slot->addEntry(newKeyEntry);
- inserted = true;
+ insertedKey = true;
}
}
// Fill the remainder of the slot with rescue data
doOptionalRescue(slot, seenLiveSlot, currentRescueSequenceNumber, resize);
- return ThreeTuple<bool, int32_t, bool>(false, newSize, inserted);
+ return false;
}
void Table::doRejectedMessages(Slot *s) {
}
void Table::arbitrateFromServer() {
-
if (liveTransactionBySequenceNumberTable->size() == 0) {
// Nothing to arbitrate on so move on
return;
* transactions
*/
bool Table::updateCommittedTable() {
-
if (newCommitParts->size() == 0) {
// Nothing new to process
return false;
Transaction *transaction = liveTransactionBySequenceNumberTable->get(key);
// Check if the transaction is dead
- if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator()) && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator() >= transaction->getSequenceNumber())) {
+ if (lastArbitratedTransactionNumberByArbitratorTable->contains(transaction->getArbitrator())
+ && lastArbitratedTransactionNumberByArbitratorTable->get(transaction->getArbitrator()) >= transaction->getSequenceNumber()) {
// Set dead the transaction
transaction->setDead();
TransactionStatus *status = outstandingTransactionStatus->get(key);
// Check if the transaction is dead
- if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator()) && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
-
+ if (lastArbitratedTransactionNumberByArbitratorTable->contains(status->getTransactionArbitrator())
+ && (lastArbitratedTransactionNumberByArbitratorTable->get(status->getTransactionArbitrator()) >= status->getTransactionSequenceNumber())) {
// Set committed
status->setStatus(TransactionStatus_StatusCommitted);