--- /dev/null
+/*\r
+\r
+ Derby - Class org.apache.derby.impl.store.access.sort.MergeScan\r
+\r
+ Licensed to the Apache Software Foundation (ASF) under one or more\r
+ contributor license agreements. See the NOTICE file distributed with\r
+ this work for additional information regarding copyright ownership.\r
+ The ASF licenses this file to you under the Apache License, Version 2.0\r
+ (the "License"); you may not use this file except in compliance with\r
+ the License. You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+\r
+ */\r
+\r
+package org.apache.derby.impl.store.access.sort;\r
+\r
+import java.util.Enumeration;\r
+import java.util.Vector;\r
+\r
+import org.apache.derby.iapi.services.sanity.SanityManager;\r
+import org.apache.derby.iapi.error.StandardException;\r
+import org.apache.derby.iapi.store.access.conglomerate.TransactionManager;\r
+import org.apache.derby.iapi.store.access.SortObserver;\r
+import org.apache.derby.iapi.store.raw.StreamContainerHandle;\r
+import org.apache.derby.iapi.store.raw.Transaction;\r
+\r
+import org.apache.derby.iapi.types.DataValueDescriptor;\r
+\r
+// For JavaDoc references (i.e. @see)\r
+import org.apache.derby.iapi.store.access.conglomerate.ScanManager;\r
+\r
+/**\r
+ A sort scan that is capable of merging as many merge runs\r
+ as will fit in the passed-in sort buffer.\r
+**/\r
+\r
+public class MergeScan extends SortScan\r
+{\r
+ /**\r
+ The sort buffer we will use.\r
+ **/\r
+ protected SortBuffer sortBuffer;\r
+\r
+ /**\r
+ The merge runs.\r
+ **/\r
+ protected Vector mergeRuns;\r
+\r
+ /**\r
+ Array of scan controllers for the merge runs.\r
+ Entries in the array become null as the last\r
+ row is pulled out and the scan is closed.\r
+ **/\r
+ protected StreamContainerHandle openScans[];\r
+\r
+ private SortObserver sortObserver;\r
+\r
+ /*\r
+ * Constructors.\r
+ */\r
+\r
+ MergeScan(\r
+ MergeSort sort, \r
+ TransactionManager tran,\r
+ SortBuffer sortBuffer, \r
+ Vector mergeRuns,\r
+ SortObserver sortObserver,\r
+ boolean hold)\r
+ {\r
+ super(sort, tran, hold);\r
+ this.sortBuffer = sortBuffer;\r
+ this.mergeRuns = mergeRuns;\r
+ this.tran = tran;\r
+ this.sortObserver = sortObserver;\r
+ }\r
+\r
+ /*\r
+ * Methods of MergeSortScan\r
+ */\r
+\r
+ /**\r
+ Move to the next position in the scan.\r
+ @see org.apache.derby.iapi.store.access.ScanController#next\r
+ **/\r
+ public boolean next()\r
+ throws StandardException\r
+ {\r
+ current = sortBuffer.removeFirst();\r
+ if (current != null)\r
+ mergeARow(sortBuffer.getLastAux());\r
+ return (current != null);\r
+ }\r
+\r
+ /**\r
+ Close the scan.\r
+ @see org.apache.derby.iapi.store.access.ScanController#close\r
+ **/\r
+ public void close()\r
+ {\r
+ if (openScans != null)\r
+ {\r
+ for (int i = 0; i < openScans.length; i++)\r
+ {\r
+ if (openScans[i] != null)\r
+ {\r
+ openScans[i].close();\r
+ }\r
+ openScans[i] = null;\r
+ }\r
+ openScans = null;\r
+ }\r
+\r
+ // Hand sort buffer and remaining merge runs to sort.\r
+ if (super.sort != null)\r
+ {\r
+ sort.doneScanning(this, sortBuffer, mergeRuns);\r
+ sortBuffer = null;\r
+ mergeRuns = null;\r
+ }\r
+\r
+ // Sets sort to null\r
+ super.close();\r
+ }\r
+\r
+ /**\r
+ Close the scan.\r
+ @see ScanManager#closeForEndTransaction\r
+ **/\r
+ public boolean closeForEndTransaction(boolean closeHeldScan)\r
+ {\r
+ if (!hold || closeHeldScan)\r
+ {\r
+ close();\r
+ return(true);\r
+ }\r
+ else\r
+ {\r
+ return(false);\r
+ }\r
+ }\r
+\r
+ /*\r
+ * Methods of MergeScan\r
+ */\r
+\r
+ /**\r
+ Initialize the scan, returning false if there\r
+ was some error.\r
+ **/\r
+ public boolean init(TransactionManager tran)\r
+ throws StandardException\r
+ {\r
+ if (SanityManager.DEBUG)\r
+ {\r
+ // We really expect to have at least one\r
+ // merge run.\r
+ SanityManager.ASSERT(mergeRuns != null);\r
+ SanityManager.ASSERT(mergeRuns.size() > 0);\r
+\r
+ // This sort scan also expects that the\r
+ // caller has ensured that the sort buffer\r
+ // capacity will hold a row from all the\r
+ // merge runs.\r
+ SanityManager.ASSERT(sortBuffer.capacity() >= mergeRuns.size());\r
+ }\r
+\r
+ // Clear the sort buffer.\r
+ sortBuffer.reset();\r
+\r
+ // Create an array to hold a scan controller\r
+ // for each merge run.\r
+ openScans = new StreamContainerHandle[mergeRuns.size()];\r
+ if (openScans == null)\r
+ return false;\r
+\r
+ // Open a scan on each merge run.\r
+ int scanindex = 0;\r
+ Enumeration e = mergeRuns.elements();\r
+ while (e.hasMoreElements())\r
+ {\r
+ // get the container id\r
+ long id = ((Long) e.nextElement()).longValue();\r
+\r
+ Transaction rawTran = tran.getRawStoreXact(); // get raw transaction\r
+ int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;\r
+ openScans[scanindex++] = \r
+ rawTran.openStreamContainer(segmentId, id, hold);\r
+ }\r
+\r
+ // Load the initial rows.\r
+ for (scanindex = 0; scanindex < openScans.length; scanindex++)\r
+ mergeARow(scanindex);\r
+\r
+ // Success!\r
+ return true;\r
+ }\r
+\r
+ /**\r
+ Insert rows while we keep getting duplicates \r
+ from the merge run whose scan is in the\r
+ open scan array entry indexed by scanindex.\r
+ **/\r
+ void mergeARow(int scanindex)\r
+ throws StandardException\r
+ {\r
+ if (SanityManager.DEBUG)\r
+ {\r
+ // Unless there's a bug, the scan index will refer\r
+ // to an open scan. That's because we never put\r
+ // a scan index for a closed scan into the sort\r
+ // buffer (via setNextAux).\r
+ SanityManager.ASSERT(openScans[scanindex] != null);\r
+ }\r
+\r
+ DataValueDescriptor[] row;\r
+\r
+ // Read rows from the merge run and stuff them into the\r
+ // sort buffer for as long as we encounter duplicates.\r
+ do\r
+ {\r
+ row = sortObserver.getArrayClone();\r
+\r
+ // Fetch the row from the merge run.\r
+ if (!openScans[scanindex].fetchNext(row))\r
+ {\r
+ // If we're out of rows in the merge run, close the scan.\r
+ \r
+ openScans[scanindex].close();\r
+ openScans[scanindex] = null;\r
+ return;\r
+ }\r
+\r
+ // Save the index of this merge run with\r
+ // the row we're putting in the sort buffer.\r
+ sortBuffer.setNextAux(scanindex);\r
+ }\r
+ while (sortBuffer.insert(row) == SortBuffer.INSERT_DUPLICATE);\r
+ }\r
+}\r