#define TRANS_SOFT_ABORT 20
#define TRANS_SUCESSFUL 21
#define TRANS_PREFETCH_RESPONSE 22
+#define START_REMOTE_THREAD 23
//Control bits for status of objects in Machine pile
#define OBJ_LOCKED_BUT_VERSION_MATCH 14
int transPrefetchProcess(transrecord_t *, int **, short);
void sendPrefetchReq(prefetchpile_t*, int);
void getPrefetchResponse(int, int);
+unsigned short getObjType(unsigned int oid);
+int startRemoteThread(unsigned int oid, unsigned int mid);
/* end transactions */
#endif
void *srcObj;
objheader_t *h;
trans_commit_data_t transinfo;
+ unsigned short objType;
int fd_flags = fcntl((int)acceptfd, F_GETFD), size;
return;
}
break;
+ case START_REMOTE_THREAD:
+ retval = recv((int)acceptfd, &oid, sizeof(unsigned int), 0);
+ if (retval <= 0)
+ perror("dstmAccept(): error receiving START_REMOTE_THREAD msg");
+ else if (retval != sizeof(unsigned int))
+ printf("dstmAccept(): incorrect msg size %d for START_REMOTE_THREAD\n",
+ retval);
+ else
+ { //TODO: execute run method on this global thread object
+ printf("dstmAccept(): received START_REMOTE_THREAD msg, oid=%d\n", oid);
+ objType = getObjType(oid);
+ printf("dstmAccept(): type of object %d is %d\n", oid, objType);
+ }
+ break;
default:
printf("DEBUG -> dstmAccept: Error Unknown opcode %d\n", control);
#include <netdb.h>
#include <netinet/in.h>
#include <string.h>
+#include <sys/ioctl.h>
+#include <net/if.h>
#define LISTEN_PORT 2156
close(tmpsd);
return 0;
}
+
+unsigned int getMyIpAddr(const char *interfaceStr)
+{
+ int sock;
+ struct ifreq interfaceInfo;
+ struct sockaddr_in *myAddr = (struct sockaddr_in *)&interfaceInfo.ifr_addr;
+
+ memset(&interfaceInfo, 0, sizeof(struct ifreq));
+
+ if((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
+ {
+ perror("getMyIpAddr():socket()");
+ return 1;
+ }
+
+ strcpy(interfaceInfo.ifr_name, interfaceStr);
+ myAddr->sin_family = AF_INET;
+
+ if(ioctl(sock, SIOCGIFADDR, &interfaceInfo) != 0)
+ {
+ perror("getMyIpAddr():ioctl()");
+ return 1;
+ }
+
+ close(sock);
+
+ return ntohl(myAddr->sin_addr.s_addr);
+}
+
/*
main() {
unsigned int mid;
/* Save the oids not found and number of oids not found for later use */
if ((mobj = mhashSearch(oid)) == NULL) {/* Obj not found */
/* Save the oids not found and number of oids not found for later use */
-
- oidnotfound[objnotfound] = OID(((objheader_t *)mobj));
+ oidnotfound[objnotfound] = oid;
objnotfound++;
} else { /* If Obj found in machine (i.e. has not moved) */
/* Check if Obj is locked by any previous transaction */
printf("Error in receving response for prefetch request %s, %d\n",__FILE__, __LINE__);
return;
}
+
+unsigned short getObjType(unsigned int oid)
+{
+ objheader_t *objheader;
+ unsigned short numoffsets = 0;
+
+ if ((objheader = (objheader_t *) mhashSearch(oid)) == NULL)
+ {
+ if ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
+ {
+ prefetch(1, &oid, &numoffsets, NULL);
+ pthread_mutex_lock(&pflookup.lock);
+ while ((objheader = (objheader_t *) prehashSearch(oid)) == NULL)
+ pthread_cond_wait(&pflookup.cond, &pflookup.lock);
+ pthread_mutex_unlock(&pflookup.lock);
+ }
+ }
+
+ return TYPE(objheader);
+}
+
+int startRemoteThread(unsigned int oid, unsigned int mid)
+{
+ int sock;
+ struct sockaddr_in remoteAddr;
+ char msg[1 + sizeof(unsigned int)];
+ int bytesSent;
+ int status;
+
+ if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
+ {
+ perror("startRemoteThread():socket()");
+ return -1;
+ }
+
+ bzero(&remoteAddr, sizeof(remoteAddr));
+ remoteAddr.sin_family = AF_INET;
+ remoteAddr.sin_port = htons(LISTEN_PORT);
+ remoteAddr.sin_addr.s_addr = htonl(mid);
+
+ if (connect(sock, (struct sockaddr *)&remoteAddr, sizeof(remoteAddr)) < 0)
+ {
+ printf("startRemoteThread():error %d connecting to %s:%d\n", errno,
+ inet_ntoa(remoteAddr.sin_addr), LISTEN_PORT);
+ status = -1;
+ }
+ else
+ {
+ msg[0] = START_REMOTE_THREAD;
+ memcpy(&msg[1], &oid, sizeof(unsigned int));
+
+ bytesSent = send(sock, msg, 1 + sizeof(unsigned int), 0);
+ if (bytesSent < 0)
+ {
+ perror("startRemoteThread():send()");
+ status = -1;
+ }
+ else if (bytesSent != 1 + sizeof(unsigned int))
+ {
+ printf("startRemoteThread(): error, sent %d bytes\n", bytesSent);
+ status = -1;
+ }
+ else
+ {
+ status = 0;
+ }
+ }
+
+ close(sock);
+ return status;
+}
+