--- /dev/null
+/*\r
+\r
+ Derby - Class org.apache.derby.impl.store.access.sort.MergeSort\r
+\r
+ Licensed to the Apache Software Foundation (ASF) under one or more\r
+ contributor license agreements. See the NOTICE file distributed with\r
+ this work for additional information regarding copyright ownership.\r
+ The ASF licenses this file to you under the Apache License, Version 2.0\r
+ (the "License"); you may not use this file except in compliance with\r
+ the License. You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+\r
+ */\r
+\r
+package org.apache.derby.impl.store.access.sort;\r
+\r
+import org.apache.derby.iapi.reference.SQLState;\r
+\r
+import org.apache.derby.iapi.services.io.FormatableBitSet;\r
+\r
+import org.apache.derby.iapi.services.sanity.SanityManager;\r
+import org.apache.derby.iapi.error.StandardException;\r
+import org.apache.derby.iapi.store.access.conglomerate.ScanControllerRowSource;\r
+import org.apache.derby.iapi.store.access.conglomerate.Sort;\r
+import org.apache.derby.iapi.store.access.conglomerate.TransactionManager;\r
+import org.apache.derby.iapi.types.CloneableObject;\r
+import org.apache.derby.iapi.store.access.ColumnOrdering;\r
+import org.apache.derby.iapi.store.access.RowUtil;\r
+import org.apache.derby.iapi.store.access.ScanController;\r
+import org.apache.derby.iapi.store.access.SortObserver;\r
+import org.apache.derby.iapi.store.access.SortController;\r
+import org.apache.derby.iapi.store.access.TransactionController;\r
+\r
+import org.apache.derby.iapi.store.raw.StreamContainerHandle;\r
+import org.apache.derby.iapi.store.raw.RawStoreFactory;\r
+import org.apache.derby.iapi.store.raw.Transaction;\r
+\r
+import org.apache.derby.iapi.types.DataValueDescriptor;\r
+\r
+import org.apache.derby.iapi.types.Orderable;\r
+\r
+import java.util.Enumeration;\r
+import java.util.Properties;\r
+import java.util.Vector;\r
+\r
+/**\r
+\r
+ A sort implementation which does the sort in-memory if it can,\r
+ but which can do an external merge sort so that it can sort an\r
+ arbitrary number of rows.\r
+\r
+**/\r
+\r
+final class MergeSort implements Sort\r
+{\r
+\r
+ /*\r
+ * Fields\r
+ */\r
+\r
+ /**\r
+ **/\r
+ private static final int STATE_CLOSED = 0;\r
+\r
+ /**\r
+ **/\r
+ private static final int STATE_INITIALIZED = 1;\r
+\r
+ /**\r
+ **/\r
+ private static final int STATE_INSERTING = 2;\r
+\r
+ /**\r
+ **/\r
+ private static final int STATE_DONE_INSERTING = 3;\r
+\r
+ /**\r
+ **/\r
+ private static final int STATE_SCANNING = 4;\r
+\r
+ /**\r
+ **/\r
+ private static final int STATE_DONE_SCANNING = 5;\r
+\r
+ /**\r
+ Maintains the current state of the sort as defined in\r
+ the preceding values. Sorts start off and end up closed.\r
+ **/\r
+ private int state = STATE_CLOSED;\r
+\r
+ /**\r
+ The template as passed in on create. Valid when the state\r
+ is INITIALIZED through SCANNING, null otherwise.\r
+ **/\r
+ protected DataValueDescriptor[] template;\r
+\r
+ /**\r
+ The column ordering as passed in on create. Valid when\r
+ the state is INITIALIZED through SCANNING, null otherwise.\r
+ May be null if there is no column ordering - this means\r
+ that all rows are considered to be duplicates, and the\r
+ sort will only emit a single row.\r
+ **/\r
+ protected ColumnOrdering columnOrdering[];\r
+\r
+ /**\r
+ A lookup table to speed up lookup of a column associated with the i'th\r
+ column to compare. To find the column id to compare as the i'th column\r
+ look in columnOrderingMap[i].\r
+ **/\r
+ private int columnOrderingMap[];\r
+\r
+ /**\r
+ A lookup table to speed up lookup of Ascending state of a column, \r
+ **/\r
+ private boolean columnOrderingAscendingMap[];\r
+\r
+ /**\r
+ The sort observer. May be null. Used as a callback.\r
+ **/\r
+ SortObserver sortObserver;\r
+\r
+ /**\r
+ Whether the rows are expected to be in order on insert,\r
+ as passed in on create.\r
+ **/\r
+ protected boolean alreadyInOrder;\r
+\r
+ /**\r
+ The inserter that's being used to insert rows into the sort.\r
+ This field is only valid when the state is INSERTING.\r
+ **/\r
+ private MergeInserter inserter = null;\r
+\r
+ /**\r
+ The scan that's being used to return rows from the sort.\r
+ This field is only valid when the state is SCANNING.\r
+ **/\r
+ private Scan scan = null;\r
+\r
+ /**\r
+ A vector of merge runs, produced by the MergeInserter.\r
+ Might be null if no merge runs were produced.\r
+ It is a vector of container ids.\r
+ **/\r
+ private Vector mergeRuns = null;\r
+\r
+ /**\r
+ An ordered set of the leftover rows that didn't go\r
+ in the last merge run (might be all the rows if there\r
+ are no merge runs).\r
+ **/\r
+ private SortBuffer sortBuffer = null;\r
+\r
+ /**\r
+ The maximum number of entries a sort buffer can hold.\r
+ **/\r
+ int sortBufferMax;\r
+\r
+ /**\r
+ The minimum number of entries a sort buffer can hold.\r
+ **/\r
+ int sortBufferMin;\r
+\r
+ /**\r
+ Properties for mergeSort\r
+ **/\r
+ static Properties properties = null;\r
+\r
+ /**\r
+ Static initializer for MergeSort, to initialize once the properties\r
+ for the sortBuffer. \r
+ **/\r
+ static\r
+ {\r
+ properties = new Properties();\r
+ properties.put(RawStoreFactory.STREAM_FILE_BUFFER_SIZE_PARAMETER, "16384");\r
+ }\r
+\r
+ /*\r
+ * Methods of Sort\r
+ */\r
+\r
+ /**\r
+ Open a sort controller.\r
+ <p>\r
+ This implementation only supports a single sort controller\r
+ per sort.\r
+ @see Sort#open\r
+ **/\r
+ public SortController open(TransactionManager tran)\r
+ throws StandardException\r
+ {\r
+ if (SanityManager.DEBUG)\r
+ SanityManager.ASSERT(state == STATE_INITIALIZED);\r
+\r
+ // Ready to start inserting rows.\r
+ state = STATE_INSERTING;\r
+\r
+ // Create and initialize an inserter. When the caller\r
+ // closes it, it will call back to inserterIsClosed().\r
+ this.inserter = new MergeInserter();\r
+ if (this.inserter.initialize(this, tran) == false)\r
+ {\r
+ throw StandardException.newException(SQLState.SORT_COULD_NOT_INIT);\r
+ }\r
+\r
+ return this.inserter;\r
+ }\r
+\r
+ /**\r
+ Open a scan controller.\r
+ @see Sort#openSortScan\r
+ **/\r
+\r
+ public ScanController openSortScan(\r
+ TransactionManager tran,\r
+ boolean hold)\r
+ throws StandardException\r
+ {\r
+ if (SanityManager.DEBUG)\r
+ SanityManager.ASSERT(state == STATE_DONE_INSERTING);\r
+\r
+ if (mergeRuns == null || mergeRuns.size() == 0)\r
+ {\r
+ // There were no merge runs so we can just return\r
+ // the rows from the sort buffer.\r
+ scan = new SortBufferScan(this, tran, sortBuffer, hold);\r
+\r
+ // The scan now owns the sort buffer\r
+ sortBuffer = null;\r
+ }\r
+ else\r
+ {\r
+ // Dump the rows in the sort buffer to a merge run.\r
+ long containerId = createMergeRun(tran, sortBuffer);\r
+ mergeRuns.addElement(new Long(containerId));\r
+\r
+ // If there are more merge runs than we can sort\r
+ // at once with our sort buffer, we have to reduce\r
+ // the number of merge runs\r
+ if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN ||\r
+ mergeRuns.size() > sortBuffer.capacity())\r
+ multiStageMerge(tran);\r
+\r
+ // There are now few enough merge runs to sort\r
+ // at once, so create a scan for them.\r
+ MergeScan mscan = \r
+ new MergeScan(\r
+ this, tran, sortBuffer, mergeRuns, sortObserver, hold);\r
+\r
+ if (!mscan.init(tran))\r
+ {\r
+ throw StandardException.newException(\r
+ SQLState.SORT_COULD_NOT_INIT);\r
+ }\r
+ scan = mscan;\r
+\r
+ // The scan now owns the sort buffer and merge runs.\r
+ sortBuffer = null;\r
+ mergeRuns = null;\r
+ }\r
+\r
+ // Ready to start retrieving rows.\r
+ this.state = STATE_SCANNING;\r
+\r
+ return scan;\r
+ }\r
+\r
+ /**\r
+ Open a row source to get rows out of the sorter.\r
+ @see Sort#openSortRowSource\r
+ **/\r
+ public ScanControllerRowSource openSortRowSource(TransactionManager tran)\r
+ throws StandardException\r
+ {\r
+ if (SanityManager.DEBUG)\r
+ SanityManager.ASSERT(state == STATE_DONE_INSERTING);\r
+\r
+ ScanControllerRowSource rowSource = null;\r
+\r
+ if (mergeRuns == null || mergeRuns.size() == 0)\r
+ {\r
+ // There were no merge runs so we can just return\r
+ // the rows from the sort buffer.\r
+ scan = new SortBufferRowSource(sortBuffer, tran, sortObserver, false, sortBufferMax);\r
+ rowSource = (ScanControllerRowSource)scan;\r
+\r
+ // The scan now owns the sort buffer\r
+ sortBuffer = null;\r
+ }\r
+ else\r
+ {\r
+ // Dump the rows in the sort buffer to a merge run.\r
+ long containerId = createMergeRun(tran, sortBuffer);\r
+ mergeRuns.addElement(new Long(containerId));\r
+\r
+ // If there are more merge runs than we can sort\r
+ // at once with our sort buffer, we have to reduce\r
+ // the number of merge runs\r
+ if (mergeRuns.size() > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN ||\r
+ mergeRuns.size() > sortBuffer.capacity()) \r
+ multiStageMerge(tran);\r
+\r
+ // There are now few enough merge runs to sort\r
+ // at once, so create a rowSource for them.\r
+ MergeScanRowSource msRowSource = \r
+ new MergeScanRowSource(this, tran, sortBuffer, mergeRuns, sortObserver, false);\r
+ if (!msRowSource.init(tran))\r
+ {\r
+ throw StandardException.newException(\r
+ SQLState.SORT_COULD_NOT_INIT);\r
+ }\r
+ scan = msRowSource;\r
+ rowSource = msRowSource;\r
+\r
+ // The scan now owns the sort buffer and merge runs.\r
+ sortBuffer = null;\r
+ mergeRuns = null;\r
+ }\r
+\r
+ // Ready to start retrieving rows.\r
+ this.state = STATE_SCANNING;\r
+\r
+ return rowSource;\r
+ }\r
+\r
+\r
+\r
+ /**\r
+ Drop the sort.\r
+ @see Sort#drop\r
+ **/\r
+ public void drop(TransactionController tran)\r
+ throws StandardException\r
+ {\r
+ // Make sure the inserter is closed. Note this\r
+ // will cause the callback to doneInserting()\r
+ // which will give us any in-progress merge\r
+ // runs, if there are any.\r
+ if (inserter != null)\r
+ inserter.completedInserts();\r
+ inserter = null;\r
+\r
+ // Make sure the scan is closed, if there is one.\r
+ // This will cause the callback to doneScanning().\r
+ if (scan != null)\r
+ {\r
+ scan.close();\r
+ scan = null;\r
+ }\r
+\r
+ // If we have a row set, get rid of it.\r
+ if (sortBuffer != null)\r
+ {\r
+ sortBuffer.close();\r
+ sortBuffer = null;\r
+ }\r
+\r
+ // Clean out the rest of the objects.\r
+ template = null;\r
+ columnOrdering = null;\r
+ sortObserver = null;\r
+\r
+ // If there are any merge runs, drop them.\r
+ dropMergeRuns((TransactionManager)tran);\r
+\r
+ // Whew!\r
+ state = STATE_CLOSED;\r
+ }\r
+\r
+\r
+ /*\r
+ * Methods of MergeSort. Arranged alphabetically.\r
+ */\r
+\r
+ /**\r
+ Check the column ordering against the template, making\r
+ sure that each column is present in the template,\r
+ implements Orderable, and is not mentioned more than\r
+ once. Intended to be called as part of a sanity check.\r
+ **/\r
+ protected boolean checkColumnOrdering(\r
+ DataValueDescriptor[] template, \r
+ ColumnOrdering columnOrdering[])\r
+ {\r
+ // Allocate an array to check that each column mentioned only once.\r
+ int templateNColumns = template.length;\r
+ boolean seen[] = new boolean[templateNColumns];\r
+\r
+ // Check each column ordering.\r
+ for (int i = 0; i < columnOrdering.length; i++)\r
+ {\r
+ int colid = columnOrdering[i].getColumnId();\r
+\r
+ // Check that the column id is valid.\r
+ if (colid < 0 || colid >= templateNColumns)\r
+ return false;\r
+ \r
+ // Check that the column isn't mentioned more than once.\r
+ if (seen[colid])\r
+ return false;\r
+ seen[colid] = true;\r
+\r
+ Object columnVal = \r
+ RowUtil.getColumn(template, (FormatableBitSet) null, colid);\r
+\r
+ if (!(columnVal instanceof Orderable))\r
+ return false;\r
+ }\r
+\r
+ return true;\r
+ }\r
+\r
+ /**\r
+ Check that the columns in the row agree with the columns\r
+ in the template, both in number and in type.\r
+ <p>\r
+ XXX (nat) Currently checks that the classes implementing\r
+ each column are the same -- is this right?\r
+ **/\r
+ void checkColumnTypes(DataValueDescriptor[] row)\r
+ throws StandardException\r
+ {\r
+ int nCols = row.length;\r
+ if (template.length != nCols)\r
+ {\r
+ if (SanityManager.DEBUG)\r
+ {\r
+ SanityManager.THROWASSERT(\r
+ "template.length (" + template.length +\r
+ ") expected to be = to nCols (" +\r
+ nCols + ")");\r
+ }\r
+ throw StandardException.newException(\r
+ SQLState.SORT_TYPE_MISMATCH);\r
+ }\r
+\r
+ if (SanityManager.DEBUG)\r
+ {\r
+ for (int colid = 0; colid < nCols; colid++)\r
+ {\r
+ Object col1 = row[colid];\r
+ Object col2 = template[colid];\r
+ if (col1 == null)\r
+ {\r
+ SanityManager.THROWASSERT(\r
+ "col[" + colid + "] is null");\r
+ }\r
+ \r
+ if (!(col1 instanceof CloneableObject))\r
+ {\r
+ SanityManager.THROWASSERT(\r
+ "col[" + colid + "] (" +col1.getClass().getName()+\r
+ ") is not a CloneableObject.");\r
+ }\r
+\r
+ if (col1.getClass() != col2.getClass())\r
+ {\r
+ SanityManager.THROWASSERT(\r
+ "col1.getClass() (" + col1.getClass() +\r
+ ") expected to be the same as col2.getClass() (" +\r
+ col2.getClass() + ")");\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ int compare(\r
+ DataValueDescriptor[] r1, \r
+ DataValueDescriptor[] r2)\r
+ throws StandardException\r
+ {\r
+ // Get the number of columns we have to compare.\r
+ int colsToCompare = this.columnOrdering.length;\r
+ int r;\r
+\r
+ // Compare the columns specified in the column\r
+ // ordering array.\r
+ for (int i = 0; i < colsToCompare; i++)\r
+ {\r
+ // Get columns to compare.\r
+ int colid = this.columnOrderingMap[i];\r
+\r
+ // If the columns don't compare equal, we're done.\r
+ // Return the sense of the comparison.\r
+ if ((r = r1[colid].compare(r2[colid])) \r
+ != 0)\r
+ {\r
+ if (this.columnOrderingAscendingMap[i])\r
+ return r;\r
+ else\r
+ return -r;\r
+ }\r
+ }\r
+\r
+ // We made it through all the columns, and they must have\r
+ // all compared equal. So return that the rows compare equal.\r
+ return 0;\r
+ }\r
+\r
+ /**\r
+ Go from the CLOSED to the INITIALIZED state.\r
+ **/\r
+ public void initialize(\r
+ DataValueDescriptor[] template,\r
+ ColumnOrdering columnOrdering[],\r
+ SortObserver sortObserver,\r
+ boolean alreadyInOrder,\r
+ long estimatedRows,\r
+ int sortBufferMax)\r
+ throws StandardException\r
+ {\r
+ if (SanityManager.DEBUG)\r
+ {\r
+ SanityManager.ASSERT(state == STATE_CLOSED);\r
+ }\r
+\r
+ // Make sure the column ordering makes sense\r
+ if (SanityManager.DEBUG)\r
+ {\r
+ SanityManager.ASSERT(checkColumnOrdering(template, columnOrdering),\r
+ "column ordering error");\r
+ }\r
+\r
+ // Set user-defined parameters.\r
+ this.template = template;\r
+ this.columnOrdering = columnOrdering;\r
+ this.sortObserver = sortObserver;\r
+ this.alreadyInOrder = alreadyInOrder;\r
+\r
+ // Cache results of columnOrdering calls, results are not allowed\r
+ // to change throughout a sort.\r
+ columnOrderingMap = new int[columnOrdering.length];\r
+ columnOrderingAscendingMap = new boolean[columnOrdering.length];\r
+ for (int i = 0; i < columnOrdering.length; i++)\r
+ {\r
+ columnOrderingMap[i] = columnOrdering[i].getColumnId();\r
+ columnOrderingAscendingMap[i] = columnOrdering[i].getIsAscending();\r
+ }\r
+\r
+ // No inserter or scan yet.\r
+ this.inserter = null;\r
+ this.scan = null;\r
+\r
+ // We don't have any merge runs.\r
+ this.mergeRuns = null;\r
+ this.sortBuffer = null;\r
+ this.sortBufferMax = sortBufferMax;\r
+\r
+ if (estimatedRows > sortBufferMax)\r
+ sortBufferMin = sortBufferMax;\r
+ else\r
+ sortBufferMin = (int)estimatedRows;\r
+ if (SanityManager.DEBUG)\r
+ {\r
+ if (SanityManager.DEBUG_ON("testSort"))\r
+ sortBufferMin = sortBufferMax;\r
+ }\r
+\r
+ this.state = STATE_INITIALIZED;\r
+ }\r
+\r
+ /**\r
+ An inserter is closing.\r
+ **/\r
+ void doneInserting(MergeInserter inserter,\r
+ SortBuffer sortBuffer, Vector mergeRuns)\r
+ {\r
+ if (SanityManager.DEBUG)\r
+ {\r
+ SanityManager.ASSERT(state == STATE_INSERTING);\r
+ }\r
+\r
+ this.sortBuffer = sortBuffer;\r
+ this.mergeRuns = mergeRuns;\r
+ this.inserter = null;\r
+\r
+ this.state = STATE_DONE_INSERTING;\r
+ }\r
+\r
+ void doneScanning(Scan scan, SortBuffer sortBuffer)\r
+ {\r
+ if (SanityManager.DEBUG)\r
+ {\r
+ // Make sure the scan we're getting back is the one we gave out\r
+\r
+ if (this.scan != scan)\r
+ SanityManager.THROWASSERT("this.scan = " + this.scan \r
+ + " scan = " + scan);\r
+ }\r
+\r
+ this.sortBuffer = sortBuffer;\r
+ this.scan = null;\r
+\r
+ this.state = STATE_DONE_SCANNING;\r
+ }\r
+\r
+ void doneScanning(Scan scan, SortBuffer sortBuffer,\r
+ Vector mergeRuns)\r
+ {\r
+ this.mergeRuns = mergeRuns;\r
+\r
+ doneScanning(scan, sortBuffer);\r
+ }\r
+\r
+\r
+ /**\r
+ Get rid of the merge runs, if there are any.\r
+ Must not cause any errors because it's called\r
+ during error processing.\r
+ **/\r
+ void dropMergeRuns(TransactionManager tran)\r
+ {\r
+ if (mergeRuns != null)\r
+ {\r
+ Enumeration e = mergeRuns.elements();\r
+\r
+ try \r
+ {\r
+ Transaction rawTran = tran.getRawStoreXact();\r
+ long segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;\r
+\r
+ while (e.hasMoreElements())\r
+ {\r
+ long containerId = ((Long) e.nextElement()).longValue();\r
+ rawTran.dropStreamContainer(segmentId, containerId);\r
+ }\r
+ }\r
+ catch (StandardException se)\r
+ {\r
+ // Ignore problems with dropping, worst case\r
+ // the raw store will clean up at reboot.\r
+ }\r
+ mergeRuns = null;\r
+ }\r
+ }\r
+\r
+ /* DEBUG (nat)\r
+ void printRunInfo(TransactionController tran)\r
+ throws StandardException\r
+ {\r
+ java.util.Enumeration e = mergeRuns.elements();\r
+ while (e.hasMoreElements())\r
+ {\r
+ long conglomid = ((Long) e.nextElement()).longValue();\r
+ ScanController sc = tran.openScan(conglomid, false,\r
+ false, null, null, 0, null,\r
+ null, 0);\r
+ System.out.println("Merge run: conglomid=" + conglomid);\r
+ while (sc.next())\r
+ {\r
+ sc.fetch(template);\r
+ System.out.println(template);\r
+ }\r
+ sc.close();\r
+ }\r
+ }\r
+ */\r
+\r
+ private void multiStageMerge(TransactionManager tran)\r
+ throws StandardException\r
+ {\r
+ Enumeration e;\r
+ //int iterations = 0; // DEBUG (nat)\r
+ int maxMergeRuns = sortBuffer.capacity();\r
+\r
+ if (maxMergeRuns > ExternalSortFactory.DEFAULT_MAX_MERGE_RUN)\r
+ maxMergeRuns = ExternalSortFactory.DEFAULT_MAX_MERGE_RUN;\r
+\r
+ Vector subset;\r
+ Vector leftovers;\r
+\r
+ while (mergeRuns.size() > maxMergeRuns)\r
+ {\r
+ // Move maxMergeRuns elements from the merge runs\r
+ // vector into a subset, leaving the rest.\r
+ subset = new Vector(maxMergeRuns);\r
+ leftovers = new Vector(mergeRuns.size() - maxMergeRuns);\r
+ e = mergeRuns.elements();\r
+ while (e.hasMoreElements())\r
+ {\r
+ Long containerId = (Long) e.nextElement();\r
+ if (subset.size() < maxMergeRuns)\r
+ subset.addElement(containerId);\r
+ else\r
+ leftovers.addElement(containerId);\r
+ }\r
+\r
+ /* DEBUG (nat)\r
+ iterations++;\r
+ System.out.println(subset.size() + " elements in subset");\r
+ System.out.println(leftovers.size() + " elements in leftovers");\r
+ System.out.println(mergeRuns.size() + " elements in mergeRuns");\r
+ System.out.println("maxMergeRuns is " + maxMergeRuns);\r
+ System.out.println("iterations = " + iterations);\r
+ if (subset.size() == 0)\r
+ {\r
+ System.exit(1);\r
+ }\r
+ */\r
+\r
+ mergeRuns = leftovers;\r
+\r
+ // Open a merge scan on the subset.\r
+ MergeScanRowSource msRowSource = \r
+ new MergeScanRowSource(this, tran, sortBuffer, subset, sortObserver, false);\r
+\r
+ if (!msRowSource.init(tran))\r
+ {\r
+ throw StandardException.newException(\r
+ SQLState.SORT_COULD_NOT_INIT);\r
+ }\r
+\r
+ // Create and open another temporary stream conglomerate\r
+ // which will become\r
+ // a merge run made up with the merged runs from the subset.\r
+ Transaction rawTran = tran.getRawStoreXact();\r
+ int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;\r
+ long id = rawTran.addAndLoadStreamContainer(segmentId,\r
+ properties, msRowSource);\r
+\r
+ mergeRuns.addElement(new Long(id));\r
+\r
+ // Drop the conglomerates in the merge subset\r
+ e = subset.elements();\r
+ while (e.hasMoreElements())\r
+ {\r
+ Long containerId = (Long) e.nextElement();\r
+ rawTran.dropStreamContainer(segmentId, containerId.longValue());\r
+ }\r
+ }\r
+ }\r
+\r
+ /**\r
+ Remove all the rows from the sort buffer and store them\r
+ in a temporary conglomerate. The temporary conglomerate\r
+ is a "merge run". Returns the container id of the\r
+ merge run.\r
+ **/\r
+ long createMergeRun(TransactionManager tran, SortBuffer sortBuffer)\r
+ throws StandardException\r
+ {\r
+ // this sort buffer is not a scan and is not tracked by any\r
+ // TransactionManager. \r
+ SortBufferRowSource rowSource =\r
+ new SortBufferRowSource(sortBuffer, (TransactionManager)null, sortObserver, true, sortBufferMax); \r
+\r
+ // Create a temporary stream conglomerate...\r
+ Transaction rawTran = tran.getRawStoreXact(); // get raw transaction\r
+ int segmentId = StreamContainerHandle.TEMPORARY_SEGMENT;\r
+ long id = rawTran.addAndLoadStreamContainer(segmentId,\r
+ properties, rowSource);\r
+\r
+ // Don't close the sortBuffer, we just emptied it, the caller may reuse\r
+ // that sortBuffer for the next run.\r
+ rowSource = null;\r
+\r
+ return id;\r
+ }\r
+}\r