--- /dev/null
+// MuxStream.java\r
+// $Id: MuxStream.java,v 1.1 2010/06/15 12:26:33 smhuang Exp $\r
+// (c) COPYRIGHT MIT and INRIA, 1996.\r
+// Please first read the full copyright statement in file COPYRIGHT.html\r
+\r
+package org.w3c.www.mux;\r
+\r
+import java.io.IOException;\r
+import java.io.InputStream;\r
+import java.io.OutputStream;\r
+import java.io.PrintStream;\r
+\r
+import java.net.InetAddress;\r
+import java.net.Socket;\r
+\r
+public class MuxStream {\r
+ /**\r
+ * That stream accept handler.\r
+ */\r
+ protected MuxStreamHandler handler = null;\r
+ /**\r
+ * This stream reader.\r
+ */\r
+ protected MuxReader reader = null;\r
+ /**\r
+ * This stream writer.\r
+ */\r
+ protected MuxWriter writer = null;\r
+ /**\r
+ * Currently defined sessions.\r
+ */\r
+ protected MuxSession sessions[] = null;\r
+ /**\r
+ * Is this the server side of the MUX channel ?\r
+ */\r
+ protected boolean server = false;\r
+ /**\r
+ * Inet address of the other end's connection (maybe <strong>null</strong>)\r
+ */\r
+ protected InetAddress inetaddr = null;\r
+ /**\r
+ * The raw input stream.\r
+ */\r
+ protected InputStream in = null;\r
+ /**\r
+ * The raw output stream.\r
+ */\r
+ protected OutputStream out = null;\r
+ /**\r
+ * Is this muxed stream still alive ?\r
+ */\r
+ protected boolean alive = true;\r
+\r
+ // Assumes sessions array is of correct size, and checks have been done\r
+\r
+ private synchronized MuxSession createSession(int sessid, int protid)\r
+ throws IOException\r
+ {\r
+ MuxSession session = sessions[sessid];\r
+ if ( session == null ) {\r
+ session = new MuxSession(this, sessid, protid);\r
+ sessions[sessid] = session;\r
+ } else {\r
+ System.out.println("MuxStream:createSession: already existing !");\r
+ }\r
+ return session;\r
+ }\r
+\r
+ // Are we willing to accept that new session ?\r
+ // Because we need to accept it internally, we always return something\r
+ // It is up to the caller to make sure flags has SYN set\r
+ // NOTE that the calls to the handler don't lock that object (a feature)\r
+\r
+ private MuxSession acceptSession(int flags\r
+ , int sessid\r
+ , int protid)\r
+ throws IOException\r
+ {\r
+ if (server & ((sessid & 1) == 0)) \r
+ throw new IOException("MUX: Invalid even session id "+sessid);\r
+ // Query the session handler about that new session:\r
+ MuxSession session = null;\r
+ if ((handler != null) && handler.acceptSession(this, sessid, protid)) {\r
+ // Session accepted, setup handler:\r
+ session = createSession(sessid, protid);\r
+ handler.notifySession(session);\r
+ } else {\r
+ // Session rejected, emit a RST:\r
+ session = null;\r
+System.out.println(this+": RST (accept) session "+sessid);\r
+ writer.writeMessage(sessid, MUX.RST, 0);\r
+ writer.flush();\r
+ }\r
+ return session;\r
+ }\r
+\r
+ private final synchronized MuxSession allocateSession(int protid) \r
+ throws IOException\r
+ {\r
+ // Available sessions ?\r
+ int i = (server ? 2 : 3);\r
+ for ( ; i < sessions.length; i += 2) {\r
+ if ( sessions[i] == null ) {\r
+ sessions[i] = new MuxSession(this, i, protid);\r
+ return sessions[i];\r
+ }\r
+ }\r
+ // Create a new session:\r
+ MuxSession session = checkSession(i);\r
+ if ( session == null )\r
+ session = new MuxSession(this, i, protid);\r
+ sessions[i] = session;\r
+ return session;\r
+ }\r
+\r
+ private final synchronized MuxSession checkSession(int sessid) \r
+ throws IOException\r
+ {\r
+ // Check protocol validity:\r
+ if ( sessid >= MUX.MAX_SESSION ) \r
+ throw new IOException("MUX: Invalid session id "+sessid);\r
+ // Get or create the appropriate session:\r
+ if ( sessid >= sessions.length ) {\r
+ MuxSession ns[] = new MuxSession[sessid+MUX.SESSIONS_INCR];\r
+ System.arraycopy(sessions, 0, ns, 0, sessions.length);\r
+ sessions = ns;\r
+ } \r
+ return sessions[sessid];\r
+ }\r
+\r
+ /**\r
+ * This stream is dying, clean it up.\r
+ * It is up to the caller to make sure all existing sessions have been\r
+ * terminated (gracefully or not).\r
+ * <p>This will shutdown all realted threads, and close the transport \r
+ * streams.\r
+ */\r
+\r
+ private synchronized void cleanup() {\r
+ alive = false;\r
+ // Cleanup the reader and writer objects:\r
+ reader.shutdown();\r
+ writer.shutdown();\r
+ reader = null;\r
+ writer = null;\r
+ // Close streams:\r
+ try {\r
+ in.close();\r
+ out.close();\r
+ } catch (IOException ex) {\r
+ }\r
+ in = null;\r
+ out = null;\r
+ }\r
+\r
+ /**\r
+ * Get this stream MuxWriter object.\r
+ * @return A MuxWriter instance.\r
+ */\r
+\r
+ protected final MuxWriter getMuxWriter() {\r
+ return writer;\r
+ }\r
+\r
+ /**\r
+ * A severe (fatal for that connection) errror has occured. Cleanup.\r
+ * @param obj The object that has generated the error.\r
+ * @param ex The exception that triggered the error (or <strong>null\r
+ * </strong> null if this was a logical error).\r
+ */\r
+\r
+ protected void error(Object obj, Exception ex) {\r
+ System.out.println("*** Fatal error on "+this);\r
+ ex.printStackTrace();\r
+ System.out.println("No recovery !");\r
+ System.exit(1);\r
+ }\r
+\r
+ /**\r
+ * A soft error has occured (eg socket close), Cleanup.\r
+ * @param obj The object that has detected the soft error.\r
+ * @param msg An associated String message.\r
+ */\r
+\r
+ protected synchronized void error(Object obj, String msg) {\r
+ // Is there any pending session ?\r
+ boolean problems = false;\r
+ synchronized(this) {\r
+ for (int i = 0 ; i < sessions.length ; i++) {\r
+ if ( sessions[i] != null ) \r
+ sessions[i].abort();\r
+ }\r
+ }\r
+ // If no problems, close socket, we're done:\r
+ cleanup();\r
+ }\r
+\r
+ /**\r
+ * Handle the given DefineString control message.\r
+ * @param strid The identifier for that String in the futur.\r
+ * @param str This String being defined.\r
+ */\r
+\r
+ protected void ctrlDefineString(int strid, String str) {\r
+ }\r
+\r
+ /**\r
+ * Handle the given DefineStack control message.\r
+ * @param id The identifier for that stack in the future.\r
+ * @param stack The stack description (as an array of shorts).\r
+ */\r
+\r
+ protected void ctrlDefineStack(int id, int stack[]) \r
+ throws IOException\r
+ {\r
+ }\r
+\r
+ /**\r
+ * Handle the given MuxControl control message.\r
+ * @param sessid The session to which that message applies.\r
+ * @param fragsz The max allowed fragment size on that session.\r
+ */\r
+\r
+ protected void ctrlMuxControl(int sessid, int fragsz)\r
+ throws IOException\r
+ {\r
+ MuxSession session = lookupSession(sessid, true);\r
+ session.notifyControl(fragsz);\r
+ }\r
+\r
+ /**\r
+ * Handle the given SendCredit control message.\r
+ * @param sessid The session to which that message applies.\r
+ * @param credit The allowed credits.\r
+ */\r
+\r
+ protected void ctrlSendCredit(int sessid, int credit)\r
+ throws IOException\r
+ {\r
+ MuxSession session = lookupSession(sessid, true);\r
+ session.notifyCredit(credit);\r
+ }\r
+\r
+ /**\r
+ * Handle that new incomming message.\r
+ * This method is called by the reader of that session, to dispatch\r
+ * the message currently being read.\r
+ * @return A MuxSession instance to dispatch that message to, or\r
+ * <strong>null</strong> otherwise (ie a new session was rejected, etc).\r
+ * In that last case, it is up to the reader of that session to discard \r
+ * any pending data.\r
+ */\r
+\r
+ protected MuxSession lookupSession(int flags\r
+ , int sessid\r
+ , int length\r
+ , int llength) \r
+ throws IOException\r
+ {\r
+ MuxSession session = checkSession(sessid);\r
+ if (session == null) {\r
+ if ((flags & MUX.SYN) != 0) {\r
+ // Length really means protid in that case:\r
+ session = acceptSession(flags, sessid, length);\r
+ } else if ((flags & MUX.FIN) != MUX.FIN) {\r
+ // We don't know about that session, emit some reset:\r
+ System.out.println(this+": RST (lookup) session "+sessid);\r
+ if ((flags & MUX.RST) != MUX.RST) {\r
+ // Above test breaks a nasty loop !\r
+ writer.writeMessage(sessid, MUX.RST, 0);\r
+ }\r
+ }\r
+ }\r
+ return session;\r
+ }\r
+\r
+ /**\r
+ * Lookup for an already existing session having the given identifier.\r
+ * @param sessid The identifier of the session to look for.\r
+ * @param check Is <strong>null</strong> a valid answer, if set and\r
+ * the requested session doesn't exist, a runtime exception is thrown.\r
+ * @return A MuxSession instance, or <strong>null</strong> if check is\r
+ * <strong>false</strong> and no session was found.\r
+ */\r
+\r
+ protected synchronized MuxSession lookupSession(int sessid\r
+ , boolean check) {\r
+ if ( sessid < sessions.length ) {\r
+ MuxSession session = sessions[sessid];\r
+ if ( session != null )\r
+ return session;\r
+ }\r
+ if ( check ) {\r
+ throw new RuntimeException("MuxStream:lookupSession: "\r
+ + " invalid session id "\r
+ + sessid + ".");\r
+ }\r
+ return null;\r
+ }\r
+\r
+ /**\r
+ * Unregiter the given session, it has been closed.\r
+ * @param session The session to unregister.\r
+ */\r
+\r
+ protected synchronized void unregisterSession(MuxSession session) {\r
+ sessions[session.getIdentifier()] = null;\r
+ }\r
+\r
+ /**\r
+ * Create a new MUX session, by connecting to the other end.\r
+ * @param protid The protocol that is going to be spoken on that new \r
+ * session.\r
+ * @return A connected MuxSession.\r
+ * @exception IOException If the connection couldn't be set up properly.\r
+ */\r
+\r
+ public MuxSession connect(int protid) \r
+ throws IOException\r
+ {\r
+ // Is this stream still alive ?\r
+ synchronized(this) {\r
+ if ( ! alive )\r
+ throw new IOException("Broken mux stream");\r
+ }\r
+ // Allocate a new session identifier:\r
+ MuxSession session = allocateSession(protid);\r
+ // If SYN with long-length not set accepted, uncomment following:\r
+ // writer.writeMessage(session.getIdentifier(), MUX.SYN, protid);\r
+ writer.writeMessage(session.getIdentifier()\r
+ , MUX.SYN\r
+ , protid\r
+ , null, 0, 0);\r
+ return session;\r
+ }\r
+\r
+ /**\r
+ * Get the InetAddress associated with that MUX stream, if any.\r
+ * MUX streams can run on any kind of Input/Output streams. This method\r
+ * will only return a non-null instance when possible.\r
+ * @return An InetAddress instance, or <strong>null</strong> if not\r
+ * available.\r
+ */\r
+\r
+ public InetAddress getInetAddress() {\r
+ return inetaddr;\r
+ }\r
+\r
+ /**\r
+ * Shutdown this stream, and associated sessions gracefully.\r
+ * @param force If <strong>true</strong> abort all existing sessions, and\r
+ * close the muxed streams physically. Otherwise, shutdown the muxed stream\r
+ * gracefully only if no more sessions are running.\r
+ * @return A boolean, <strong>true</strong> if shutdown was performed,\r
+ * <strong>false</strong> if it was not performed because <em>force</em>\r
+ * was <strong>false</strong> and some sessions were still running.\r
+ * @exception IOException If some IO error occured.\r
+ */\r
+\r
+ public synchronized boolean shutdown(boolean force) \r
+ throws IOException\r
+ {\r
+ // Has this stream already been killed ?\r
+ if ( ! alive )\r
+ return true;\r
+ boolean terminate = true;\r
+ // Check sessions status:\r
+ if ( force ) {\r
+ for (int i = 0 ; i < sessions.length ; i++) {\r
+ MuxSession s = sessions[i];\r
+ if ( s != null )\r
+ s.abort();\r
+ }\r
+ } else {\r
+ for (int i = 0 ; i < sessions.length ; i++) {\r
+ if ( sessions[i] != null ) {\r
+ terminate = false;\r
+ break;\r
+ }\r
+ }\r
+ }\r
+ if ( terminate ) \r
+ cleanup();\r
+ return terminate;\r
+ }\r
+ \r
+ public MuxStream(boolean server\r
+ , MuxStreamHandler handler\r
+ , InputStream in\r
+ , OutputStream out) \r
+ throws IOException\r
+ {\r
+ this.server = server;\r
+ this.handler = handler;\r
+ this.in = in;\r
+ this.out = out;\r
+ this.reader = new MuxReader(this, in);\r
+ this.writer = new MuxWriter(this, out);\r
+ this.sessions = new MuxSession[8];\r
+ this.reader.start();\r
+ }\r
+\r
+ public MuxStream(boolean server, MuxStreamHandler handler, Socket socket) \r
+ throws IOException\r
+ {\r
+ this(server\r
+ , handler\r
+ , socket.getInputStream()\r
+ , socket.getOutputStream());\r
+ this.inetaddr = socket.getInetAddress();\r
+ }\r
+\r
+}\r