--- /dev/null
+/*\r
+\r
+ Derby - org.apache.derby.impl.jdbc.PositionedStoreStream\r
+\r
+ Licensed to the Apache Software Foundation (ASF) under one\r
+ or more contributor license agreements. See the NOTICE file\r
+ distributed with this work for additional information\r
+ regarding copyright ownership. The ASF licenses this file\r
+ to you under the Apache License, Version 2.0 (the\r
+ "License"); you may not use this file except in compliance\r
+ with the License. You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing,\r
+ software distributed under the License is distributed on an\r
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\r
+ KIND, either express or implied. See the License for the\r
+ specific language governing permissions and limitations\r
+ under the License.\r
+\r
+ */\r
+package org.apache.derby.impl.jdbc;\r
+\r
+import java.io.EOFException;\r
+import java.io.IOException;\r
+import java.io.InputStream;\r
+\r
+import org.apache.derby.iapi.error.StandardException;\r
+import org.apache.derby.iapi.types.Resetable;\r
+\r
+/**\r
+ * A wrapper-stream able to reposition the underlying store stream.\r
+ * <p>\r
+ * Where a user expects the underlying stream to be at a given position,\r
+ * {@link #reposition} must be called with the expected position first. A use\r
+ * case for this scenario is the LOB objects, where you can request a stream and\r
+ * at the same time (this does not mean concurrently) query the LOB about its\r
+ * length or ask to get a part of the LOB returned. Such multiplexed operations\r
+ * must result in consistent and valid data, and to achieve this the underlying\r
+ * store stream must be able to reposition itself.\r
+ *\r
+ * <em>Synchronization</em>: Access to instances of this class must be\r
+ * externally synchronized on the connection synchronization object. There are\r
+ * two reasons for this:\r
+ * <ul> <li>Access to store must be single threaded.\r
+ * <li>This class is not thread safe, and calling the various methods from\r
+ * different threads concurrently can result in inconsistent position\r
+ * values. To avoid redundant internal synchronization, this class\r
+ * assumes and <b>requires</b> external synchronization (also called\r
+ * client-side locking).\r
+ * </ul>\r
+ * @see EmbedConnection#getConnectionSynchronization\r
+ */\r
+//@NotThreadSafe\r
+public class PositionedStoreStream\r
+ extends InputStream\r
+ implements Resetable {\r
+\r
+ /** Underlying store stream serving bytes. */\r
+ //@GuardedBy("EmbedConnection.getConnectionSynchronization()")\r
+ private final InputStream stream;\r
+ /** Convenience reference to the stream as a resettable stream. */\r
+ //@GuardedBy("EmbedConnection.getConnectionSynchronization()")\r
+ private final Resetable resettable;\r
+ /**\r
+ * Position of the underlying store stream.\r
+ * Note that the position is maintained by this class, not the underlying\r
+ * store stream itself.\r
+ * <em>Future improvement</em>: Add this functionality to the underlying\r
+ * store stream itself to avoid another level in the stream stack.\r
+ */\r
+ //@GuardedBy("EmbedConnection.getConnectionSynchronization()")\r
+ private long pos = 0L;\r
+\r
+ /**\r
+ * Creates a positioned store stream on top of the specified resettable\r
+ * stream.\r
+ *\r
+ * @param in a {@link Resetable}-stream\r
+ * @throws ClassCastException if the inputstream does not implement\r
+ * {@link Resetable}\r
+ */\r
+ public PositionedStoreStream(InputStream in) {\r
+ this.stream = in;\r
+ this.resettable = (Resetable)in;\r
+ }\r
+\r
+ /**\r
+ * Reads a number of bytes from the underlying stream and stores them in the\r
+ * specified byte array.\r
+ *\r
+ * @return The actual number of bytes read, or -1 if the end of the stream\r
+ * is reached.\r
+ * @throws IOException if an I/O error occurs\r
+ */\r
+ public int read(byte[] b)\r
+ throws IOException {\r
+ int ret = this.stream.read(b);\r
+ this.pos += ret;\r
+ return ret;\r
+ }\r
+\r
+ /**\r
+ * Reads a number of bytes from the underlying stream and stores them in the\r
+ * specified byte array at the specified offset.\r
+ *\r
+ * @return The actual number of bytes read, or -1 if the end of the stream\r
+ * is reached.\r
+ * @throws IOException if an I/O error occurs\r
+ */\r
+ public int read(byte[] b, int off, int len)\r
+ throws IOException {\r
+ int ret = this.stream.read(b, off, len);\r
+ this.pos += ret;\r
+ return ret;\r
+ }\r
+\r
+ /**\r
+ * Reads a single byte from the underlying stream.\r
+ *\r
+ * @return The next byte of data, or -1 if the end of the stream is reached.\r
+ * @throws IOException if an I/O error occurs\r
+ */\r
+ public int read()\r
+ throws IOException {\r
+ int ret = this.stream.read();\r
+ if (ret > -1) {\r
+ this.pos++;\r
+ }\r
+ return ret;\r
+ }\r
+\r
+ /**\r
+ * Skips up to the specified number of bytes from the underlying stream.\r
+ *\r
+ * @return The actual number of bytes skipped.\r
+ * @throws IOException if an I/O error occurs\r
+ */\r
+ public long skip(long toSkip)\r
+ throws IOException {\r
+ long ret = this.stream.skip(toSkip);\r
+ this.pos += ret;\r
+ return ret;\r
+ }\r
+\r
+ /**\r
+ * Resets the resettable stream.\r
+ *\r
+ * @throws IOException\r
+ * @throws StandardException if resetting the stream in store fails\r
+ * @see Resetable#resetStream\r
+ */\r
+ public void resetStream()\r
+ throws IOException, StandardException {\r
+ this.resettable.resetStream();\r
+ this.pos = 0L;\r
+ }\r
+\r
+ /**\r
+ * Initialize the resettable stream for use.\r
+ *\r
+ * @throws StandardException if initializing the store in stream fails\r
+ * @see Resetable#initStream\r
+ */\r
+ public void initStream()\r
+ throws StandardException {\r
+ this.resettable.initStream();\r
+ this.pos = 0L;\r
+ }\r
+\r
+ /**\r
+ * Closes the resettable stream.\r
+ *\r
+ * @see Resetable#closeStream\r
+ */\r
+ public void closeStream() {\r
+ this.resettable.closeStream();\r
+ }\r
+\r
+ /**\r
+ * Repositions the underlying store stream to the requested position.\r
+ * <p>\r
+ * Repositioning is required because there can be several uses of the store\r
+ * stream, which changes the position of it. If a class is dependent on the\r
+ * underlying stream not changing its position, it must call reposition with\r
+ * the position it expects before using the stream again.\r
+ *\r
+ * @throws IOException if reading from the store stream fails\r
+ * @throws StandardException if resetting the store in stream fails, or\r
+ * some other exception happens in store\r
+ * @see #getPosition\r
+ */\r
+ public void reposition(long requestedPos)\r
+ throws IOException, StandardException {\r
+ if (this.pos < requestedPos) {\r
+ // Reposition from current position.\r
+ skipFully(requestedPos - this.pos);\r
+ this.pos = requestedPos;\r
+ } else if (this.pos > requestedPos) {\r
+ // Reposition from start.\r
+ this.resettable.resetStream();\r
+ skipFully(requestedPos);\r
+ this.pos = requestedPos;\r
+ }\r
+ }\r
+\r
+ /**\r
+ * Returns the current position of the underlying store stream.\r
+ *\r
+ * @return Current byte position of the store stream.\r
+ */\r
+ public long getPosition() {\r
+ return this.pos;\r
+ }\r
+\r
+ /**\r
+ * Skip exactly the requested number of bytes.\r
+ *\r
+ * @throws EOFException if EOF is reached before all bytes are skipped\r
+ * @throws IOException if reading from the stream fails\r
+ */\r
+ private void skipFully(long toSkip)\r
+ throws IOException {\r
+ long remaining = toSkip;\r
+ while (remaining > 0) {\r
+ long skippedNow = this.stream.skip(remaining);\r
+ if (skippedNow == 0) {\r
+ if (this.stream.read() == -1) {\r
+ throw new EOFException("Reached end-of-stream prematurely" +\r
+ ", with " + remaining + " byte(s) to go");\r
+ } else {\r
+ skippedNow = 1;\r
+ }\r
+ }\r
+ remaining -= skippedNow;\r
+ }\r
+ }\r
+} // End class PositionedStoreStream\r