--- /dev/null
+/*\r
+\r
+ Derby - Class org.apache.derby.impl.sql.execute.AggregateSortObserver\r
+\r
+ Licensed to the Apache Software Foundation (ASF) under one or more\r
+ contributor license agreements. See the NOTICE file distributed with\r
+ this work for additional information regarding copyright ownership.\r
+ The ASF licenses this file to you under the Apache License, Version 2.0\r
+ (the "License"); you may not use this file except in compliance with\r
+ the License. You may obtain a copy of the License at\r
+\r
+ http://www.apache.org/licenses/LICENSE-2.0\r
+\r
+ Unless required by applicable law or agreed to in writing, software\r
+ distributed under the License is distributed on an "AS IS" BASIS,\r
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ See the License for the specific language governing permissions and\r
+ limitations under the License.\r
+\r
+ */\r
+\r
+package org.apache.derby.impl.sql.execute;\r
+\r
+import org.apache.derby.iapi.error.StandardException;\r
+import org.apache.derby.iapi.services.sanity.SanityManager;\r
+import org.apache.derby.iapi.sql.execute.ExecRow;\r
+import org.apache.derby.iapi.types.DataValueDescriptor;\r
+import org.apache.derby.iapi.types.UserDataValue;\r
+\r
+/**\r
+ * This sort observer performs aggregation.\r
+ *\r
+ */\r
+public class AggregateSortObserver extends BasicSortObserver\r
+{\r
+\r
+ protected GenericAggregator[] aggsToProcess;\r
+ protected GenericAggregator[] aggsToInitialize;\r
+\r
+ private int firstAggregatorColumn;\r
+\r
+ /**\r
+ * Simple constructor\r
+ *\r
+ * @param doClone If true, then rows that are retained\r
+ * by the sorter will be cloned. This is needed\r
+ * if language is reusing row wrappers.\r
+ *\r
+ * @param aggsToProcess the array of aggregates that \r
+ * need to be accumulated/merged in the sorter.\r
+ *\r
+ * @param aggsToInitialize the array of aggregates that\r
+ * need to be iniitialized as they are inserted\r
+ * into the sorter. This may be different than\r
+ * aggsToProcess in the case where some distinct\r
+ * aggregates are dropped in the initial pass of\r
+ * a two phase aggregation for scalar or vector\r
+ * distinct aggregation. The initialization process\r
+ * consists of replacing an empty UserValue with a new, \r
+ * initialized aggregate of the appropriate type.\r
+ * Note that for each row, only the first aggregate\r
+ * in this list is checked to see whether initialization\r
+ * is needed. If so, ALL aggregates are initialized;\r
+ * otherwise, NO aggregates are initialized.\r
+ *\r
+ * @param execRow ExecRow to use as source of clone for store.\r
+ */\r
+ public AggregateSortObserver(boolean doClone, GenericAggregator[] aggsToProcess, \r
+ GenericAggregator[] aggsToInitialize,\r
+ ExecRow execRow)\r
+ {\r
+ super(doClone, false, execRow, true);\r
+ this.aggsToProcess = aggsToProcess;\r
+ this.aggsToInitialize = aggsToInitialize;\r
+\r
+ /*\r
+ ** We expect aggsToInitialize and aggsToProcess to\r
+ ** be non null. However, if it is deemed ok for them\r
+ ** to be null, it shouldn't be too hard to add the\r
+ ** extra null checks herein.\r
+ */\r
+ if (SanityManager.DEBUG)\r
+ {\r
+ SanityManager.ASSERT(aggsToInitialize != null, "aggsToInitialize argument to AggregateSortObserver is null");\r
+ SanityManager.ASSERT(aggsToProcess != null, "aggsToProcess argument to AggregateSortObserver is null");\r
+ }\r
+\r
+ if (aggsToInitialize.length > 0)\r
+ {\r
+ firstAggregatorColumn = aggsToInitialize[0].aggregatorColumnId;\r
+ } \r
+ }\r
+\r
+ /**\r
+ * Called prior to inserting a distinct sort\r
+ * key. \r
+ *\r
+ * @param insertRow the current row that the sorter\r
+ * is on the verge of retaining\r
+ *\r
+ * @return the row to be inserted by the sorter. If null,\r
+ * then nothing is inserted by the sorter. Distinct\r
+ * sorts will want to return null.\r
+ *\r
+ * @exception StandardException never thrown\r
+ */\r
+ public DataValueDescriptor[] insertNonDuplicateKey(DataValueDescriptor[] insertRow)\r
+ throws StandardException\r
+ {\r
+ DataValueDescriptor[] returnRow = \r
+ super.insertNonDuplicateKey(insertRow);\r
+\r
+ /*\r
+ ** If we have an aggregator column that hasn't been\r
+ ** initialized, then initialize the entire row now. \r
+ */\r
+ if (aggsToInitialize.length > 0 &&\r
+ returnRow[firstAggregatorColumn].isNull())\r
+ {\r
+ for (int i = 0; i < aggsToInitialize.length; i++)\r
+ {\r
+ GenericAggregator aggregator = aggsToInitialize[i];\r
+ UserDataValue wrapper = ((UserDataValue)returnRow[aggregator.aggregatorColumnId]);\r
+ if (SanityManager.DEBUG)\r
+ {\r
+ if (!wrapper.isNull())\r
+ {\r
+ SanityManager.THROWASSERT("during aggregate "+\r
+ "initialization, all wrappers expected to be empty; "+\r
+ "however, the wrapper for the following aggregate " +\r
+ "was not empty:" +aggregator+". The value stored is "+\r
+ wrapper.getObject());\r
+ }\r
+ }\r
+ wrapper.setValue(aggregator.getAggregatorInstance());\r
+ aggregator.accumulate(returnRow, returnRow);\r
+ }\r
+ }\r
+\r
+ return returnRow;\r
+ \r
+ } \r
+ /**\r
+ * Called prior to inserting a duplicate sort\r
+ * key. We do aggregation here.\r
+ *\r
+ * @param insertRow the current row that the sorter\r
+ * is on the verge of retaining. It is a duplicate\r
+ * of existingRow.\r
+ *\r
+ * @param existingRow the row that is already in the\r
+ * the sorter which is a duplicate of insertRow\r
+ *\r
+ * @exception StandardException never thrown\r
+ */\r
+ public DataValueDescriptor[] insertDuplicateKey(DataValueDescriptor[] insertRow, DataValueDescriptor[] existingRow) \r
+ throws StandardException\r
+ {\r
+ if (aggsToProcess.length == 0)\r
+ {\r
+ return null;\r
+ }\r
+\r
+ /*\r
+ ** If the other row already has an aggregator, then\r
+ ** we need to merge with it. Otherwise, accumulate\r
+ ** it.\r
+ */\r
+ for (int i = 0; i < aggsToProcess.length; i++)\r
+ {\r
+ GenericAggregator aggregator = aggsToProcess[i];\r
+ if (insertRow[aggregator.getColumnId()].isNull())\r
+ {\r
+ aggregator.accumulate(insertRow, existingRow);\r
+ }\r
+ else\r
+ {\r
+ aggregator.merge(insertRow, existingRow);\r
+ }\r
+ }\r
+ return null;\r
+ }\r
+}\r