0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019 package org.apache.hive.service.cli.operation;
0020
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.processors.CommandProcessor;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.ServiceUtils;
import org.apache.hive.service.cli.FetchOrientation;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationState;
import org.apache.hive.service.cli.RowSet;
import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.session.HiveSession;
0047
0048
0049
0050
0051 public class HiveCommandOperation extends ExecuteStatementOperation {
0052 private CommandProcessor commandProcessor;
0053 private TableSchema resultSchema = null;
0054
0055
0056
0057
0058
0059 private BufferedReader resultReader;
0060
0061
0062 protected HiveCommandOperation(HiveSession parentSession, String statement,
0063 CommandProcessor commandProcessor, Map<String, String> confOverlay) {
0064 super(parentSession, statement, confOverlay, false);
0065 this.commandProcessor = commandProcessor;
0066 setupSessionIO(parentSession.getSessionState());
0067 }
0068
0069 private void setupSessionIO(SessionState sessionState) {
0070 try {
0071 LOG.info("Putting temp output to file " + sessionState.getTmpOutputFile().toString());
0072 sessionState.in = null;
0073
0074 sessionState.out = new PrintStream(new FileOutputStream(sessionState.getTmpOutputFile()), true, UTF_8.name());
0075
0076
0077 sessionState.err = new PrintStream(System.err, true, UTF_8.name());
0078 } catch (IOException e) {
0079 LOG.error("Error in creating temp output file ", e);
0080 try {
0081 sessionState.in = null;
0082 sessionState.out = new PrintStream(System.out, true, UTF_8.name());
0083 sessionState.err = new PrintStream(System.err, true, UTF_8.name());
0084 } catch (UnsupportedEncodingException ee) {
0085 LOG.error("Error creating PrintStream", e);
0086 ee.printStackTrace();
0087 sessionState.out = null;
0088 sessionState.err = null;
0089 }
0090 }
0091 }
0092
0093
0094 private void tearDownSessionIO() {
0095 ServiceUtils.cleanup(LOG,
0096 parentSession.getSessionState().out, parentSession.getSessionState().err);
0097 }
0098
0099 @Override
0100 public void runInternal() throws HiveSQLException {
0101 setState(OperationState.RUNNING);
0102 try {
0103 String command = getStatement().trim();
0104 String[] tokens = statement.split("\\s");
0105 String commandArgs = command.substring(tokens[0].length()).trim();
0106
0107 CommandProcessorResponse response = commandProcessor.run(commandArgs);
0108 int returnCode = response.getResponseCode();
0109 if (returnCode != 0) {
0110 throw toSQLException("Error while processing statement", response);
0111 }
0112 Schema schema = response.getSchema();
0113 if (schema != null) {
0114 setHasResultSet(true);
0115 resultSchema = new TableSchema(schema);
0116 } else {
0117 setHasResultSet(false);
0118 resultSchema = new TableSchema();
0119 }
0120 } catch (HiveSQLException e) {
0121 setState(OperationState.ERROR);
0122 throw e;
0123 } catch (Exception e) {
0124 setState(OperationState.ERROR);
0125 throw new HiveSQLException("Error running query: " + e.toString(), e);
0126 }
0127 setState(OperationState.FINISHED);
0128 }
0129
0130
0131
0132
0133 @Override
0134 public void close() throws HiveSQLException {
0135 setState(OperationState.CLOSED);
0136 tearDownSessionIO();
0137 cleanTmpFile();
0138 cleanupOperationLog();
0139 }
0140
0141
0142
0143
0144 @Override
0145 public TableSchema getResultSetSchema() throws HiveSQLException {
0146 return resultSchema;
0147 }
0148
0149
0150
0151
0152 @Override
0153 public RowSet getNextRowSet(FetchOrientation orientation, long maxRows) throws HiveSQLException {
0154 validateDefaultFetchOrientation(orientation);
0155 if (orientation.equals(FetchOrientation.FETCH_FIRST)) {
0156 resetResultReader();
0157 }
0158 List<String> rows = readResults((int) maxRows);
0159 RowSet rowSet = RowSetFactory.create(resultSchema, getProtocolVersion(), false);
0160
0161 for (String row : rows) {
0162 rowSet.addRow(new String[] {row});
0163 }
0164 return rowSet;
0165 }
0166
0167
0168
0169
0170
0171
0172 private List<String> readResults(int nLines) throws HiveSQLException {
0173 if (resultReader == null) {
0174 SessionState sessionState = getParentSession().getSessionState();
0175 File tmp = sessionState.getTmpOutputFile();
0176 try {
0177 resultReader = new BufferedReader(new FileReader(tmp));
0178 } catch (FileNotFoundException e) {
0179 LOG.error("File " + tmp + " not found. ", e);
0180 throw new HiveSQLException(e);
0181 }
0182 }
0183 List<String> results = new ArrayList<String>();
0184
0185 for (int i = 0; i < nLines || nLines <= 0; ++i) {
0186 try {
0187 String line = resultReader.readLine();
0188 if (line == null) {
0189
0190 break;
0191 } else {
0192 results.add(line);
0193 }
0194 } catch (IOException e) {
0195 LOG.error("Reading temp results encountered an exception: ", e);
0196 throw new HiveSQLException(e);
0197 }
0198 }
0199 return results;
0200 }
0201
0202 private void cleanTmpFile() {
0203 resetResultReader();
0204 SessionState sessionState = getParentSession().getSessionState();
0205 sessionState.deleteTmpOutputFile();
0206 sessionState.deleteTmpErrOutputFile();
0207 }
0208
0209 private void resetResultReader() {
0210 if (resultReader != null) {
0211 ServiceUtils.cleanup(LOG, resultReader);
0212 resultReader = null;
0213 }
0214 }
0215 }