Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.execution;
0019 
0020 import java.io.IOException;
0021 import java.util.LinkedList;
0022 
0023 import scala.collection.Iterator;
0024 
0025 import org.apache.spark.TaskContext;
0026 import org.apache.spark.sql.catalyst.InternalRow;
0027 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
0028 
0029 /**
0030  * An iterator interface used to pull the output from generated function for multiple operators
0031  * (whole stage codegen).
0032  */
0033 public abstract class BufferedRowIterator {
0034   protected LinkedList<InternalRow> currentRows = new LinkedList<>();
0035   // used when there is no column in output
0036   protected UnsafeRow unsafeRow = new UnsafeRow(0);
0037   private long startTimeNs = System.nanoTime();
0038 
0039   protected int partitionIndex = -1;
0040 
0041   public boolean hasNext() throws IOException {
0042     if (currentRows.isEmpty()) {
0043       processNext();
0044     }
0045     return !currentRows.isEmpty();
0046   }
0047 
0048   public InternalRow next() {
0049     return currentRows.remove();
0050   }
0051 
0052   /**
0053    * Returns the elapsed time since this object is created. This object represents a pipeline so
0054    * this is a measure of how long the pipeline has been running.
0055    */
0056   public long durationMs() {
0057     return (System.nanoTime() - startTimeNs) / (1000 * 1000);
0058   }
0059 
0060   /**
0061    * Initializes from array of iterators of InternalRow.
0062    */
0063   public abstract void init(int index, Iterator<InternalRow>[] iters);
0064 
0065   /*
0066    * Attributes of the following four methods are public. Thus, they can be also accessed from
0067    * methods in inner classes. See SPARK-23598
0068    */
0069   /**
0070    * Append a row to currentRows.
0071    */
0072   public void append(InternalRow row) {
0073     currentRows.add(row);
0074   }
0075 
0076   /**
0077    * Returns whether `processNext()` should stop processing next row from `input` or not.
0078    *
0079    * If it returns true, the caller should exit the loop (return from processNext()).
0080    */
0081   public boolean shouldStop() {
0082     return !currentRows.isEmpty();
0083   }
0084 
0085   /**
0086    * Increase the peak execution memory for current task.
0087    */
0088   public void incPeakExecutionMemory(long size) {
0089     TaskContext.get().taskMetrics().incPeakExecutionMemory(size);
0090   }
0091 
0092   /**
0093    * Processes the input until have a row as output (currentRow).
0094    *
0095    * After it's called, if currentRow is still null, it means no more rows left.
0096    */
0097   protected abstract void processNext() throws IOException;
0098 }