Back to home page

OSCL-LXR

 
 

    


0001 /**
0002  * Licensed to the Apache Software Foundation (ASF) under one
0003  * or more contributor license agreements.  See the NOTICE file
0004  * distributed with this work for additional information
0005  * regarding copyright ownership.  The ASF licenses this file
0006  * to you under the Apache License, Version 2.0 (the
0007  * "License"); you may not use this file except in compliance
0008  * with the License.  You may obtain a copy of the License at
0009  *
0010  *     http://www.apache.org/licenses/LICENSE-2.0
0011  *
0012  * Unless required by applicable law or agreed to in writing, software
0013  * distributed under the License is distributed on an "AS IS" BASIS,
0014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0015  * See the License for the specific language governing permissions and
0016  * limitations under the License.
0017  */
0018 
0019 package org.apache.hive.service.cli.operation;
0020 
0021 import java.io.BufferedReader;
0022 import java.io.File;
0023 import java.io.FileNotFoundException;
0024 import java.io.FileOutputStream;
0025 import java.io.FileReader;
0026 import java.io.IOException;
0027 import java.io.PrintStream;
0028 import java.io.UnsupportedEncodingException;
0029 import java.util.ArrayList;
0030 import java.util.List;
0031 import java.util.Map;
0032 
0033 import static java.nio.charset.StandardCharsets.UTF_8;
0034 
0035 import org.apache.hadoop.hive.metastore.api.Schema;
0036 import org.apache.hadoop.hive.ql.processors.CommandProcessor;
0037 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
0038 import org.apache.hadoop.hive.ql.session.SessionState;
0039 import org.apache.hive.service.ServiceUtils;
0040 import org.apache.hive.service.cli.FetchOrientation;
0041 import org.apache.hive.service.cli.HiveSQLException;
0042 import org.apache.hive.service.cli.OperationState;
0043 import org.apache.hive.service.cli.RowSet;
0044 import org.apache.hive.service.cli.RowSetFactory;
0045 import org.apache.hive.service.cli.TableSchema;
0046 import org.apache.hive.service.cli.session.HiveSession;
0047 
0048 /**
0049  * Executes a HiveCommand
0050  */
0051 public class HiveCommandOperation extends ExecuteStatementOperation {
0052   private CommandProcessor commandProcessor;
0053   private TableSchema resultSchema = null;
0054 
0055   /**
0056    * For processors other than Hive queries (Driver), they output to session.out (a temp file)
0057    * first and the fetchOne/fetchN/fetchAll functions get the output from pipeIn.
0058    */
0059   private BufferedReader resultReader;
0060 
0061 
0062   protected HiveCommandOperation(HiveSession parentSession, String statement,
0063       CommandProcessor commandProcessor, Map<String, String> confOverlay) {
0064     super(parentSession, statement, confOverlay, false);
0065     this.commandProcessor = commandProcessor;
0066     setupSessionIO(parentSession.getSessionState());
0067   }
0068 
0069   private void setupSessionIO(SessionState sessionState) {
0070     try {
0071       LOG.info("Putting temp output to file " + sessionState.getTmpOutputFile().toString());
0072       sessionState.in = null; // hive server's session input stream is not used
0073       // open a per-session file in auto-flush mode for writing temp results
0074       sessionState.out = new PrintStream(new FileOutputStream(sessionState.getTmpOutputFile()), true, UTF_8.name());
0075       // TODO: for hadoop jobs, progress is printed out to session.err,
0076       // we should find a way to feed back job progress to client
0077       sessionState.err = new PrintStream(System.err, true, UTF_8.name());
0078     } catch (IOException e) {
0079       LOG.error("Error in creating temp output file ", e);
0080       try {
0081         sessionState.in = null;
0082         sessionState.out = new PrintStream(System.out, true, UTF_8.name());
0083         sessionState.err = new PrintStream(System.err, true, UTF_8.name());
0084       } catch (UnsupportedEncodingException ee) {
0085         LOG.error("Error creating PrintStream", e);
0086         ee.printStackTrace();
0087         sessionState.out = null;
0088         sessionState.err = null;
0089       }
0090     }
0091   }
0092 
0093 
0094   private void tearDownSessionIO() {
0095     ServiceUtils.cleanup(LOG,
0096         parentSession.getSessionState().out, parentSession.getSessionState().err);
0097   }
0098 
0099   @Override
0100   public void runInternal() throws HiveSQLException {
0101     setState(OperationState.RUNNING);
0102     try {
0103       String command = getStatement().trim();
0104       String[] tokens = statement.split("\\s");
0105       String commandArgs = command.substring(tokens[0].length()).trim();
0106 
0107       CommandProcessorResponse response = commandProcessor.run(commandArgs);
0108       int returnCode = response.getResponseCode();
0109       if (returnCode != 0) {
0110         throw toSQLException("Error while processing statement", response);
0111       }
0112       Schema schema = response.getSchema();
0113       if (schema != null) {
0114         setHasResultSet(true);
0115         resultSchema = new TableSchema(schema);
0116       } else {
0117         setHasResultSet(false);
0118         resultSchema = new TableSchema();
0119       }
0120     } catch (HiveSQLException e) {
0121       setState(OperationState.ERROR);
0122       throw e;
0123     } catch (Exception e) {
0124       setState(OperationState.ERROR);
0125       throw new HiveSQLException("Error running query: " + e.toString(), e);
0126     }
0127     setState(OperationState.FINISHED);
0128   }
0129 
0130   /* (non-Javadoc)
0131    * @see org.apache.hive.service.cli.operation.Operation#close()
0132    */
0133   @Override
0134   public void close() throws HiveSQLException {
0135     setState(OperationState.CLOSED);
0136     tearDownSessionIO();
0137     cleanTmpFile();
0138     cleanupOperationLog();
0139   }
0140 
0141   /* (non-Javadoc)
0142    * @see org.apache.hive.service.cli.operation.Operation#getResultSetSchema()
0143    */
0144   @Override
0145   public TableSchema getResultSetSchema() throws HiveSQLException {
0146     return resultSchema;
0147   }
0148 
0149   /* (non-Javadoc)
0150    * @see org.apache.hive.service.cli.operation.Operation#getNextRowSet(org.apache.hive.service.cli.FetchOrientation, long)
0151    */
0152   @Override
0153   public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
0154     validateDefaultFetchOrientation(orientation);
0155     if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
0156       resetResultReader();
0157     }
0158     List<String> rows = readResults((int) maxRows);
0159     RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion(), false);
0160 
0161     for (String row : rows) {
0162       rowSet.addRow(new String[] {row});
0163     }
0164     return rowSet;
0165   }
0166 
0167   /**
0168    * Reads the temporary results for non-Hive (non-Driver) commands to the
0169    * resulting List of strings.
0170    * @param nLines number of lines read at once. If it is <= 0, then read all lines.
0171    */
0172   private List<String> readResults(int nLines) throws HiveSQLException {
0173     if (resultReader == null) {
0174       SessionState sessionState = getParentSession().getSessionState();
0175       File tmp = sessionState.getTmpOutputFile();
0176       try {
0177         resultReader = new BufferedReader(new FileReader(tmp));
0178       } catch (FileNotFoundException e) {
0179         LOG.error("File " + tmp + " not found. ", e);
0180         throw new HiveSQLException(e);
0181       }
0182     }
0183     List<String> results = new ArrayList<String>();
0184 
0185     for (int i = 0; i < nLines || nLines <= 0; ++i) {
0186       try {
0187         String line = resultReader.readLine();
0188         if (line == null) {
0189           // reached the end of the result file
0190           break;
0191         } else {
0192           results.add(line);
0193         }
0194       } catch (IOException e) {
0195         LOG.error("Reading temp results encountered an exception: ", e);
0196         throw new HiveSQLException(e);
0197       }
0198     }
0199     return results;
0200   }
0201 
0202   private void cleanTmpFile() {
0203     resetResultReader();
0204     SessionState sessionState = getParentSession().getSessionState();
0205     sessionState.deleteTmpOutputFile();
0206     sessionState.deleteTmpErrOutputFile();
0207   }
0208 
0209   private void resetResultReader() {
0210     if (resultReader != null) {
0211       ServiceUtils.cleanup(LOG, resultReader);
0212       resultReader = null;
0213     }
0214   }
0215 }