From: bdemsky Date: Thu, 5 Oct 2006 21:35:49 +0000 (+0000) Subject: Socket I/O code and Example X-Git-Tag: preEdgeChange~817 X-Git-Url: http://demsky.eecs.uci.edu/git/?a=commitdiff_plain;h=709d2e7f2de21a60a8cd13fcc6ef2235f3debc92;p=IRC.git Socket I/O code and Example --- diff --git a/Robust/src/ClassLibrary/ServerSocket.java b/Robust/src/ClassLibrary/ServerSocket.java index 58ebc8f0..d99ff83e 100644 --- a/Robust/src/ClassLibrary/ServerSocket.java +++ b/Robust/src/ClassLibrary/ServerSocket.java @@ -1,6 +1,6 @@ public class ServerSocket { /* Socket pending flag */ - flag SocketPending; + external flag SocketPending; /* File Descriptor */ int fd; @@ -12,12 +12,12 @@ public class ServerSocket { public Socket accept() { Socket s=new Socket(); - int newfd=nativeaccept(s, fd); + int newfd=nativeaccept(s); s.setFD(newfd); return s; } - private static native int nativeaccept(Socket s,int fd); + private native int nativeaccept(Socket s); public void close(); diff --git a/Robust/src/ClassLibrary/Socket.java b/Robust/src/ClassLibrary/Socket.java index 1f0f32c0..8ad852d5 100644 --- a/Robust/src/ClassLibrary/Socket.java +++ b/Robust/src/ClassLibrary/Socket.java @@ -1,6 +1,6 @@ public class Socket { /* Data pending flag */ - flag IOPending; + external flag IOPending; /* File Descriptor */ int fd; @@ -12,17 +12,17 @@ public class Socket { } public int read(byte[] b) { - return nativeRead(b, fd); + return nativeRead(b); } public void write(byte[] b) { - nativeWrite(b, fd); + nativeWrite(b); } - private native static int nativeRead(byte[] b, int fd); - private native static void nativeWrite(byte[] b, int fd); - private native static void nativeClose(int fd); + private native int nativeRead(byte[] b); + private native void nativeWrite(byte[] b); + private native void nativeClose(); public void close() { - nativeClose(fd); + nativeClose(); } } diff --git a/Robust/src/IR/FlagDescriptor.java b/Robust/src/IR/FlagDescriptor.java index f2f6bfd2..f358fb62 100644 --- a/Robust/src/IR/FlagDescriptor.java +++ b/Robust/src/IR/FlagDescriptor.java @@ -12,6 +12,15 @@ public class FlagDescriptor extends Descriptor { super(identifier); } + private boolean isExternal=false; + public void makeExternal() { + isExternal=true; + } + + public boolean getExternal() { + return isExternal; + } + public String toString() { return "Flag "+getSymbol(); } diff --git a/Robust/src/IR/Tree/BuildIR.java b/Robust/src/IR/Tree/BuildIR.java index 52becbd0..c7bdb918 100644 --- a/Robust/src/IR/Tree/BuildIR.java +++ b/Robust/src/IR/Tree/BuildIR.java @@ -246,7 +246,10 @@ public class BuildIR { private void parseFlagDecl(ClassDescriptor cn,ParseNode pn) { String name=pn.getChild("name").getTerminal(); - cn.addFlag(new FlagDescriptor(name)); + FlagDescriptor flag=new FlagDescriptor(name); + if (pn.getChild("external")!=null) + flag.makeExternal(); + cn.addFlag(flag); } private void parseFieldDecl(ClassDescriptor cn,ParseNode pn) { diff --git a/Robust/src/IR/Tree/SemanticCheck.java b/Robust/src/IR/Tree/SemanticCheck.java index eb1b5ec2..a2cd5122 100644 --- a/Robust/src/IR/Tree/SemanticCheck.java +++ b/Robust/src/IR/Tree/SemanticCheck.java @@ -116,6 +116,8 @@ public class SemanticCheck { //Make sure the flag is declared if (flag_d==null) throw new Error("Flag descriptor "+name+" undefined in class: "+cd.getSymbol()); + if (flag_d.getExternal()) + throw new Error("Attempting to modify external flag: "+name); flag.setFlag(flag_d); } } @@ -505,6 +507,8 @@ public class SemanticCheck { //Make sure the flag is declared if (flag_d==null) throw new Error("Flag descriptor "+name+" undefined in class: "+cd.getSymbol()); + if (flag_d.getExternal()) + throw new Error("Attempting to modify external flag: "+name); flag.setFlag(flag_d); } } diff --git a/Robust/src/Lex/Keyword.java b/Robust/src/Lex/Keyword.java index f91131ae..bdd7332c 100644 --- a/Robust/src/Lex/Keyword.java +++ b/Robust/src/Lex/Keyword.java @@ -66,6 +66,7 @@ class Keyword extends Token { key_table.put("while", new Integer(Sym.WHILE)); //Keywords for failure aware computation key_table.put("flag", new Integer(Sym.FLAG)); + key_table.put("external", new Integer(Sym.EXTERNAL)); key_table.put("tag", new Integer(Sym.TAG)); key_table.put("task", new Integer(Sym.TASK)); key_table.put("taskexit", new Integer(Sym.TASKEXIT)); diff --git a/Robust/src/Lex/Lexer.java b/Robust/src/Lex/Lexer.java index 456cd0d1..448cbff2 100644 --- a/Robust/src/Lex/Lexer.java +++ b/Robust/src/Lex/Lexer.java @@ -246,7 +246,7 @@ public class Lexer { static final String[] keywords = new String[] { "abstract", "assert", "boolean", "break", "byte", "case", "catch", "char", "class", "const", "continue", "default", "do", "double", "else", "enum", - "extends", "final", "finally", + "extends", "external", "final", "finally", "flag", //keyword for failure aware computation "float", "for", "goto", "if", "implements", "import", "instanceof", "int", "interface", "long", diff --git a/Robust/src/Parse/java14.cup b/Robust/src/Parse/java14.cup index 4a6877a4..ae8058e5 100644 --- a/Robust/src/Parse/java14.cup +++ b/Robust/src/Parse/java14.cup @@ -221,6 +221,7 @@ non terminal ParseNode expression_opt, expression; //non terminal ParseNode constant_expression; //failure aware computation keywords terminal FLAG; +terminal EXTERNAL; terminal TAG; terminal TASK; terminal TASKEXIT; @@ -683,6 +684,12 @@ flag_declaration ::= ParseNode pn=new ParseNode("flag_declaration"); pn.addChild("name").addChild(id); RESULT=pn; + :} | + EXTERNAL FLAG IDENTIFIER:id SEMICOLON {: + ParseNode pn=new ParseNode("flag_declaration"); + pn.addChild("name").addChild(id); + pn.addChild("external"); + RESULT=pn; :} ; diff --git a/Robust/src/Runtime/runtime.c b/Robust/src/Runtime/runtime.c index cab31e81..7393e7e4 100644 --- a/Robust/src/Runtime/runtime.c +++ b/Robust/src/Runtime/runtime.c @@ -185,16 +185,16 @@ int maxreadfd; struct RuntimeHash *fdtoobject; void addreadfd(int fd) { - if (fd>maxreadfd) - fd=maxreadfd; + if (fd>=maxreadfd) + maxreadfd=fd+1; FD_SET(fd, &readfds); } void removereadfd(int fd) { FD_CLR(fd, &readfds); - if (maxreadfd==fd) { + if (maxreadfd==(fd+1)) { maxreadfd--; - while(!FD_ISSET(maxreadfd, &readfds)&&maxreadfd>0) + while(maxreadfd>0&&!FD_ISSET(maxreadfd-1, &readfds)) maxreadfd--; } } @@ -222,54 +222,60 @@ void executetasks() { mmap(0, 0x1000, 0, MAP_SHARED|MAP_FIXED|MAP_ANON, -1, 0); newtask: - while(!isEmpty(activetasks)) { - struct QueueItem * qi=(struct QueueItem *) getTail(activetasks); - struct taskparamdescriptor *tpd=(struct taskparamdescriptor *) qi->objectptr; - int i; - struct timeval timeout={0,0}; - fd_set tmpreadfds; - int numselect; - FD_COPY(&readfds, &tmpreadfds); - numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout); - if (numselect>0) { - /* Process ready fd's */ - int fd; - for(fd=0;fd0)) { + + if (maxreadfd>0) { + int i; + struct timeval timeout={0,0}; + fd_set tmpreadfds; + int numselect; + FD_COPY(&readfds, &tmpreadfds); + numselect=select(maxreadfd, &tmpreadfds, NULL, NULL, &timeout); + if (numselect>0) { + /* Process ready fd's */ + int fd; + for(fd=0;fdtask->numParameters;i++) { - void * parameter=tpd->parameterArray[i]; - struct parameterdescriptor * pd=tpd->task->descriptorarray[i]; - struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue; - if (!RuntimeHashcontainskey(pw->objectset, (int) parameter)) - goto newtask; - taskpointerarray[i]=parameter; - } - { - struct RuntimeHash * forward=allocateRuntimeHash(100); - struct RuntimeHash * reverse=allocateRuntimeHash(100); - void ** checkpoint=makecheckpoint(tpd->task->numParameters, taskpointerarray, forward, reverse); - if (setjmp(error_handler)) { - /* Recover */ - int h; + + if (!isEmpty(activetasks)) { + int i; + struct QueueItem * qi=(struct QueueItem *) getTail(activetasks); + struct taskparamdescriptor *tpd=(struct taskparamdescriptor *) qi->objectptr; + removeItem(activetasks, qi); + + for(i=0;itask->numParameters;i++) { + void * parameter=tpd->parameterArray[i]; + struct parameterdescriptor * pd=tpd->task->descriptorarray[i]; + struct parameterwrapper *pw=(struct parameterwrapper *) pd->queue; + if (!RuntimeHashcontainskey(pw->objectset, (int) parameter)) + goto newtask; + taskpointerarray[i]=parameter; + } + { + struct RuntimeHash * forward=allocateRuntimeHash(100); + struct RuntimeHash * reverse=allocateRuntimeHash(100); + void ** checkpoint=makecheckpoint(tpd->task->numParameters, taskpointerarray, forward, reverse); + if (setjmp(error_handler)) { + /* Recover */ + int h; #ifdef DEBUG - printf("Recovering\n"); + printf("Recovering\n"); #endif - genputtable(failedtasks,tpd,tpd); - restorecheckpoint(tpd->task->numParameters, taskpointerarray, checkpoint, forward, reverse); - } else { - /* Actually call task */ - ((void (*) (void **)) tpd->task->taskptr)(taskpointerarray); + genputtable(failedtasks,tpd,tpd); + restorecheckpoint(tpd->task->numParameters, taskpointerarray, checkpoint, forward, reverse); + } else { + /* Actually call task */ + ((void (*) (void **)) tpd->task->taskptr)(taskpointerarray); + } } } } @@ -302,7 +308,8 @@ void processtasks() { int ___ServerSocket______createSocket____I(struct ___ServerSocket___ * sock, int port) { - int fd=socket(AF_INET, SOCK_STREAM, 0); + int fd; + int n=1; struct sockaddr_in sin; @@ -310,12 +317,21 @@ int ___ServerSocket______createSocket____I(struct ___ServerSocket___ * sock, int sin.sin_family = AF_INET; sin.sin_port = htons (port); sin.sin_addr.s_addr = htonl (INADDR_ANY); - - if (fd<0) + fd=socket(AF_INET, SOCK_STREAM, 0); + if (fd<0) { +#ifdef DEBUG + perror(NULL); + printf("createSocket error #1\n"); +#endif longjmp(error_handler,5); - + } + if (setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof (n)) < 0) { close(fd); +#ifdef DEBUG + perror(NULL); + printf("createSocket error #2\n"); +#endif longjmp(error_handler, 6); } fcntl(fd, F_SETFD, 1); @@ -324,47 +340,66 @@ int ___ServerSocket______createSocket____I(struct ___ServerSocket___ * sock, int /* bind to port */ if (bind(fd, (struct sockaddr *) &sin, sizeof(sin))<0) { close (fd); +#ifdef DEBUG + perror(NULL); + printf("createSocket error #3\n"); +#endif longjmp(error_handler, 7); } /* listen */ - if (listen(fd, 10)<0) { + if (listen(fd, 5)<0) { close (fd); +#ifdef DEBUG + perror(NULL); + printf("createSocket error #4\n"); +#endif longjmp(error_handler, 8); } /* Store the fd/socket object mapping */ RuntimeHashadd(fdtoobject, fd, (int) sock); addreadfd(fd); - return fd; } -int ___ServerSocket______nativeaccept____L___Socket____I(struct ___Socket___ * sock, int fd) { +int ___ServerSocket______nativeaccept____L___Socket___(struct ___ServerSocket___ * serversock, struct ___Socket___ * sock) { struct sockaddr_in sin; unsigned int sinlen=sizeof(sin); - int newfd=accept(fd, (struct sockaddr *)&sin, &sinlen); + int fd=serversock->___fd___; + int newfd; + newfd=accept(fd, (struct sockaddr *)&sin, &sinlen); + if (newfd<0) { +#ifdef DEBUG + perror(NULL); + printf("acceptSocket error #1\n"); +#endif longjmp(error_handler, 9); } fcntl(newfd, F_SETFL, fcntl(fd, F_GETFL)|O_NONBLOCK); - RuntimeHashadd(fdtoobject, fd, (int) sock); - addreadfd(fd); + RuntimeHashadd(fdtoobject, newfd, (int) sock); + addreadfd(newfd); + flagorand(serversock,0,0xFFFFFFFE); return newfd; } -void ___Socket______nativeWrite_____AR_C_I(struct ArrayObject * ao, int fd) { + +void ___Socket______nativeWrite_____AR_C(struct ___Socket___ * sock, struct ArrayObject * ao) { + int fd=sock->___fd___; int length=ao->___length___; char * charstr=((char *)& ao->___length___)+sizeof(int); int bytewritten=write(fd, charstr, length); if (bytewritten!=length) { printf("ERROR IN NATIVEWRITE\n"); } + flagorand(sock,0,0xFFFFFFFE); } -int ___Socket______nativeRead_____AR_C_I(struct ArrayObject * ao, int fd) { +int ___Socket______nativeRead_____AR_C(struct ___Socket___ * sock, struct ArrayObject * ao) { + int fd=sock->___fd___; int length=ao->___length___; char * charstr=((char *)& ao->___length___)+sizeof(int); int byteread=read(fd, charstr, length); @@ -372,15 +407,18 @@ int ___Socket______nativeRead_____AR_C_I(struct ArrayObject * ao, int fd) { if (byteread<0) { printf("ERROR IN NATIVEREAD\n"); } + flagorand(sock,0,0xFFFFFFFE); return byteread; } -void ___Socket______nativeClose____I(int fd) { +void ___Socket______nativeClose____(struct ___Socket___ * sock) { + int fd=sock->___fd___; int data; RuntimeHashget(fdtoobject, fd, &data); RuntimeHashremove(fdtoobject, fd, data); removereadfd(fd); close(fd); + flagorand(sock,0,0xFFFFFFFE); } #endif diff --git a/Robust/src/Tests/ServerExample.java b/Robust/src/Tests/ServerExample.java new file mode 100644 index 00000000..82b52ceb --- /dev/null +++ b/Robust/src/Tests/ServerExample.java @@ -0,0 +1,27 @@ +/* Startup object is generated with the initialstate flag set by the + * system to start the computation up */ + +task Startup(StartupObject s {initialstate}) { + System.printString("Starting\n"); + ServerSocket ss=new ServerSocket(8000); + System.printString("Creating ServerSocket\n"); + taskexit(s {!initialstate}); /* Turns initial state flag off, so this task won't refire */ +} + +task AcceptConnection(ServerSocket ss{SocketPending}) { + Socket s=ss.accept(); + System.printString("Creating Socket\n"); +} + +task IncomingIO(Socket s{IOPending}) { + byte[] b=new byte[10]; + int length=s.read(b); + byte[] b2=new byte[length]; + int i; + for(i=0;i