struct RuntimeHash *fdtoobject;
void addreadfd(int fd) {
- if (fd>maxreadfd)
- fd=maxreadfd;
+ if (fd>=maxreadfd)
+ maxreadfd=fd+1;
FD_SET(fd, &readfds);
}
void removereadfd(int fd) {
FD_CLR(fd, &readfds);
- if (maxreadfd==fd) {
+ if (maxreadfd==(fd+1)) {
maxreadfd--;
- while(!FD_ISSET(maxreadfd, &readfds)&&maxreadfd>0)
+ while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds))
maxreadfd--;
}
}
mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0);
newtask:
- while(!isEmpty(activetasks)) {
- struct QueueItem * qi=(struct QueueItem *) getTail(activetasks);
- struct taskparamdescriptor *tpd=(struct taskparamdescriptor *) qi->objectptr;
- int i;
- struct timeval timeout={0,0};
- fd_set tmpreadfds;
- int numselect;
- FD_COPY(&readfds, &tmpreadfds);
- numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
- if (numselect>0) {
- /* Process ready fd's */
- int fd;
- for(fd=0;fd<maxreadfd;fd++) {
- if (FD_ISSET(fd, &readfds)) {
- /* Set ready flag on object */
- void * objptr;
- if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
- flagorand(objptr,1,0xFFFFFFFF); /* Set the first flag to 1 */
+ while(!isEmpty(activetasks)||(maxreadfd>0)) {
+
+ if (maxreadfd>0) {
+ int i;
+ struct timeval timeout={0,0};
+ fd_set tmpreadfds;
+ int numselect;
+ FD_COPY(&readfds, &tmpreadfds);
+ numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout);
+ if (numselect>0) {
+ /* Process ready fd's */
+ int fd;
+ for(fd=0;fd<maxreadfd;fd++) {
+ if (FD_ISSET(fd, &tmpreadfds)) {
+ /* Set ready flag on object */
+ void * objptr;
+ if (RuntimeHashget(fdtoobject, fd,(int *) &objptr)) {
+ flagorand(objptr,1,0xFFFFFFFF); /* Set the first flag to 1 */
+ }
}
}
}
}
-
- removeItem(activetasks, qi);
-
- for(i=0;i<tpd->task->numParameters;i++) {
- void * parameter=tpd->parameterArray[i];
- struct parameterdescriptor * pd=tpd->task->descriptorarray[i];
- struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue;
- if (!RuntimeHashcontainskey(pw->objectset, (int) parameter))
- goto newtask;
- taskpointerarray[i]=parameter;
- }
- {
- struct RuntimeHash * forward=allocateRuntimeHash(100);
- struct RuntimeHash * reverse=allocateRuntimeHash(100);
- void ** checkpoint=makecheckpoint(tpd->task->numParameters, taskpointerarray, forward, reverse);
- if (setjmp(error_handler)) {
- /* Recover */
- int h;
+
+ if (!isEmpty(activetasks)) {
+ int i;
+ struct QueueItem * qi=(struct QueueItem *) getTail(activetasks);
+ struct taskparamdescriptor *tpd=(struct taskparamdescriptor *) qi->objectptr;
+ removeItem(activetasks, qi);
+
+ for(i=0;i<tpd->task->numParameters;i++) {
+ void * parameter=tpd->parameterArray[i];
+ struct parameterdescriptor * pd=tpd->task->descriptorarray[i];
+ struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue;
+ if (!RuntimeHashcontainskey(pw->objectset, (int) parameter))
+ goto newtask;
+ taskpointerarray[i]=parameter;
+ }
+ {
+ struct RuntimeHash * forward=allocateRuntimeHash(100);
+ struct RuntimeHash * reverse=allocateRuntimeHash(100);
+ void ** checkpoint=makecheckpoint(tpd->task->numParameters, taskpointerarray, forward, reverse);
+ if (setjmp(error_handler)) {
+ /* Recover */
+ int h;
#ifdef DEBUG
- printf("Recovering\n");
+ printf("Recovering\n");
#endif
- genputtable(failedtasks,tpd,tpd);
- restorecheckpoint(tpd->task->numParameters, taskpointerarray, checkpoint, forward, reverse);
- } else {
- /* Actually call task */
- ((void (*) (void **)) tpd->task->taskptr)(taskpointerarray);
+ genputtable(failedtasks,tpd,tpd);
+ restorecheckpoint(tpd->task->numParameters, taskpointerarray, checkpoint, forward, reverse);
+ } else {
+ /* Actually call task */
+ ((void (*) (void **)) tpd->task->taskptr)(taskpointerarray);
+ }
}
}
}
int ___ServerSocket______createSocket____I(struct ___ServerSocket___ * sock, int port) {
- int fd=socket(AF_INET, SOCK_STREAM, 0);
+ int fd;
+
int n=1;
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_port = htons (port);
sin.sin_addr.s_addr = htonl (INADDR_ANY);
-
- if (fd<0)
+ fd=socket(AF_INET, SOCK_STREAM, 0);
+ if (fd<0) {
+#ifdef DEBUG
+ perror(NULL);
+ printf("createSocket error #1\n");
+#endif
longjmp(error_handler,5);
-
+ }
+
if (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof (n)) < 0) {
close(fd);
+#ifdef DEBUG
+ perror(NULL);
+ printf("createSocket error #2\n");
+#endif
longjmp(error_handler, 6);
}
fcntl(fd, F_SETFD, 1);
/* bind to port */
if (bind(fd, (struct sockaddr *) &sin, sizeof(sin))<0) {
close (fd);
+#ifdef DEBUG
+ perror(NULL);
+ printf("createSocket error #3\n");
+#endif
longjmp(error_handler, 7);
}
/* listen */
- if (listen(fd, 10)<0) {
+ if (listen(fd, 5)<0) {
close (fd);
+#ifdef DEBUG
+ perror(NULL);
+ printf("createSocket error #4\n");
+#endif
longjmp(error_handler, 8);
}
/* Store the fd/socket object mapping */
RuntimeHashadd(fdtoobject, fd, (int) sock);
addreadfd(fd);
-
return fd;
}
-int ___ServerSocket______nativeaccept____L___Socket____I(struct ___Socket___ * sock, int fd) {
+int ___ServerSocket______nativeaccept____L___Socket___(struct ___ServerSocket___ * serversock, struct ___Socket___ * sock) {
struct sockaddr_in sin;
unsigned int sinlen=sizeof(sin);
- int newfd=accept(fd, (struct sockaddr *)&sin, &sinlen);
+ int fd=serversock->___fd___;
+ int newfd;
+ newfd=accept(fd, (struct sockaddr *)&sin, &sinlen);
+
if (newfd<0) {
+#ifdef DEBUG
+ perror(NULL);
+ printf("acceptSocket error #1\n");
+#endif
longjmp(error_handler, 9);
}
fcntl(newfd, F_SETFL, fcntl(fd, F_GETFL)|O_NONBLOCK);
- RuntimeHashadd(fdtoobject, fd, (int) sock);
- addreadfd(fd);
+ RuntimeHashadd(fdtoobject, newfd, (int) sock);
+ addreadfd(newfd);
+ flagorand(serversock,0,0xFFFFFFFE);
return newfd;
}
-void ___Socket______nativeWrite_____AR_C_I(struct ArrayObject * ao, int fd) {
+
+void ___Socket______nativeWrite_____AR_C(struct ___Socket___ * sock, struct ArrayObject * ao) {
+ int fd=sock->___fd___;
int length=ao->___length___;
char * charstr=((char *)& ao->___length___)+sizeof(int);
int bytewritten=write(fd, charstr, length);
if (bytewritten!=length) {
printf("ERROR IN NATIVEWRITE\n");
}
+ flagorand(sock,0,0xFFFFFFFE);
}
-int ___Socket______nativeRead_____AR_C_I(struct ArrayObject * ao, int fd) {
+int ___Socket______nativeRead_____AR_C(struct ___Socket___ * sock, struct ArrayObject * ao) {
+ int fd=sock->___fd___;
int length=ao->___length___;
char * charstr=((char *)& ao->___length___)+sizeof(int);
int byteread=read(fd, charstr, length);
if (byteread<0) {
printf("ERROR IN NATIVEREAD\n");
}
+ flagorand(sock,0,0xFFFFFFFE);
return byteread;
}
-void ___Socket______nativeClose____I(int fd) {
+void ___Socket______nativeClose____(struct ___Socket___ * sock) {
+ int fd=sock->___fd___;
int data;
RuntimeHashget(fdtoobject, fd, &data);
RuntimeHashremove(fdtoobject, fd, data);
removereadfd(fd);
close(fd);
+ flagorand(sock,0,0xFFFFFFFE);
}
#endif