diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java index 688158e78b251..4be14fd78255f 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/AbstractCli.java @@ -55,6 +55,59 @@ public abstract class AbstractCli { + /** + * Returns true if the SQLException is likely due to connection loss. Used so that CLI can rethrow + * and trigger reconnection. + */ + static boolean isConnectionRelated(SQLException e) { + if (e == null) { + return false; + } + if (matchesConnectionFailure(e.getMessage())) { + return true; + } + Throwable cause = e.getCause(); + return cause != null && matchesConnectionFailure(cause.getMessage()); + } + + private static boolean matchesConnectionFailure(String msg) { + if (msg == null) { + return false; + } + String lower = msg.toLowerCase(); + return lower.contains("connection") + || lower.contains("refused") + || lower.contains("timeout") + || lower.contains("closed") + || lower.contains("reset") + || lower.contains("network") + || lower.contains("broken pipe"); + } + + /** + * Returns true if the SQLException indicates a session/statement state error (e.g. statement ID + * no longer valid after reconnect). Used to show a friendly message instead of the raw exception. + */ + static boolean isSessionOrStatementError(SQLException e) { + if (e == null) { + return false; + } + if (e.getMessage() != null && matchesSessionOrStatementFailure(e.getMessage())) { + return true; + } + Throwable cause = e.getCause(); + return cause != null + && cause.getMessage() != null + && matchesSessionOrStatementFailure(cause.getMessage()); + } + + private static boolean matchesSessionOrStatementFailure(String msg) { + String lower = msg.toLowerCase(); + return lower.contains("doesn't exist in this session") + || lower.contains("statementid") + || lower.contains("statement id"); + } + static final String HOST_ARGS = "h"; static final String HOST_NAME = "host"; @@ -394,7 +447,8 @@ static void echoStarting(CliContext ctx) { ctx.getPrinter().println("---------------------"); } - static OperationResult handleInputCmd(CliContext ctx, String cmd, IoTDBConnection connection) { + static OperationResult handleInputCmd(CliContext ctx, String cmd, IoTDBConnection connection) + throws SQLException { lastProcessStatus = CODE_OK; String specialCmd = cmd.toLowerCase().trim(); @@ -495,7 +549,8 @@ private static int setTimestampDisplay(CliContext ctx, String specialCmd, String * @return execute result code */ private static int setTimeZone( - CliContext ctx, String specialCmd, String cmd, IoTDBConnection connection) { + CliContext ctx, String specialCmd, String cmd, IoTDBConnection connection) + throws SQLException { String[] values = specialCmd.split("="); if (values.length != 2) { ctx.getPrinter() @@ -505,6 +560,12 @@ private static int setTimeZone( } try { connection.setTimeZone(cmd.split("=")[1].trim()); + } catch (SQLException e) { + if (isConnectionRelated(e)) { + throw e; + } + ctx.getPrinter().println(String.format("Time zone format error: %s", e.getMessage())); + return CODE_ERROR; } catch (Exception e) { ctx.getPrinter().println(String.format("Time zone format error: %s", e.getMessage())); return CODE_ERROR; @@ -513,10 +574,13 @@ private static int setTimeZone( return CODE_OK; } - private static int showTimeZone(CliContext ctx, IoTDBConnection connection) { + private static int showTimeZone(CliContext ctx, IoTDBConnection connection) throws SQLException { try { ctx.getPrinter().println("Current time zone: " + connection.getTimeZone()); } catch (Exception e) { + if (e instanceof SQLException && isConnectionRelated((SQLException) e)) { + throw (SQLException) e; + } ctx.getPrinter().println("Cannot get time zone from server side because: " + e.getMessage()); return CODE_ERROR; } @@ -549,7 +613,8 @@ private static int importCmd( } @SuppressWarnings({"squid:S3776"}) // Suppress high Cognitive Complexity warning - private static int executeQuery(CliContext ctx, IoTDBConnection connection, String cmd) { + private static int executeQuery(CliContext ctx, IoTDBConnection connection, String cmd) + throws SQLException { int executeStatus = CODE_OK; long startTime = System.currentTimeMillis(); try (Statement statement = connection.createStatement()) { @@ -610,6 +675,18 @@ private static int executeQuery(CliContext ctx, IoTDBConnection connection, Stri } else { ctx.getPrinter().println("Msg: " + SUCCESS_MESSAGE); } + } catch (SQLException e) { + if (isConnectionRelated(e)) { + throw e; + } + if (isSessionOrStatementError(e)) { + ctx.getPrinter() + .println( + "Reconnected, but the previous command could not be completed. Please run your command again."); + } else { + ctx.getPrinter().println("Msg: " + e); + } + executeStatus = CODE_ERROR; } catch (Exception e) { ctx.getPrinter().println("Msg: " + e); executeStatus = CODE_ERROR; @@ -858,7 +935,8 @@ enum OperationResult { NO_OPER } - static boolean processCommand(CliContext ctx, String s, IoTDBConnection connection) { + static boolean processCommand(CliContext ctx, String s, IoTDBConnection connection) + throws SQLException { if (s == null) { return true; } diff --git a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java index 396c28676889e..10cd745e4ec0d 100644 --- a/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java +++ b/iotdb-client/cli/src/main/java/org/apache/iotdb/cli/Cli.java @@ -51,6 +51,35 @@ public class Cli extends AbstractCli { // TODO: Make non-static private static final Properties info = new Properties(); + /** Number of reconnection attempts when connection is lost during interactive session. */ + private static final int RECONNECT_RETRY_NUM = 3; + + /** Delay in ms between reconnection attempts. */ + private static final long RECONNECT_RETRY_INTERVAL_MS = 1000; + + /** Result of reading and processing one line; used to support reconnection. */ + private static class ReadLineResult { + final boolean stop; + final String failedCommand; + + ReadLineResult(boolean stop, String failedCommand) { + this.stop = stop; + this.failedCommand = failedCommand; + } + + static ReadLineResult continueLoop() { + return new ReadLineResult(false, null); + } + + static ReadLineResult stopLoop() { + return new ReadLineResult(true, null); + } + + static ReadLineResult reconnectAndRetry(String command) { + return new ReadLineResult(false, command); + } + } + /** * IoTDB Client main function. * @@ -155,6 +184,28 @@ private static boolean parseCommandLine( return true; } + private static IoTDBConnection openConnection() throws SQLException { + return (IoTDBConnection) + DriverManager.getConnection(Config.IOTDB_URL_PREFIX + host + ":" + port + "/", info); + } + + private static void setupConnection(IoTDBConnection connection) + throws java.sql.SQLException, org.apache.thrift.TException { + connection.setQueryTimeout(queryTimeout); + properties = connection.getServerProperties(); + timestampPrecision = properties.getTimestampPrecision(); + } + + private static void closeConnectionQuietly(IoTDBConnection connection) { + if (connection != null) { + try { + connection.close(); + } catch (SQLException ignored) { + // ignore + } + } + } + private static void serve(CliContext ctx) { try { useSsl = commandLine.getOptionValue(USE_SSL_ARGS); @@ -188,41 +239,104 @@ private static void executeSql(CliContext ctx) throws TException { processCommand(ctx, execute, connection); ctx.exit(lastProcessStatus); } catch (SQLException e) { - ctx.getPrinter().println(IOTDB_ERROR_PREFIX + "Can't execute sql because" + e.getMessage()); + ctx.getPrinter() + .println(IOTDB_ERROR_PREFIX + ": Can't execute sql because " + e.getMessage()); ctx.exit(CODE_ERROR); } } private static void receiveCommands(CliContext ctx) throws TException { - try (IoTDBConnection connection = - (IoTDBConnection) - DriverManager.getConnection(Config.IOTDB_URL_PREFIX + host + ":" + port + "/", info)) { - connection.setQueryTimeout(queryTimeout); - properties = connection.getServerProperties(); - timestampPrecision = properties.getTimestampPrecision(); - + IoTDBConnection connection = null; + try { + connection = openConnection(); + setupConnection(connection); echoStarting(ctx); displayLogo(ctx, properties.getLogo(), properties.getVersion(), properties.getBuildInfo()); ctx.getPrinter().println(String.format("Successfully login at %s:%s", host, port)); while (true) { - boolean readLine = readerReadLine(ctx, connection); - if (readLine) { + ReadLineResult result = readerReadLine(ctx, connection); + if (result.stop) { break; } + if (result.failedCommand != null) { + // Connection failed during processCommand; try to reconnect and retry the command. + closeConnectionQuietly(connection); + connection = null; + boolean reconnected = false; + for (int attempt = 1; attempt <= RECONNECT_RETRY_NUM; attempt++) { + if (attempt > 1) { + try { + Thread.sleep(RECONNECT_RETRY_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + ctx.getErr().printf("%s: Reconnection interrupted.%n", IOTDB_ERROR_PREFIX); + ctx.exit(CODE_ERROR); + } + } + try { + connection = openConnection(); + setupConnection(connection); + ctx.getPrinter().println("Connection lost. Reconnected. Retrying command."); + processCommand(ctx, result.failedCommand, connection); + reconnected = true; + break; + } catch (SQLException e) { + if (isSessionOrStatementError(e)) { + // Reconnect succeeded but retry failed due to session/statement state; ask user to + // run the command again. + ctx.getPrinter() + .println( + "Reconnected, but the previous command could not be completed. Please run your command again."); + reconnected = true; + break; + } + if (attempt == RECONNECT_RETRY_NUM) { + ctx.getErr() + .printf( + "%s: Could not reconnect after %d attempts. Please check that the server is running and try again.%n", + IOTDB_ERROR_PREFIX, RECONNECT_RETRY_NUM); + ctx.exit(CODE_ERROR); + } + } catch (TException e) { + // RPC/network failure during reconnect (e.g. getServerProperties); treat as reconnect + // failure and retry with backoff. + if (attempt == RECONNECT_RETRY_NUM) { + ctx.getErr() + .printf( + "%s: Could not reconnect after %d attempts. Please check that the server is running and try again.%n", + IOTDB_ERROR_PREFIX, RECONNECT_RETRY_NUM); + ctx.exit(CODE_ERROR); + } + } + } + if (!reconnected) { + break; + } + } } } catch (SQLException e) { ctx.getErr().printf("%s: %s%n", IOTDB_ERROR_PREFIX, e.getMessage()); ctx.exit(CODE_ERROR); + } finally { + closeConnectionQuietly(connection); } } - private static boolean readerReadLine(CliContext ctx, IoTDBConnection connection) { + private static ReadLineResult readerReadLine(CliContext ctx, IoTDBConnection connection) { String s; try { s = ctx.getLineReader().readLine(cliPrefix + "> ", null); - boolean continues = processCommand(ctx, s, connection); - if (!continues) { - return true; + try { + boolean continues = processCommand(ctx, s, connection); + if (!continues) { + return ReadLineResult.stopLoop(); + } + } catch (SQLException e) { + if (isConnectionRelated(e)) { + return ReadLineResult.reconnectAndRetry(s); + } + ctx.getErr().printf("%s: %s%n", IOTDB_ERROR_PREFIX, e.getMessage()); + return ReadLineResult.stopLoop(); } } catch (UserInterruptException e) { // Exit on signal INT requires confirmation. @@ -231,12 +345,12 @@ private static boolean readerReadLine(CliContext ctx, IoTDBConnection connection // Exit on EOF (usually by pressing CTRL+D). ctx.exit(CODE_OK); } catch (IllegalArgumentException e) { - if (e.getMessage().contains("history")) { - return false; + if (e.getMessage() != null && e.getMessage().contains("history")) { + return ReadLineResult.continueLoop(); } throw e; } - return false; + return ReadLineResult.continueLoop(); } private static void readLine(CliContext ctx) { diff --git a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/AbstractCliTest.java b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/AbstractCliTest.java index 635676ce2af8e..5ba951e42051d 100644 --- a/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/AbstractCliTest.java +++ b/iotdb-client/cli/src/test/java/org/apache/iotdb/cli/AbstractCliTest.java @@ -39,7 +39,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.SQLException; + import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.when; @@ -150,7 +154,7 @@ private void isTwoStringArrayEqual(String[] expected, String[] actual) { } @Test - public void testHandleInputInputCmd() { + public void testHandleInputInputCmd() throws SQLException { CliContext ctx = new CliContext(System.in, System.out, System.err, ExitType.EXCEPTION); assertEquals( OperationResult.STOP_OPER, @@ -199,4 +203,80 @@ public void testHandleInputInputCmd() { AbstractCli.handleInputCmd( ctx, String.format("%s=111", AbstractCli.SET_FETCH_SIZE), connection)); } + + // --- Tests for connection/session failure detection (reconnect feature) --- + + @Test + public void testIsConnectionRelatedNull() { + assertFalse(AbstractCli.isConnectionRelated(null)); + } + + @Test + public void testIsConnectionRelatedDirectMessage() { + assertTrue(AbstractCli.isConnectionRelated(new SQLException("Connection refused"))); + assertTrue(AbstractCli.isConnectionRelated(new SQLException("Connection reset"))); + assertTrue(AbstractCli.isConnectionRelated(new SQLException("Io exception: Connection reset"))); + assertTrue(AbstractCli.isConnectionRelated(new SQLException("Connection closed"))); + assertTrue(AbstractCli.isConnectionRelated(new SQLException("Read timeout"))); + assertTrue(AbstractCli.isConnectionRelated(new SQLException("Network is unreachable"))); + assertTrue(AbstractCli.isConnectionRelated(new SQLException("Broken pipe"))); + } + + @Test + public void testIsConnectionRelatedViaCause() { + SQLException e = new SQLException("outer"); + e.initCause(new RuntimeException("Connection refused")); + assertTrue(AbstractCli.isConnectionRelated(e)); + } + + @Test + public void testIsConnectionRelatedNoMatch() { + assertFalse(AbstractCli.isConnectionRelated(new SQLException("Syntax error"))); + assertFalse(AbstractCli.isConnectionRelated(new SQLException("Table not found"))); + assertFalse(AbstractCli.isConnectionRelated(new SQLException((String) null))); + } + + @Test + public void testIsConnectionRelatedNullMessageWithNonMatchingCause() { + SQLException e = new SQLException((String) null); + e.initCause(new RuntimeException("Some other error")); + assertFalse(AbstractCli.isConnectionRelated(e)); + } + + @Test + public void testIsSessionOrStatementErrorNull() { + assertFalse(AbstractCli.isSessionOrStatementError(null)); + } + + @Test + public void testIsSessionOrStatementErrorDirectMessage() { + assertTrue( + AbstractCli.isSessionOrStatementError( + new SQLException("StatementId doesn't exist in this session"))); + assertTrue( + AbstractCli.isSessionOrStatementError(new SQLException("Statement ID 123 is invalid"))); + assertTrue( + AbstractCli.isSessionOrStatementError(new SQLException("something statementid something"))); + } + + @Test + public void testIsSessionOrStatementErrorViaCause() { + SQLException e = new SQLException("wrapper"); + e.initCause(new IllegalStateException("StatementId doesn't exist in this session")); + assertTrue(AbstractCli.isSessionOrStatementError(e)); + } + + @Test + public void testIsSessionOrStatementErrorNoMatch() { + assertFalse(AbstractCli.isSessionOrStatementError(new SQLException("Connection refused"))); + assertFalse(AbstractCli.isSessionOrStatementError(new SQLException("Syntax error"))); + assertFalse(AbstractCli.isSessionOrStatementError(new SQLException((String) null))); + } + + @Test + public void testIsSessionOrStatementErrorNullMessageWithNonMatchingCause() { + SQLException e = new SQLException((String) null); + e.initCause(new RuntimeException("Other error")); + assertFalse(AbstractCli.isSessionOrStatementError(e)); + } }