-
+#include"SlotBuffer.h"
/**
* Circular buffer that holds the live set of slots.
* @author Brian Demsky
* @version 1.0
*/
-class SlotBuffer {
- static final int DEFAULT_SIZE = 16;
+SlotBuffer::SlotBuffer() :
+ array(new Array<Slot *>(SlotBuffer_DEFAULT_SIZE + 1)),
+ head(0),
+ tail(0),
+ oldestseqn(0) {
+}
- Slot[] array;
- int head;
- int tail;
- int64_t oldestseqn;
+int SlotBuffer::size() {
+ if (head >= tail)
+ return head - tail;
+ return (array->length() + head) - tail;
+}
- SlotBuffer() {
- array = new Slot[DEFAULT_SIZE + 1];
- head = tail = 0;
- oldestseqn = 0;
- }
+int SlotBuffer::capacity() {
+ return array->length() - 1;
+}
- int size() {
- if (head >= tail)
- return head - tail;
- return (array.length + head) - tail;
+void SlotBuffer::resize(int newsize) {
+ if (newsize == (array->length() - 1))
+ return;
+
+ Array<Slot *> * newarray = new Array<Slot *>(newsize + 1);
+ int currsize = size();
+ int index = tail;
+ for (int i = 0; i < currsize; i++) {
+ newarray->set(i, array->get(index));
+ if ((++index) == array->length())
+ index = 0;
}
+ array = newarray;
+ tail = 0;
+ head = currsize;
+}
- int capacity() {
- return array.length - 1;
- }
+void SlotBuffer::incrementHead() {
+ head++;
+ if (head >= array->length())
+ head = 0;
+}
- void resize(int newsize) {
- if (newsize == (array.length - 1))
- return;
+void SlotBuffer::incrementTail() {
+ tail++;
+ if (tail >= array->length())
+ tail = 0;
+}
- Slot[] newarray = new Slot[newsize + 1];
- int currsize = size();
- int index = tail;
- for (int i = 0; i < currsize; i++) {
- newarray[i] = array[index];
- if ((++index) == array.length)
- index = 0;
- }
- array = newarray;
+void SlotBuffer::putSlot(Slot * s) {
+ int64_t checkNum = (getNewestSeqNum() + 1);
+
+ if (checkNum != s->getSequenceNumber()) {
+ // We have a gap so expunge all our slots
+ oldestseqn = s->getSequenceNumber();
tail = 0;
- head = currsize;
+ head = 1;
+ array->set(0, s);
+ return;
}
-
- void incrementHead() {
- head++;
- if (head >= array.length)
- head = 0;
+
+ array->set(head, s);
+ incrementHead();
+
+ if (oldestseqn == 0) {
+ oldestseqn = s->getSequenceNumber();
}
- void incrementTail() {
- tail++;
- if (tail >= array.length)
- tail = 0;
+ if (head == tail) {
+ incrementTail();
+ oldestseqn++;
}
+}
- void putSlot(Slot s) {
-
- int64_t checkNum = (getNewestSeqNum() + 1);
-
- if (checkNum != s.getSequenceNumber()) {
- // We have a gap so expunge all our slots
- oldestseqn = s.getSequenceNumber();
- tail = 0;
- head = 1;
- array[0] = s;
- return;
- }
-
- array[head] = s;
- incrementHead();
-
- if (oldestseqn == 0) {
- oldestseqn = s.getSequenceNumber();
- }
+Slot SlotBuffer::getSlot(int64_t seqnum) {
+ int32_t diff = (int32_t) (seqnum - oldestseqn);
+ int32_t index = diff + tail;
- if (head == tail) {
- incrementTail();
- oldestseqn++;
- }
+ if (index < 0) {
+ // Really old message so we dont have it anymore
+ return NULL;
}
- Slot getSlot(int64_t seqnum) {
- int diff = (int) (seqnum - oldestseqn);
- int index = diff + tail;
-
- if (index < 0) {
- // Really old message so we dont have it anymore
+ if (index >= array->length()) {
+ if (head >= tail) {
return NULL;
}
+ index -= array->length();
+ }
- if (index >= array.length) {
- if (head >= tail) {
- return NULL;
- }
- index -= array.length;
- }
-
- if (index >= array.length) {
-
- return NULL;
- }
- if (head >= tail && index >= head) {
- return NULL;
- }
+ if (index >= array->length) {
- return array[index];
+ return NULL;
}
-
- int64_t getOldestSeqNum() {
- return oldestseqn;
+ if (head >= tail && index >= head) {
+ return NULL;
}
- int64_t getNewestSeqNum() {
- return oldestseqn + size() - 1;
- }
+ return array->get(index);
}
+#ifndef SLOTBUFFER_H
+#define SLOTBUFFER_H
+
+#include"common.h"
/**
* Circular buffer that holds the live set of slots.
* @version 1.0
*/
-class SlotBuffer {
- static final int DEFAULT_SIZE = 16;
-
- private Slot[] array;
- private int head;
- private int tail;
- public int64_t oldestseqn;
-
- SlotBuffer() {
- array = new Slot[DEFAULT_SIZE + 1];
- head = tail = 0;
- oldestseqn = 0;
- }
-
- int size() {
- if (head >= tail)
- return head - tail;
- return (array.length + head) - tail;
- }
-
- int capacity() {
- return array.length - 1;
- }
-
- void resize(int newsize) {
- if (newsize == (array.length - 1))
- return;
-
- Slot[] newarray = new Slot[newsize + 1];
- int currsize = size();
- int index = tail;
- for (int i = 0; i < currsize; i++) {
- newarray[i] = array[index];
- if ((++index) == array.length)
- index = 0;
- }
- array = newarray;
- tail = 0;
- head = currsize;
- }
-
- private void incrementHead() {
- head++;
- if (head >= array.length)
- head = 0;
- }
-
- private void incrementTail() {
- tail++;
- if (tail >= array.length)
- tail = 0;
- }
-
- void putSlot(Slot s) {
-
- int64_t checkNum = (getNewestSeqNum() + 1);
+#define SlotBuffer_DEFAULT_SIZE 16
- if (checkNum != s.getSequenceNumber()) {
- // We have a gap so expunge all our slots
- oldestseqn = s.getSequenceNumber();
- tail = 0;
- head = 1;
- array[0] = s;
- return;
- }
-
- array[head] = s;
- incrementHead();
-
- if (oldestseqn == 0) {
- oldestseqn = s.getSequenceNumber();
- }
-
- if (head == tail) {
- incrementTail();
- oldestseqn++;
- }
- }
-
- Slot getSlot(int64_t seqnum) {
- int diff = (int) (seqnum - oldestseqn);
- int index = diff + tail;
-
- if (index < 0) {
- // Really old message so we dont have it anymore
- return NULL;
- }
-
- if (index >= array.length) {
- if (head >= tail) {
- return NULL;
- }
- index -= array.length;
- }
-
- if (index >= array.length) {
-
- return NULL;
- }
- if (head >= tail && index >= head) {
- return NULL;
- }
-
- return array[index];
- }
-
- int64_t getOldestSeqNum() {
- return oldestseqn;
- }
-
- int64_t getNewestSeqNum() {
- return oldestseqn + size() - 1;
- }
-}
+class SlotBuffer {
+ private:
+ Array<Slot *> * array;
+ int32_t head;
+ int32_t tail;
+ void incrementHead();
+ void incrementTail();
+
+ public:
+ int64_t oldestseqn;
+
+ SlotBuffer();
+ int32_t size();
+ int32_t capacity();
+ void resize(int newsize);
+ void putSlot(Slot *s);
+ Slot * getSlot(int64_t seqnum);
+ int64_t getOldestSeqNum() { return oldestseqn; }
+ int64_t getNewestSeqNum() { return oldestseqn + size() - 1;}
+};
+#endif
-
+#ifndef TRANSACTION_H
+#define TRANSACTION_H
+#include "common.h"
class Transaction {
-
- private Map<Integer, TransactionPart> parts = NULL;
- private Set<Integer> missingParts = NULL;
- private List<Integer> partsPendingSend = NULL;
- private bool isComplete = false;
- private bool hasLastPart = false;
- private Set<KeyValue> keyValueGuardSet = NULL;
- private Set<KeyValue> keyValueUpdateSet = NULL;
- private bool isDead = false;
- private int64_t sequenceNumber = -1;
- private int64_t clientLocalSequenceNumber = -1;
- private int64_t arbitratorId = -1;
- private int64_t machineId = -1;
- private Pair<Long, Long> transactionId = NULL;
-
- private int nextPartToSend = 0;
- private bool didSendAPartToServer = false;
-
- private TransactionStatus transactionStatus = NULL;
-
- private bool hadServerFailure = false;
+ private:
+ Hashtable<int32_t, TransactionPart *> parts = NULL;
+ Set<int32_t> missingParts = NULL;
+ List<int32_t> partsPendingSend = NULL;
+ bool isComplete = false;
+ bool hasLastPart = false;
+ Hashset<KeyValue *> * keyValueGuardSet = NULL;
+ Hashset<KeyValue *> * keyValueUpdateSet = NULL;
+ bool isDead = false;
+ int64_t sequenceNumber = -1;
+ int64_t clientLocalSequenceNumber = -1;
+ int64_t arbitratorId = -1;
+ int64_t machineId = -1;
+ Pair<uint64_t, uint64_t> transactionId = NULL;
+
+ int nextPartToSend = 0;
+ bool didSendAPartToServer = false;
+
+ TransactionStatus transactionStatus = NULL;
+
+ bool hadServerFailure = false;
public Transaction() {
parts = new HashMap<Integer, TransactionPart>();
}
return true;
}
-}
+};
+#endif