--- /dev/null
+// MuxWriter.java\r
+// $Id: MuxWriter.java,v 1.1 2010/06/15 12:26:33 smhuang Exp $\r
+// (c) COPYRIGHT MIT and INRIA, 1996.\r
+// Please first read the full copyright statement in file COPYRIGHT.html\r
+\r
+package org.w3c.www.mux;\r
+\r
+import java.io.IOException;\r
+import java.io.OutputStream;\r
+import java.io.PrintStream;\r
+\r
+/**\r
+ * This class is dumb. It does no control flow, and nothing clever, just\r
+ * emit appropriate MUX headers before sending some data.\r
+ * <p>The flow control is handled on a per session basis, by both the\r
+ * MuxSession class, and the MuxOutputStream class.\r
+ * @see MuxSession\r
+ * @see MuxOutputStream\r
+ */\r
+\r
+class MuxWriter implements MUX {\r
+ private static final byte padbytes[] = {\r
+ (byte) 0, (byte) 0, (byte) 0, (byte) 0,\r
+ (byte) 0, (byte) 0, (byte) 0, (byte) 0\r
+ };\r
+ private static final boolean debug = true;\r
+ /**\r
+ * The MUX Stream this writer is working for.\r
+ */\r
+ protected MuxStream stream = null;\r
+ /**\r
+ * The output buffer.\r
+ */\r
+ protected byte buffer[] = new byte[MUX.WRITER_BUFFER_SIZE] ;\r
+ /**\r
+ * The current buffer size.\r
+ */\r
+ protected int buflen = 0 ;\r
+ /**\r
+ * The current buffer pointer.\r
+ */\r
+ protected int bufptr = 0;\r
+ /**\r
+ * The output stream to write data to.\r
+ */\r
+ protected OutputStream out = null ;\r
+\r
+ /**\r
+ * Can we get this capacity from our buffer.\r
+ * <p>The caller is responsible to synchronize access to that method.\r
+ * Make best effort (ie flush) to try getting the requested capacity. If\r
+ * success, than return <strong>true</strong> otherwise, return \r
+ * <strong>false</strong>.\r
+ * @param capacity Requested capacity.\r
+ * @return A boolean <strong>true</strong if requested capacity is \r
+ * available.\r
+ * @exception IOException If flushing the buffer trigered some IO errors.\r
+ */\r
+\r
+ private boolean ensureCapacity (int capacity) \r
+ throws IOException\r
+ {\r
+ if ( bufptr + buflen + capacity < buffer.length ) {\r
+ return true ;\r
+ } else if (buffer.length < capacity) {\r
+ flush() ;\r
+ return true ;\r
+ } else {\r
+ return false ;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Encode a word (little endian)\r
+ * <p>The caller is responsible to synchronize access to that method.\r
+ * <p>The caller is assumed to make sure the required capacity is\r
+ * available.\r
+ * @param word The word to encode.\r
+ */\r
+\r
+ private final void encodeWord(int word) {\r
+ int pos = bufptr+buflen;\r
+ buffer[pos++] = (byte) (word & 0x000000ff);\r
+ buffer[pos++] = (byte) ((word & 0x0000ff00) >> 8) ;\r
+ buffer[pos++] = (byte) ((word & 0x00ff0000) >> 16);\r
+ buffer[pos++] = (byte) ((word & 0xff000000) >> 24);\r
+ buflen += 4;\r
+ if ( debug )\r
+ System.out.println("[encodeWord] 0x"\r
+ + Integer.toString(buffer[bufptr], 16)\r
+ + Integer.toString(buffer[bufptr+1], 16)\r
+ + Integer.toString(buffer[bufptr+2], 16)\r
+ + Integer.toString(buffer[bufptr+3], 16));\r
+\r
+ }\r
+\r
+ /**\r
+ * Encode a short (little endian)\r
+ * <p>The caller is responsible to synchronize access to that method.\r
+ * <p>The caller is assumed to make sure the required capacity is\r
+ * available.\r
+ * @param s The short to encode.\r
+ */\r
+\r
+ private final void encodeShort(short s) {\r
+ int pos = bufptr+buflen;\r
+ buffer[pos++] = (byte) (s & 0x00ff);\r
+ buffer[pos++] = (byte) ((s & 0xff00) >> 8) ;\r
+ buflen += 2;\r
+ }\r
+\r
+ /**\r
+ * Encode a small message.\r
+ * <p>The caller is responsible to synchronize access to that method.\r
+ * @param flags The header flags.\r
+ * @param session The session.\r
+ * @param len The message length.\r
+ * @param into Target buffer.\r
+ * @param dst Target buffer position.\r
+ */\r
+\r
+ private final void encodeMessage (int flags\r
+ , int sessid\r
+ , int length)\r
+ throws IOException\r
+ {\r
+ ensureCapacity(4);\r
+ int word = (flags | ((sessid & 0xff) << 18) | length);\r
+ if ( debug ) \r
+ System.out.println("sending h="+Integer.toString(word, 16));\r
+ encodeWord(word);\r
+ }\r
+\r
+ /**\r
+ * Encode a big message.\r
+ * <p>The caller is responsible to synchronize access to that method.\r
+ * @param flags The header flags.\r
+ * @param session The session identifier.\r
+ * @param protocol The protocol identifier.\r
+ * @param len The message length.\r
+ */\r
+\r
+ private final void encodeLongMessage (int flags\r
+ , int sessid\r
+ , int protocol\r
+ , int length)\r
+ throws IOException\r
+ {\r
+ ensureCapacity(8);\r
+ int word = (flags | ((sessid & 0xff) << 18) | protocol);\r
+ if ( debug )\r
+ System.out.println("sending h="+Integer.toString(word, 16)\r
+ +", l="+Integer.toString(length, 16));\r
+ encodeWord(word);\r
+ encodeWord(length);\r
+ }\r
+\r
+ /**\r
+ * Emit the given chunk of data.\r
+ * <p>The caller is responsible to synchronize access to that method.\r
+ * <p>The caller is reponsible for having emitted the right header\r
+ * before actually emitting that data.\r
+ */\r
+\r
+ private final void emitData(byte data[], int off, int len) \r
+ throws IOException\r
+ {\r
+ if ( len <= 0 )\r
+ return;\r
+ if (ensureCapacity(len)) {\r
+ // Just add to buffer:\r
+ System.arraycopy(data, off, buffer, bufptr+buflen, len);\r
+ buflen += len;\r
+ } else {\r
+ // Write through:\r
+ flush();\r
+ out.write(data, off, len);\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Emit the given String.\r
+ * <p>The caller is responsible to synchronize access to that method.\r
+ * <p>The caller is reponsible for having emitted the right header\r
+ * @param str The String to be emitted.\r
+ * @param len Length of the String (or <strong>-1</strong> if not computed\r
+ * yet).\r
+ */\r
+\r
+ private final void emitData(String str, int len) \r
+ throws IOException\r
+ {\r
+ if ( len < 0 )\r
+ len = str.length();\r
+ if ( ! ensureCapacity(len)) \r
+ // FIXME\r
+ throw new RuntimeException("String to big to hold in buffer !");\r
+ str.getBytes(0, len, buffer, bufptr+buflen);\r
+ buflen += len;\r
+ }\r
+\r
+ /**\r
+ * Emit the given integer array as a short array (little endian).\r
+ * <p>The caller is responsible to synchronize access to that method.\r
+ * <p>The caller is reponsible for having emitted the right header\r
+ * @param a The array of int to be encoded as an array of shorts.\r
+ */\r
+\r
+ private final void emitShortArray(int a[]) \r
+ throws IOException\r
+ {\r
+ if ( ! ensureCapacity(a.length << 1) )\r
+ // FIXME\r
+ throw new RuntimeException("Array to bug to hold in buffer.");\r
+ for (int i = 0 ; i < a.length ; i++)\r
+ encodeShort((short) (a[i] & 0xffff));\r
+ }\r
+\r
+ /**\r
+ * Shutdown the writer.\r
+ */\r
+\r
+ protected synchronized void shutdown() {\r
+ buffer = null;\r
+ }\r
+\r
+ /**\r
+ * Flush current output buffer.\r
+ */\r
+\r
+ protected synchronized void flush() \r
+ throws IOException\r
+ {\r
+ if ( buflen > 0 ) {\r
+ out.write(buffer, bufptr, buflen);\r
+ bufptr = 0;\r
+ buflen = 0;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Write one message of output.\r
+ * @param sessid The session identifier.\r
+ * @param flags The flags of that message.\r
+ * @param protid The protocol identifier.\r
+ * @param b The buffer containing the data to write.\r
+ * @param o Offset of data within above buffer.\r
+ * @param l Length of data to be written.\r
+ */\r
+\r
+ protected synchronized void writeMessage (int sessid\r
+ , int flags\r
+ , int protocol\r
+ , byte b[], int o, int l) \r
+ throws IOException\r
+ {\r
+ encodeLongMessage(flags, sessid, protocol, l);\r
+ emitData(b, o, l) ;\r
+ }\r
+\r
+ /**\r
+ * Write one message of output.\r
+ * @param sessid The session identifier.\r
+ * @param flags The flags of that message.\r
+ * @param protid The protocol identifier.\r
+ */\r
+\r
+ protected synchronized void writeMessage (int sessid\r
+ , int flags\r
+ , int protocol)\r
+ throws IOException\r
+ {\r
+ encodeMessage(flags, sessid, protocol);\r
+ }\r
+\r
+ /**\r
+ * Short cut to write data on a given session.\r
+ * @param sessid The session to write data to.\r
+ * @param b The buffer containing the data to be written.\r
+ * @param o Offset of data within above buffer.\r
+ * @param l Length of data to be written.\r
+ */\r
+\r
+ protected synchronized void writeData (int sessid, byte b[], int o, int l) \r
+ throws IOException\r
+ {\r
+ encodeMessage(0, sessid, l);\r
+ if ( l > 0 ) {\r
+ // Emit raw data first:\r
+ emitData(b, o, l);\r
+ // Emit padding bytes as needed:\r
+ int padlen = ((l & 0x3) != 0 ) ? (4 - (l & 0x3)) : 0 ;\r
+ if (padlen != 0)\r
+ emitData(padbytes, 0, padlen);\r
+ }\r
+ }\r
+\r
+ protected void ctrlDefineString(int stackid, String id) \r
+ throws IOException\r
+ {\r
+ int word = ((MUX.LONG_LENGTH | MUX.CONTROL) // flags\r
+ | (MUX.CTRL_DEFINE_STRING << 26) // opcode\r
+ | (stackid & MUX.LENGTH)); // stack identifier\r
+ int len = id.length();\r
+ synchronized(this) {\r
+ encodeWord(word);\r
+ encodeWord(len);\r
+ emitData(id, len);\r
+ }\r
+ }\r
+\r
+ protected void ctrlDefineStack(int id, int stack[])\r
+ throws IOException\r
+ {\r
+ int word = ((MUX.LONG_LENGTH | MUX.CONTROL) // flags\r
+ | (MUX.CTRL_DEFINE_STACK << 26) // opcode\r
+ | (id & MUX.LENGTH));\r
+ int len = (stack.length << 1);\r
+ synchronized(this) {\r
+ encodeWord(word);\r
+ encodeWord(len);\r
+ emitShortArray(stack);\r
+ }\r
+ }\r
+\r
+ protected void ctrlMuxControl(int sessid, int fragsz) \r
+ throws IOException\r
+ {\r
+ int word = ((MUX.LONG_LENGTH | MUX.CONTROL) // flags\r
+ | (MUX.CTRL_MUX_CONTROL << 26) // opcode\r
+ | (sessid << 18) // session id\r
+ | (fragsz & MUX.LENGTH)); // frag size\r
+ synchronized(this) {\r
+ encodeWord(word);\r
+ encodeWord(0);\r
+ }\r
+ }\r
+\r
+ protected void ctrlSendCredit(int sessid, int credit) \r
+ throws IOException\r
+ {\r
+ int word = ((MUX.LONG_LENGTH | MUX.CONTROL) // flags\r
+ | (MUX.CTRL_SEND_CREDIT << 26) // opcode\r
+ | (sessid << 18)); // session id\r
+ synchronized(this) {\r
+ encodeWord(word);\r
+ encodeWord(credit);\r
+ }\r
+ }\r
+\r
+ protected synchronized boolean needsFlush() {\r
+ return buflen > 0;\r
+ }\r
+\r
+ MuxWriter(MuxStream stream, OutputStream out) \r
+ throws IOException\r
+ {\r
+ this.stream = stream;\r
+ this.out = out;\r
+ this.buffer = new byte[MUX.WRITER_BUFFER_SIZE];\r
+ this.bufptr = 0;\r
+ this.buflen = 0;\r
+ }\r
+\r
+}\r