Adding JMCR-Stable version
[Benchmarks_CSolver.git] / JMCR-Stable / real-world application / MyDerby-10.3 / java / engine / org / apache / derby / impl / store / access / sort / MergeSort.java
diff --git a/JMCR-Stable/real-world application/MyDerby-10.3/java/engine/org/apache/derby/impl/store/access/sort/MergeSort.java b/JMCR-Stable/real-world application/MyDerby-10.3/java/engine/org/apache/derby/impl/store/access/sort/MergeSort.java
new file mode 100644 (file)
index 0000000..3915160
--- /dev/null
@@ -0,0 +1,768 @@
+/*\r
+\r
+   Derby - Class org.apache.derby.impl.store.access.sort.MergeSort\r
+\r
+   Licensed to the Apache Software Foundation (ASF) under one or more\r
+   contributor license agreements.  See the NOTICE file distributed with\r
+   this work for additional information regarding copyright ownership.\r
+   The ASF licenses this file to you under the Apache License, Version 2.0\r
+   (the "License"); you may not use this file except in compliance with\r
+   the License.  You may obtain a copy of the License at\r
+\r
+      http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+   Unless required by applicable law or agreed to in writing, software\r
+   distributed under the License is distributed on an "AS IS" BASIS,\r
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+   See the License for the specific language governing permissions and\r
+   limitations under the License.\r
+\r
+ */\r
+\r
+package org.apache.derby.impl.store.access.sort;\r
+\r
+import org.apache.derby.iapi.reference.SQLState;\r
+\r
+import org.apache.derby.iapi.services.io.FormatableBitSet;\r
+\r
+import org.apache.derby.iapi.services.sanity.SanityManager;\r
+import org.apache.derby.iapi.error.StandardException;\r
+import org.apache.derby.iapi.store.access.conglomerate.ScanControllerRowSource;\r
+import org.apache.derby.iapi.store.access.conglomerate.Sort;\r
+import org.apache.derby.iapi.store.access.conglomerate.TransactionManager;\r
+import org.apache.derby.iapi.types.CloneableObject;\r
+import org.apache.derby.iapi.store.access.ColumnOrdering;\r
+import org.apache.derby.iapi.store.access.RowUtil;\r
+import org.apache.derby.iapi.store.access.ScanController;\r
+import org.apache.derby.iapi.store.access.SortObserver;\r
+import org.apache.derby.iapi.store.access.SortController;\r
+import org.apache.derby.iapi.store.access.TransactionController;\r
+\r
+import org.apache.derby.iapi.store.raw.StreamContainerHandle;\r
+import org.apache.derby.iapi.store.raw.RawStoreFactory;\r
+import org.apache.derby.iapi.store.raw.Transaction;\r
+\r
+import org.apache.derby.iapi.types.DataValueDescriptor;\r
+\r
+import org.apache.derby.iapi.types.Orderable;\r
+\r
+import java.util.Enumeration;\r
+import java.util.Properties;\r
+import java.util.Vector;\r
+\r
+/**\r
+\r
+  A sort implementation which does the sort in-memory if it can,\r
+  but which can do an external merge sort so that it can sort an\r
+  arbitrary number of rows.\r
+\r
+**/\r
+\r
+final class MergeSort implements Sort\r
+{\r
+\r
+       /*\r
+        * Fields\r
+        */\r
+\r
+       /**\r
+       **/\r
+       private static final int STATE_CLOSED = 0;\r
+\r
+       /**\r
+       **/\r
+       private static final int STATE_INITIALIZED = 1;\r
+\r
+       /**\r
+       **/\r
+       private static final int STATE_INSERTING = 2;\r
+\r
+       /**\r
+       **/\r
+       private static final int STATE_DONE_INSERTING = 3;\r
+\r
+       /**\r
+       **/\r
+       private static final int STATE_SCANNING = 4;\r
+\r
+       /**\r
+       **/\r
+       private static final int STATE_DONE_SCANNING = 5;\r
+\r
+       /**\r
+       Maintains the current state of the sort as defined in\r
+       the preceding values.  Sorts start off and end up closed.\r
+       **/\r
+       private int state = STATE_CLOSED;\r
+\r
+       /**\r
+       The template as passed in on create.  Valid when the state\r
+       is INITIALIZED through SCANNING, null otherwise.\r
+       **/\r
+       protected DataValueDescriptor[] template;\r
+\r
+       /**\r
+       The column ordering as passed in on create.  Valid when\r
+       the state is INITIALIZED through SCANNING, null otherwise.\r
+       May be null if there is no column ordering - this means\r
+       that all rows are considered to be duplicates, and the\r
+       sort will only emit a single row.\r
+       **/\r
+       protected ColumnOrdering columnOrdering[];\r
+\r
+       /**\r
+    A lookup table to speed up lookup of a column associated with the i'th\r
+    column to compare.  To find the column id to compare as the i'th column\r
+    look in columnOrderingMap[i].\r
+       **/\r
+       private int columnOrderingMap[];\r
+\r
+       /**\r
+    A lookup table to speed up lookup of Ascending state of a column, \r
+       **/\r
+       private boolean columnOrderingAscendingMap[];\r
+\r
+       /**\r
+       The sort observer.  May be null.  Used as a callback.\r
+       **/\r
+       SortObserver sortObserver;\r
+\r
+       /**\r
+       Whether the rows are expected to be in order on insert,\r
+       as passed in on create.\r
+       **/\r
+       protected boolean alreadyInOrder;\r
+\r
+       /**\r
+       The inserter that's being used to insert rows into the sort.\r
+       This field is only valid when the state is INSERTING.\r
+       **/\r
+       private MergeInserter inserter = null;\r
+\r
+       /**\r
+       The scan that's being used to return rows from the sort.\r
+       This field is only valid when the state is SCANNING.\r
+       **/\r
+       private Scan scan = null;\r
+\r
+       /**\r
+       A vector of merge runs, produced by the MergeInserter.\r
+       Might be null if no merge runs were produced.\r
+       It is a vector of container ids.\r
+       **/\r
+       private Vector mergeRuns = null;\r
+\r
+       /**\r
+       An ordered set of the leftover rows that didn't go\r
+       in the last merge run (might be all the rows if there\r
+       are no merge runs).\r
+       **/\r
+       private SortBuffer sortBuffer = null;\r
+\r
+       /**\r
+       The maximum number of entries a sort buffer can hold.\r
+       **/\r
+       int sortBufferMax;\r
+\r
+       /**\r
+       The minimum number of entries a sort buffer can hold.\r
+       **/\r
+       int sortBufferMin;\r
+\r
+       /**\r
+       Properties for mergeSort\r
+       **/\r
+       static Properties properties = null;\r
+\r
+    /**\r
+    Static initializer for MergeSort, to initialize once the properties\r
+       for the sortBuffer.  \r
+       **/\r
+    static\r
+    {\r
+               properties = new Properties();\r
+               properties.put(RawStoreFactory.STREAM_FILE_BUFFER_SIZE_PARAMETER, "16384");\r
+    }\r
+\r
+       /*\r
+        * Methods of Sort\r
+        */\r
+\r
+       /**\r
+       Open a sort controller.\r
+       <p>\r
+       This implementation only supports a single sort controller\r
+       per sort.\r
+       @see Sort#open\r
+       **/\r
+       public SortController open(TransactionManager tran)\r
+               throws StandardException\r
+       {\r
+               if (SanityManager.DEBUG)\r
+                       SanityManager.ASSERT(state == STATE_INITIALIZED);\r
+\r
+               // Ready to start inserting rows.\r
+               state = STATE_INSERTING;\r
+\r
+               // Create and initialize an inserter.  When the caller\r
+               // closes it, it will call back to inserterIsClosed().\r
+               this.inserter = new MergeInserter();\r
+               if (this.inserter.initialize(this, tran) == false)\r
+        {\r
+                       throw StandardException.newException(SQLState.SORT_COULD_NOT_INIT);\r
+        }\r
+\r
+               return this.inserter;\r
+       }\r
+\r
+       /**\r
+       Open a scan controller.\r
+       @see Sort#openSortScan\r
+       **/\r
+\r
+       public ScanController openSortScan(\r
+    TransactionManager tran,\r
+    boolean            hold)\r
+                       throws StandardException\r
+       {\r
+               if (SanityManager.DEBUG)\r
+                       SanityManager.ASSERT(state == STATE_DONE_INSERTING);\r
+\r
+               if (mergeRuns == null || mergeRuns.size() == 0)\r
+               {\r
+                       // There were no merge runs so we can just return\r
+                       // the rows from the sort buffer.\r
+                       scan = new SortBufferScan(this, tran, sortBuffer, hold);\r
+\r
+                       // The scan now owns the sort buffer\r
+                       sortBuffer = null;\r
+               }\r
+               else\r
+               {\r
+                       // Dump the rows in the sort buffer to a merge run.\r
+                       long containerId = createMergeRun(tran, sortBuffer);\r
+                       mergeRuns.addElement(new Long(containerId));\r
+\r
+                       // If there are more merge runs than we can sort\r
+                       // at once with our sort buffer, we have to reduce\r
+                       // the number of merge runs\r
+                       if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN ||\r
+                               mergeRuns.size() > sortBuffer.capacity())\r
+                                       multiStageMerge(tran);\r
+\r
+                       // There are now few enough merge runs to sort\r
+                       // at once, so create a scan for them.\r
+                       MergeScan mscan = \r
+                new MergeScan(\r
+                    this, tran, sortBuffer, mergeRuns, sortObserver, hold);\r
+\r
+                       if (!mscan.init(tran))\r
+            {\r
+                throw StandardException.newException(\r
+                        SQLState.SORT_COULD_NOT_INIT);\r
+            }\r
+                       scan = mscan;\r
+\r
+                       // The scan now owns the sort buffer and merge runs.\r
+                       sortBuffer = null;\r
+                       mergeRuns = null;\r
+               }\r
+\r
+               // Ready to start retrieving rows.\r
+               this.state = STATE_SCANNING;\r
+\r
+               return scan;\r
+       }\r
+\r
+       /**\r
+       Open a row source to get rows out of the sorter.\r
+       @see Sort#openSortRowSource\r
+       **/\r
+       public ScanControllerRowSource openSortRowSource(TransactionManager tran)\r
+                       throws StandardException\r
+       {\r
+               if (SanityManager.DEBUG)\r
+                       SanityManager.ASSERT(state == STATE_DONE_INSERTING);\r
+\r
+               ScanControllerRowSource rowSource = null;\r
+\r
+               if (mergeRuns == null || mergeRuns.size() == 0)\r
+               {\r
+                       // There were no merge runs so we can just return\r
+                       // the rows from the sort buffer.\r
+                       scan = new SortBufferRowSource(sortBuffer, tran, sortObserver, false, sortBufferMax);\r
+                       rowSource = (ScanControllerRowSource)scan;\r
+\r
+                       // The scan now owns the sort buffer\r
+                       sortBuffer = null;\r
+               }\r
+               else\r
+               {\r
+                       // Dump the rows in the sort buffer to a merge run.\r
+                       long containerId = createMergeRun(tran, sortBuffer);\r
+                       mergeRuns.addElement(new Long(containerId));\r
+\r
+                       // If there are more merge runs than we can sort\r
+                       // at once with our sort buffer, we have to reduce\r
+                       // the number of merge runs\r
+                       if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN ||\r
+                               mergeRuns.size() > sortBuffer.capacity())       \r
+                               multiStageMerge(tran);\r
+\r
+                       // There are now few enough merge runs to sort\r
+                       // at once, so create a rowSource for them.\r
+                       MergeScanRowSource msRowSource = \r
+                               new MergeScanRowSource(this, tran, sortBuffer, mergeRuns, sortObserver, false);\r
+                       if (!msRowSource.init(tran))\r
+            {\r
+                throw StandardException.newException(\r
+                        SQLState.SORT_COULD_NOT_INIT);\r
+            }\r
+                       scan = msRowSource;\r
+                       rowSource = msRowSource;\r
+\r
+                       // The scan now owns the sort buffer and merge runs.\r
+                       sortBuffer = null;\r
+                       mergeRuns = null;\r
+               }\r
+\r
+               // Ready to start retrieving rows.\r
+               this.state = STATE_SCANNING;\r
+\r
+               return rowSource;\r
+       }\r
+\r
+\r
+\r
+       /**\r
+       Drop the sort.\r
+       @see Sort#drop\r
+       **/\r
+       public void drop(TransactionController tran)\r
+        throws StandardException\r
+       {\r
+               // Make sure the inserter is closed.  Note this\r
+               // will cause the callback to doneInserting()\r
+               // which will give us any in-progress merge\r
+               // runs, if there are any.\r
+               if (inserter != null)\r
+                       inserter.completedInserts();\r
+               inserter = null;\r
+\r
+               // Make sure the scan is closed, if there is one.\r
+               // This will cause the callback to doneScanning().\r
+               if (scan != null)\r
+               {\r
+                       scan.close();\r
+                       scan = null;\r
+               }\r
+\r
+               // If we have a row set, get rid of it.\r
+               if (sortBuffer != null)\r
+               {\r
+                       sortBuffer.close();\r
+                       sortBuffer = null;\r
+               }\r
+\r
+               // Clean out the rest of the objects.\r
+               template = null;\r
+               columnOrdering = null;\r
+               sortObserver = null;\r
+\r
+               // If there are any merge runs, drop them.\r
+               dropMergeRuns((TransactionManager)tran);\r
+\r
+               // Whew!\r
+               state = STATE_CLOSED;\r
+       }\r
+\r
+\r
+       /*\r
+        * Methods of MergeSort.  Arranged alphabetically.\r
+        */\r
+\r
+       /**\r
+       Check the column ordering against the template, making\r
+       sure that each column is present in the template,\r
+       implements Orderable, and is not mentioned more than\r
+       once.  Intended to be called as part of a sanity check.\r
+       **/\r
+       protected boolean checkColumnOrdering(\r
+    DataValueDescriptor[]   template, \r
+    ColumnOrdering          columnOrdering[])\r
+       {\r
+               // Allocate an array to check that each column mentioned only once.\r
+               int templateNColumns = template.length;\r
+               boolean seen[] = new boolean[templateNColumns];\r
+\r
+               // Check each column ordering.\r
+               for (int i = 0; i < columnOrdering.length; i++)\r
+               {\r
+                       int colid = columnOrdering[i].getColumnId();\r
+\r
+                       // Check that the column id is valid.\r
+                       if (colid < 0 || colid >= templateNColumns)\r
+                               return false;\r
+                       \r
+                       // Check that the column isn't mentioned more than once.\r
+                       if (seen[colid])\r
+                               return false;\r
+                       seen[colid] = true;\r
+\r
+                       Object columnVal = \r
+                RowUtil.getColumn(template, (FormatableBitSet) null, colid);\r
+\r
+                       if (!(columnVal instanceof Orderable))\r
+                               return false;\r
+               }\r
+\r
+               return true;\r
+       }\r
+\r
+       /**\r
+       Check that the columns in the row agree with the columns\r
+       in the template, both in number and in type.\r
+       <p>\r
+       XXX (nat) Currently checks that the classes implementing\r
+       each column are the same -- is this right?\r
+       **/\r
+       void checkColumnTypes(DataValueDescriptor[] row)\r
+               throws StandardException\r
+       {\r
+               int nCols = row.length;\r
+               if (template.length != nCols)\r
+               {\r
+            if (SanityManager.DEBUG)\r
+            {\r
+                SanityManager.THROWASSERT(\r
+                    "template.length (" + template.length +\r
+                    ") expected to be = to nCols (" +\r
+                    nCols + ")");\r
+            }\r
+            throw StandardException.newException(\r
+                    SQLState.SORT_TYPE_MISMATCH);\r
+               }\r
+\r
+        if (SanityManager.DEBUG)\r
+        {\r
+            for (int colid = 0; colid < nCols; colid++)\r
+            {\r
+                Object col1 = row[colid];\r
+                Object col2 = template[colid];\r
+                if (col1 == null)\r
+                               {\r
+                                       SanityManager.THROWASSERT(\r
+                                               "col[" + colid + "]  is null");\r
+                               }\r
+                                               \r
+                if (!(col1 instanceof CloneableObject))\r
+                               {\r
+                                       SanityManager.THROWASSERT(\r
+                                               "col[" + colid + "] (" +col1.getClass().getName()+\r
+                                               ") is not a CloneableObject.");\r
+                               }\r
+\r
+                if (col1.getClass() != col2.getClass())\r
+                {\r
+                    SanityManager.THROWASSERT(\r
+                        "col1.getClass() (" + col1.getClass() +\r
+                        ") expected to be the same as col2.getClass() (" +\r
+                        col2.getClass() + ")");\r
+                }\r
+            }\r
+        }\r
+       }\r
+\r
+       int compare(\r
+    DataValueDescriptor[] r1, \r
+    DataValueDescriptor[] r2)\r
+               throws StandardException\r
+       {\r
+               // Get the number of columns we have to compare.\r
+               int colsToCompare = this.columnOrdering.length;\r
+        int r;\r
+\r
+               // Compare the columns specified in the column\r
+               // ordering array.\r
+        for (int i = 0; i < colsToCompare; i++)\r
+        {\r
+                       // Get columns to compare.\r
+            int colid = this.columnOrderingMap[i];\r
+\r
+                       // If the columns don't compare equal, we're done.\r
+                       // Return the sense of the comparison.\r
+                       if ((r = r1[colid].compare(r2[colid])) \r
+                    != 0)\r
+                       {\r
+                               if (this.columnOrderingAscendingMap[i])\r
+                                       return r;\r
+                               else\r
+                                       return -r;\r
+                       }\r
+               }\r
+\r
+               // We made it through all the columns, and they must have\r
+               // all compared equal.  So return that the rows compare equal.\r
+               return 0;\r
+       }\r
+\r
+       /**\r
+       Go from the CLOSED to the INITIALIZED state.\r
+       **/\r
+       public void initialize(\r
+    DataValueDescriptor[]   template,\r
+    ColumnOrdering          columnOrdering[],\r
+    SortObserver            sortObserver,\r
+    boolean                 alreadyInOrder,\r
+    long                    estimatedRows,\r
+    int                     sortBufferMax)\r
+        throws StandardException\r
+       {\r
+        if (SanityManager.DEBUG)\r
+        {\r
+               SanityManager.ASSERT(state == STATE_CLOSED);\r
+       }\r
+\r
+               // Make sure the column ordering makes sense\r
+        if (SanityManager.DEBUG)\r
+        {\r
+               SanityManager.ASSERT(checkColumnOrdering(template, columnOrdering),\r
+                       "column ordering error");\r
+           }\r
+\r
+               // Set user-defined parameters.\r
+               this.template = template;\r
+               this.columnOrdering = columnOrdering;\r
+               this.sortObserver = sortObserver;\r
+               this.alreadyInOrder = alreadyInOrder;\r
+\r
+        // Cache results of columnOrdering calls, results are not allowed\r
+        // to change throughout a sort.\r
+        columnOrderingMap          = new int[columnOrdering.length];\r
+        columnOrderingAscendingMap = new boolean[columnOrdering.length];\r
+        for (int i = 0; i < columnOrdering.length; i++)\r
+        {\r
+            columnOrderingMap[i] = columnOrdering[i].getColumnId();\r
+            columnOrderingAscendingMap[i] = columnOrdering[i].getIsAscending();\r
+        }\r
+\r
+               // No inserter or scan yet.\r
+               this.inserter = null;\r
+               this.scan = null;\r
+\r
+               // We don't have any merge runs.\r
+               this.mergeRuns = null;\r
+               this.sortBuffer = null;\r
+               this.sortBufferMax = sortBufferMax;\r
+\r
+        if (estimatedRows > sortBufferMax)\r
+                       sortBufferMin = sortBufferMax;\r
+               else\r
+                       sortBufferMin = (int)estimatedRows;\r
+               if (SanityManager.DEBUG)\r
+        {\r
+            if (SanityManager.DEBUG_ON("testSort"))\r
+                sortBufferMin = sortBufferMax;\r
+        }\r
+\r
+               this.state = STATE_INITIALIZED;\r
+       }\r
+\r
+       /**\r
+       An inserter is closing.\r
+       **/\r
+       void doneInserting(MergeInserter inserter,\r
+               SortBuffer sortBuffer, Vector mergeRuns)\r
+       {\r
+        if (SanityManager.DEBUG)\r
+        {\r
+               SanityManager.ASSERT(state == STATE_INSERTING);\r
+       }\r
+\r
+               this.sortBuffer = sortBuffer;\r
+               this.mergeRuns = mergeRuns;\r
+               this.inserter = null;\r
+\r
+               this.state = STATE_DONE_INSERTING;\r
+       }\r
+\r
+       void doneScanning(Scan scan, SortBuffer sortBuffer)\r
+       {\r
+               if (SanityManager.DEBUG)\r
+               {\r
+                       // Make sure the scan we're getting back is the one we gave out\r
+\r
+                       if (this.scan != scan)\r
+                       SanityManager.THROWASSERT("this.scan = " + this.scan \r
+                                                                                 + " scan = " + scan);\r
+               }\r
+\r
+               this.sortBuffer = sortBuffer;\r
+               this.scan = null;\r
+\r
+               this.state = STATE_DONE_SCANNING;\r
+       }\r
+\r
+       void doneScanning(Scan scan, SortBuffer sortBuffer,\r
+               Vector mergeRuns)\r
+       {\r
+               this.mergeRuns = mergeRuns;\r
+\r
+               doneScanning(scan, sortBuffer);\r
+       }\r
+\r
+\r
+       /**\r
+       Get rid of the merge runs, if there are any.\r
+       Must not cause any errors because it's called\r
+       during error processing.\r
+       **/\r
+       void dropMergeRuns(TransactionManager tran)\r
+       {\r
+               if (mergeRuns != null)\r
+               {\r
+                       Enumeration e = mergeRuns.elements();\r
+\r
+                       try \r
+                       {\r
+                               Transaction rawTran = tran.getRawStoreXact();\r
+                               long segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;\r
+\r
+                               while (e.hasMoreElements())\r
+                               {\r
+                                       long containerId = ((Long) e.nextElement()).longValue();\r
+                                       rawTran.dropStreamContainer(segmentId, containerId);\r
+                               }\r
+                       }\r
+                       catch (StandardException se)\r
+                       {\r
+                               // Ignore problems with dropping, worst case\r
+                               // the raw store will clean up at reboot.\r
+                       }\r
+                       mergeRuns = null;\r
+               }\r
+       }\r
+\r
+       /* DEBUG (nat)\r
+       void printRunInfo(TransactionController tran)\r
+               throws StandardException\r
+       {\r
+               java.util.Enumeration e = mergeRuns.elements();\r
+               while (e.hasMoreElements())\r
+               {\r
+                       long conglomid = ((Long) e.nextElement()).longValue();\r
+                       ScanController sc = tran.openScan(conglomid, false,\r
+                                                                       false, null, null, 0, null,\r
+                                                                       null, 0);\r
+                       System.out.println("Merge run: conglomid=" + conglomid);\r
+                       while (sc.next())\r
+                       {\r
+                               sc.fetch(template);\r
+                               System.out.println(template);\r
+                       }\r
+                       sc.close();\r
+               }\r
+       }\r
+       */\r
+\r
+       private void multiStageMerge(TransactionManager tran)\r
+               throws StandardException\r
+       {\r
+               Enumeration e;\r
+               //int iterations = 0; // DEBUG (nat)\r
+               int maxMergeRuns = sortBuffer.capacity();\r
+\r
+               if (maxMergeRuns > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN)\r
+                       maxMergeRuns = ExternalSortFactory.DEFAULT_MAX_MERGE_RUN;\r
+\r
+               Vector subset;\r
+               Vector leftovers;\r
+\r
+               while (mergeRuns.size() > maxMergeRuns)\r
+               {\r
+                       // Move maxMergeRuns elements from the merge runs\r
+                       // vector into a subset, leaving the rest.\r
+                       subset = new Vector(maxMergeRuns);\r
+                       leftovers = new Vector(mergeRuns.size() - maxMergeRuns);\r
+                       e = mergeRuns.elements();\r
+                       while (e.hasMoreElements())\r
+                       {\r
+                               Long containerId = (Long) e.nextElement();\r
+                               if (subset.size() < maxMergeRuns)\r
+                                       subset.addElement(containerId);\r
+                               else\r
+                                       leftovers.addElement(containerId);\r
+                       }\r
+\r
+                       /* DEBUG (nat)\r
+                       iterations++;\r
+                               System.out.println(subset.size() + " elements in subset");\r
+                               System.out.println(leftovers.size() + " elements in leftovers");\r
+                               System.out.println(mergeRuns.size() + " elements in mergeRuns");\r
+                               System.out.println("maxMergeRuns is " + maxMergeRuns);\r
+                               System.out.println("iterations = " + iterations);\r
+                       if (subset.size() == 0)\r
+                       {\r
+                               System.exit(1);\r
+                       }\r
+                       */\r
+\r
+                       mergeRuns = leftovers;\r
+\r
+                       // Open a merge scan on the subset.\r
+                       MergeScanRowSource msRowSource = \r
+                               new MergeScanRowSource(this, tran, sortBuffer, subset, sortObserver, false);\r
+\r
+                       if (!msRowSource.init(tran))\r
+            {\r
+                throw StandardException.newException(\r
+                        SQLState.SORT_COULD_NOT_INIT);\r
+            }\r
+\r
+                       // Create and open another temporary stream conglomerate\r
+                       // which will become\r
+                       // a merge run made up with the merged runs from the subset.\r
+                       Transaction rawTran = tran.getRawStoreXact();\r
+                       int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;\r
+                       long id = rawTran.addAndLoadStreamContainer(segmentId,\r
+                               properties, msRowSource);\r
+\r
+                       mergeRuns.addElement(new Long(id));\r
+\r
+                       // Drop the conglomerates in the merge subset\r
+                       e = subset.elements();\r
+                       while (e.hasMoreElements())\r
+                       {\r
+                               Long containerId = (Long) e.nextElement();\r
+                               rawTran.dropStreamContainer(segmentId, containerId.longValue());\r
+                       }\r
+               }\r
+       }\r
+\r
+       /**\r
+       Remove all the rows from the sort buffer and store them\r
+       in a temporary conglomerate.  The temporary conglomerate\r
+       is a "merge run".  Returns the container id of the\r
+       merge run.\r
+       **/\r
+       long createMergeRun(TransactionManager tran, SortBuffer sortBuffer)\r
+               throws StandardException\r
+       {\r
+               // this sort buffer is not a scan and is not tracked by any\r
+               // TransactionManager. \r
+               SortBufferRowSource rowSource =\r
+                       new SortBufferRowSource(sortBuffer, (TransactionManager)null, sortObserver, true, sortBufferMax); \r
+\r
+               // Create a temporary stream conglomerate...\r
+               Transaction rawTran = tran.getRawStoreXact();  // get raw transaction\r
+               int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;\r
+               long id = rawTran.addAndLoadStreamContainer(segmentId,\r
+                       properties, rowSource);\r
+\r
+               // Don't close the sortBuffer, we just emptied it, the caller may reuse\r
+               // that sortBuffer for the next run.\r
+               rowSource = null;\r
+\r
+               return id;\r
+       }\r
+}\r