--- /dev/null
+// MuxOutputStream.java\r
+// $Id: MuxOutputStream.java,v 1.1 2010/06/15 12:26:34 smhuang Exp $\r
+// (c) COPYRIGHT MIT and INRIA, 1996.\r
+// Please first read the full copyright statement in file COPYRIGHT.html\r
+\r
+package org.w3c.www.mux;\r
+\r
+import java.io.IOException;\r
+import java.io.OutputStream;\r
+import java.io.PrintStream;\r
+\r
+public class MuxOutputStream extends OutputStream {\r
+ protected static final boolean debug = false;\r
+\r
+ /**\r
+ * The session this stream is attached to.\r
+ */\r
+ protected MuxSession session = null;\r
+ /**\r
+ * The identifier of above session (fast access).\r
+ */\r
+ protected int sessid = -1;\r
+ /**\r
+ * The writer instance for the multiplexed stream.\r
+ */\r
+ protected MuxWriter writer = null;\r
+ /**\r
+ * The current max allowed fragment size.\r
+ */\r
+ protected int fragsz = MUX.SENDER_DEFAULT_FRAGMENT_SIZE;\r
+ /**\r
+ * The currently available credit.\r
+ */\r
+ protected int avail_credit = MUX.SENDER_DEFAULT_CREDIT;\r
+ /**\r
+ * Has this stream been closed ?\r
+ */\r
+ protected boolean closed = false;\r
+\r
+ /**\r
+ * Callback notifying that more credit is available for that stream.\r
+ * @param credit The credit we are getting from our peer.\r
+ */\r
+\r
+ protected synchronized void notifyCredit(int credit) {\r
+ if ( debug )\r
+ System.out.println("> notifyCredit["+sessid+"]: "+credit);\r
+ avail_credit += credit;\r
+ notifyAll();\r
+ }\r
+\r
+ /**\r
+ * Callback notifying the the frgament size has changed.\r
+ * @param control The new fragment size.\r
+ */\r
+\r
+ protected synchronized void notifyControl(int control) {\r
+ if ( debug )\r
+ System.out.println("notifyControl: "+control);\r
+ fragsz = control;\r
+ }\r
+\r
+ /**\r
+ * Emit the given data on current session.\r
+ * @param b The buffer containing the data to be emitted.\r
+ * @param off Offset of data within above buffer.\r
+ * @param len Length of data to be written,\r
+ */\r
+\r
+ private synchronized void send(byte b[], int off, int len) \r
+ throws IOException\r
+ {\r
+ // Otherwise perform:\r
+ while (len > 0) {\r
+ // Make sure we have some remaining credit:\r
+ while ( avail_credit <= 0 ) {\r
+ // If closed, trigger an error:\r
+ if ( closed ) \r
+ throw new IOException("Broken pipe");\r
+ writer.flush();\r
+ try {\r
+ wait();\r
+ } catch (InterruptedException ex) {\r
+ throw new IOException("Interrupted IO !");\r
+ }\r
+ }\r
+ // Chunk (if needed) until all available credit has been consumed\r
+ while ( avail_credit > 0 ) {\r
+ if ( fragsz <= 0 ) {\r
+ int sz = Math.min(avail_credit, len);\r
+ writer.writeData(sessid, b, off, sz);\r
+ len -= sz;\r
+ off += sz;\r
+ avail_credit -= sz;\r
+ } else if (len < fragsz ) {\r
+ // No fragmentation needed, we can sink all our data:\r
+ writer.writeData(sessid, b, off, len);\r
+ avail_credit -= len;\r
+ return;\r
+ } else {\r
+ // Emit only one single chunk:\r
+ writer.writeData(sessid, b, off, fragsz);\r
+ len -= fragsz;\r
+ off += fragsz;\r
+ avail_credit -= fragsz;\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Writes a byte. This method will block until the byte is actually\r
+ * written.\r
+ * It is <em>highly</em> recomended that you use a buffered output\r
+ * stream on top of that stream, or that you don't use that method.\r
+ * @param b the byte\r
+ * @exception IOException If an I/O error has occurred.\r
+ */\r
+\r
+ public void write(int b) \r
+ throws IOException\r
+ {\r
+ byte bits[] = new byte[1];\r
+ bits[0] = (byte) (b&0xff);\r
+ write(bits, 0, 1);\r
+ }\r
+\r
+ /**\r
+ * Writes a sub array of bytes. \r
+ * @param b the data to be written\r
+ * @param off the start offset in the data\r
+ * @param len the number of bytes that are written\r
+ * @exception IOException If an I/O error has occurred.\r
+ */\r
+\r
+ public void write(byte b[], int off, int len) \r
+ throws IOException\r
+ {\r
+ send(b, off, len);\r
+ }\r
+\r
+ /**\r
+ * Flush that output stream, blocking all data has been sent.\r
+ * @exception IOException If some IO errors occur.\r
+ */\r
+\r
+ public void flush() \r
+ throws IOException\r
+ {\r
+ writer.flush();\r
+ }\r
+\r
+ /**\r
+ * Close that session output stream.\r
+ * @exception IOException If some IO errors occur.\r
+ */\r
+\r
+ public synchronized void close()\r
+ throws IOException\r
+ {\r
+ if ( closed )\r
+ return;\r
+ closed = true;\r
+ session.sendFIN();\r
+ notifyAll();\r
+ return ;\r
+ }\r
+\r
+ protected MuxOutputStream(MuxSession session) {\r
+ this.session = session;\r
+ this.sessid = session.getIdentifier();\r
+ this.writer = session.getMuxStream().getMuxWriter();\r
+ }\r
+\r
+}\r