}
public void run() {
- int localresults[][];
+ double localresults[][];
atomic {
//compute the results
- localresults=new int[1+x1-x0][1+y1-y0];
+ localresults=new double[1+x1-x0][1+y1-y0];
//Use b transpose for cache performance
for(int i = x0; i<= x1; i++){
- int a[]=mmul.a[i];
+ double a[]=mmul.a[i];
int M=mmul.M;
for (int j = y0; j <= y1; j++) {
- int innerProduct=0;
- int b[] = mmul.btranspose[j];
+ double innerProduct=0;
+ double b[] = mmul.btranspose[j];
for(int k = 0; k < M; k++) {
innerProduct += a[k] *b[k];
}
atomic {
//write the results
for(int i=x0;i<=x1;i++) {
- int c[]=mmul.c[i];
+ double c[]=mmul.c[i];
for(int j=y0;j<=y1;j++) {
c[j]=localresults[i-x0][j-y0];
}
}
public static void main(String[] args) {
- int mid1 = (128<<24)|(195<<16)|(175<<8)|73;
+ int mid1 = (128<<24)|(195<<16)|(175<<8)|69;
int mid2 = (128<<24)|(195<<16)|(175<<8)|69;
int mid3 = (128<<24)|(195<<16)|(175<<8)|71;
- int NUM_THREADS = 1;
+ int NUM_THREADS = 2;
int p, q, r;
MatrixMultiply[] mm;
MatrixMultiply tmp;
}
atomic {
- mm[0] = global new MatrixMultiply(matrix,0,0,399,399);
+ mm[0] = global new MatrixMultiply(matrix,0,0,399,200);
+ mm[1] = global new MatrixMultiply(matrix,0,201,399,399);
}
atomic {
// print out the result of the matrix multiply
System.printString("Starting\n");
System.printString("Matrix Product c =\n");
- int val;
+ double val;
atomic {
for (int i = 0; i < p; i++) {
- int c[]=matrix.c[i];
+ double c[]=matrix.c[i];
for (int j = 0; j < r; j++) {
val = c[j];
}
public class MMul{
public int L, M, N;
- public int[][] a;
- public int[][] b;
- public int[][] c;
- public int[][] btranspose;
+ public double[][] a;
+ public double[][] b;
+ public double[][] c;
+ public double[][] btranspose;
public MMul(int L, int M, int N) {
this.L = L;
this.M = M;
this.N = N;
- a = global new int[L][M];
- b = global new int[M][N];
- c = global new int[L][N];
- btranspose = global new int[N][M];
+ a = global new double[L][M];
+ b = global new double[M][N];
+ c = global new double[L][N];
+ btranspose = global new double[N][M];
}
public void setValues() {
for(int i = 0; i < L; i++) {
- int ai[] = a[i];
+ double ai[] = a[i];
for(int j = 0; j < M; j++) {
ai[j] = j+1;
}
}
for(int i = 0; i < M; i++) {
- int bi[] = b[i];
+ double bi[] = b[i];
for(int j = 0; j < N; j++) {
bi[j] = j+1;
}
}
for(int i = 0; i < L; i++) {
- int ci[] = c[i];
+ double ci[] = c[i];
for(int j = 0; j < N; j++) {
ci[j] = 0;
}
}
for(int i = 0; i < N; i++) {
- int btransposei[] = btranspose[i];
+ double btransposei[] = btranspose[i];
for(int j = 0; j < M; j++) {
btransposei[j] = 0;
}
public void transpose() {
for(int row = 0; row < M; row++) {
- int brow[] = b[row];
+ double brow[] = b[row];
for(int col = 0; col < N; col++) {
btranspose[col][row] = brow[col];
}
extern int classsize[];
extern int numHostsInSystem;
+extern pthread_mutex_t notifymutex;
objstr_t *mainobjstore;
pthread_mutex_t mainobjstore_mutex;
unsigned short objType, *versionarry, version;
unsigned int *oidarry, numoid, mid, threadid;
- /*
- transinfo.objlocked = NULL;
- transinfo.objnotfound = NULL;
- transinfo.modptr = NULL;
- transinfo.numlocked = 0;
- transinfo.numnotfound = 0;
- */
-
/* Receive control messages from other machines */
while(1) {
int ret=recv_data_errorcode((int)acceptfd, &control, sizeof(char));
- if (ret==-1)
+ if (ret==-1) {
+ printf("DEBUG -> RECV Error!.. retrying\n");
break;
+ }
switch(control) {
case READ_REQUEST:
/* Read oid requested and search if available */
memcpy((char*)header + sizeof(objheader_t), ((char *)modptr + sizeof(objheader_t) + offset), tmpsize);
header->version += 1;
/* If threads are waiting on this object to be updated, notify them */
+ pthread_mutex_lock(¬ifymutex);
if(header->notifylist != NULL) {
notifyAll(&header->notifylist, OID(header), header->version);
}
+ pthread_mutex_unlock(¬ifymutex);
pthread_mutex_unlock(&mainobjstore_mutex);
offset += sizeof(objheader_t) + tmpsize;
}
struct sockaddr_in remoteAddr;
int bytesSent;
int size;
-
int i = 0;
+
while(i < numoid) {
oid = *(oidarry + i);
if((header = (objheader_t *) mhashSearch(oid)) == NULL) {
/* Check to see if versions are same */
checkversion:
if ((STATUS(header) & LOCK) != LOCK) {
- //FIXME make locking atomic
+ pthread_mutex_lock(¬ifymutex);
STATUS(header) |= LOCK;
newversion = header->version;
if(newversion == *(versionarry + i)) {
//Add to the notify list
if((header->notifylist = insNode(header->notifylist, threadid, mid)) == NULL) {
printf("Error: Obj notify list points to NULL %s, %d\n", __FILE__, __LINE__);
+ pthread_mutex_unlock(¬ifymutex);
return;
}
STATUS(header) &= ~(LOCK);
+ pthread_mutex_unlock(¬ifymutex);
} else {
STATUS(header) &= ~(LOCK);
+ pthread_mutex_unlock(¬ifymutex);
if ((sd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
perror("processReqNotify():socket()");
return;
extern objstr_t *prefetchcache;
objstr_t *objstrCreate(unsigned int size) {
- objstr_t *tmp = calloc(1, (sizeof(objstr_t) + size));
+ objstr_t *tmp;
+ if((tmp = calloc(1, (sizeof(objstr_t) + size))) == NULL) {
+ printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+ return NULL;
+ }
tmp->size = size;
tmp->next = NULL;
tmp->top = tmp + 1; //points to end of objstr_t structure!
{ //end of list, all full
if (size > DEFAULT_OBJ_STORE_SIZE) //in case of large objects
{
- store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size));
+ if((store->next = (objstr_t *)calloc(1,(sizeof(objstr_t) + size))) == NULL) {
+ printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+ return NULL;
+ }
if (store->next == NULL)
return NULL;
store = store->next;
}
else
{
- store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE));
+ if((store->next = calloc(1,(sizeof(objstr_t) + DEFAULT_OBJ_STORE_SIZE))) == NULL) {
+ printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+ return NULL;
+ }
if (store->next == NULL)
return NULL;
store = store->next;
sockPoolHashTable_t *transReadSockPool;
sockPoolHashTable_t *transPrefetchSockPool;
+pthread_mutex_t notifymutex;
void printhex(unsigned char *, int);
plistnode_t *createPiles(transrecord_t *);
int top=endoffsets[ntuples-1];
*((int *)(node+len))=ntuples;
len += sizeof(int);
- /* int i;
- for(i=0;i<ntuples;i++) {
- if (oids[i]%2==0&&(oids[i]!=0)) {
- printf("Bad oid %ld\n",oids[i]);
- }
- }*/
-
memcpy(node+len, oids, ntuples*sizeof(unsigned int));
memcpy(node+len+ntuples*sizeof(unsigned int), endoffsets, ntuples*sizeof(unsigned short));
memcpy(node+len+ntuples*(sizeof(unsigned int)+sizeof(short)), arrayfields, top*sizeof(short));
pthread_mutex_init(&prefetchcache_mutex, &prefetchcache_mutex_attr);
+ pthread_mutex_init(¬ifymutex, NULL);
//Create prefetch cache lookup table
if(prehashCreate(HASH_SIZE, LOADFACTOR)) {
printf("ERROR\n");
/* This function initializes things required in the transaction start*/
transrecord_t *transStart() {
- transrecord_t *tmp = calloc(1, sizeof(transrecord_t));
+ transrecord_t *tmp;
+ if((tmp = calloc(1, sizeof(transrecord_t))) == NULL){
+ printf("%s() Calloc error at line %d, %s\n", __func__, __LINE__, __FILE__);
+ return NULL;
+ }
tmp->cache = objstrCreate(1048576);
tmp->lookupTable = chashCreate(HASH_SIZE, LOADFACTOR);
#ifdef COMPILER
chashInsert(record->lookupTable, oid, objcopy);
}
- // freeSock(transReadSockPool, mnum, sd);
-
return objcopy;
}
}
GETSIZE(tmpsize, header);
pthread_mutex_lock(&mainobjstore_mutex);
- memcpy((char*)header+sizeof(objheader_t), (char *)tcptr+ sizeof(objheader_t), tmpsize);
+ char *tmptcptr = (char *) tcptr;
+ memcpy((char*)header+sizeof(objheader_t), (char *)tmptcptr+ sizeof(objheader_t), tmpsize);
header->version += 1;
+ pthread_mutex_lock(¬ifymutex);
if(header->notifylist != NULL) {
notifyAll(&header->notifylist, OID(header), header->version);
}
+ pthread_mutex_unlock(¬ifymutex);
pthread_mutex_unlock(&mainobjstore_mutex);
}
/* If object is newly created inside transaction then commit it */
for (i = 0; i < numcreated; i++) {
if ((header = ((objheader_t *) chashSearch(localtdata->tdata->rec->lookupTable, oidcreated[i]))) == NULL) {
- printf("Error: transComProcess() chashSearch returned NULL at %s, %d\n", __FILE__, __LINE__);
+ printf("Error: transComProcess() chashSearch returned NULL for oid = %x at %s, %d\n", oidcreated[i], __FILE__, __LINE__);
return 1;
}
GETSIZE(tmpsize, header);
pthread_cond_t threadcond = PTHREAD_COND_INITIALIZER;
notifydata_t *ndata;
- //FIXME currently all oids belong to one machine
oid = oidarry[0];
if((mid = lhashSearch(oid)) == 0) {
printf("Error: %s() No such machine found for oid =%x\n",__func__, oid);
printf("notifyAll():error %d connecting to %s:%d\n", errno,
inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
status = -1;
+ fflush(stdout);
} else {
bzero(msg, (1+sizeof(unsigned short) + 2*sizeof(unsigned int)));
msg[0] = THREAD_NOTIFY_RESPONSE;
}
void transAbort(transrecord_t *trans) {
- objstrDelete(trans->cache);
- chashDelete(trans->lookupTable);
- free(trans);
+ objstrDelete(trans->cache);
+ chashDelete(trans->lookupTable);
+ free(trans);
}