diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeConnectorCustomPortIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeConnectorCustomPortIT.java new file mode 100644 index 0000000000000..926d10f42dbd6 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeConnectorCustomPortIT.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.dual.treemodel.auto.basic; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.MultiEnvFactory; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic; +import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2DualTreeAutoBasic.class}) +public class IoTDBPipeConnectorCustomPortIT extends AbstractPipeDualTreeModelAutoIT { + + @Override + @Before + public void setUp() { + // Override to enable air-gap + MultiEnvFactory.createEnv(2); + senderEnv = MultiEnvFactory.getEnv(0); + receiverEnv = MultiEnvFactory.getEnv(1); + + senderEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS); + receiverEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setPipeAirGapReceiverEnabled(true) + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS); + + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); + } + + @Test + public void testAsyncPortRange() { + doTest( + "iotdb-thrift-async-connector", receiverEnv.getIP() + ":" + receiverEnv.getPort(), "range"); + } + + @Test + public void testAsyncPortCandidate() { + doTest( + "iotdb-thrift-async-connector", + receiverEnv.getIP() + ":" + receiverEnv.getPort(), + "candidate"); + } + + @Test + public void testSyncThriftPortRange() { + doTest( + "iotdb-thrift-sync-connector", receiverEnv.getIP() + ":" + receiverEnv.getPort(), "range"); + } + + @Test + public void testSyncThriftPortCandidate() { + doTest( + "iotdb-thrift-sync-connector", + receiverEnv.getIP() + ":" + receiverEnv.getPort(), + "candidate"); + } + + @Test + public void testAirGapPortRange() { + doTest( + "iotdb-air-gap-connector", + receiverEnv.getIP() + ":" + receiverEnv.getDataNodeWrapper(0).getPipeAirGapReceiverPort(), + "range"); + } + + @Test + public void testAirGapPortCandidate() { + doTest( + "iotdb-air-gap-connector", + receiverEnv.getIP() + ":" + receiverEnv.getDataNodeWrapper(0).getPipeAirGapReceiverPort(), + "candidate"); + } + + private void doTest(final String connector, final String urls, final String strategy) { + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map extractorAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map connectorAttributes = new HashMap<>(); + extractorAttributes.put("realtime.mode", "forced-log"); + connectorAttributes.put("connector", connector); + connectorAttributes.put("batch.enable", "false"); + connectorAttributes.put("node-urls", urls); + connectorAttributes.put("send-port.restriction-strategy", strategy); + if (strategy.equals("range")) { + connectorAttributes.put("send-port.range.min", "1024"); + connectorAttributes.put("send-port.range.max", "1524"); + } else { + StringBuilder candidateBuilder = new StringBuilder(); + for (int i = 0; i < 30; i++) { + candidateBuilder.append(1024 + i).append(","); + } + candidateBuilder.append(1024 + 30); + connectorAttributes.put("send-port.candidate", candidateBuilder.toString()); + } + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("p1", connectorAttributes) + .setExtractorAttributes(extractorAttributes) + .setProcessorAttributes(processorAttributes)); + + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Thread.sleep(2000); + TestUtils.tryExecuteNonQueriesWithRetry( + senderEnv, + Arrays.asList( + "insert into root.db.d1(time, s1) values (2010-01-01T10:00:00+08:00, 1)", + "insert into root.db.d1(time, s1) values (2010-01-02T10:00:00+08:00, 2)", + "flush")); + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select count(*) from root.**", + "count(root.db.d1.s1),", + Collections.singleton("2,")); + + } catch (Exception e) { + Assert.fail(); + } + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/client/IoTDBConfigNodeSyncClientManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/client/IoTDBConfigNodeSyncClientManager.java index c579fad73c123..54b884d58b91d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/client/IoTDBConfigNodeSyncClientManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/client/IoTDBConfigNodeSyncClientManager.java @@ -48,6 +48,10 @@ public IoTDBConfigNodeSyncClientManager( String loadTsFileStrategy, boolean validateTsFile, boolean shouldMarkAsPipeRequest, + String customSendPortStrategy, + int minSendPortRange, + int maxSendPortRange, + List candidatePorts, final boolean skipIfNoPrivileges) { super( endPoints, @@ -62,6 +66,10 @@ public IoTDBConfigNodeSyncClientManager( loadTsFileStrategy, validateTsFile, shouldMarkAsPipeRequest, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, skipIfNoPrivileges); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java index 36bd1cb1a5e7f..2618395db677b 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/sink/protocol/IoTDBConfigRegionSink.java @@ -77,6 +77,10 @@ protected IoTDBSyncClientManager constructClient( final String loadTsFileStrategy, final boolean validateTsFile, final boolean shouldMarkAsPipeRequest, + String customSendPortStrategy, + int minSendPortRange, + int maxSendPortRange, + List candidatePorts, final boolean skipIfNoPrivileges) { return new IoTDBConfigNodeSyncClientManager( nodeUrls, @@ -90,6 +94,10 @@ protected IoTDBSyncClientManager constructClient( loadTsFileStrategy, validateTsFile, shouldMarkAsPipeRequest, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, skipIfNoPrivileges); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java index 73e2213eea616..d7a6c663d853b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java @@ -103,6 +103,10 @@ public IoTDBDataNodeAsyncClientManager( final boolean validateTsFile, final boolean shouldMarkAsPipeRequest, final boolean isTSFileUsed, + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + List candidatePorts, final boolean skipIfNoPrivileges) { super( endPoints, @@ -113,6 +117,10 @@ public IoTDBDataNodeAsyncClientManager( loadTsFileStrategy, validateTsFile, shouldMarkAsPipeRequest, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, skipIfNoPrivileges); endPointSet = new HashSet<>(endPoints); @@ -134,9 +142,16 @@ public IoTDBDataNodeAsyncClientManager( new IClientManager.Factory() .createClientManager( isTSFileUsed - ? new ClientPoolFactory - .AsyncPipeTsFileDataTransferServiceClientPoolFactory() - : new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory())); + ? new ClientPoolFactory.AsyncPipeTsFileDataTransferServiceClientPoolFactory( + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts) + : new ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory( + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts))); } endPoint2Client = ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java index b8eff41a9eab1..eeb7ded79c5be 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeSyncClientManager.java @@ -58,6 +58,10 @@ public IoTDBDataNodeSyncClientManager( final String loadTsFileStrategy, final boolean validateTsFile, final boolean shouldMarkAsPipeRequest, + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + final List candidatePorts, final boolean skipIfNoPrivileges) { super( endPoints, @@ -72,6 +76,10 @@ public IoTDBDataNodeSyncClientManager( loadTsFileStrategy, validateTsFile, shouldMarkAsPipeRequest, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, skipIfNoPrivileges); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index f8d0b104096f1..7ad5533862571 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -167,6 +167,10 @@ public void customize( loadTsFileValidation, shouldMarkAsPipeRequest, false, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, skipIfNoPrivileges); transferTsFileClientManager = @@ -183,6 +187,10 @@ public void customize( loadTsFileValidation, shouldMarkAsPipeRequest, isSplitTSFileBatchModeEnabled, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, skipIfNoPrivileges); if (isTabletBatchModeEnabled) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataNodeSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataNodeSyncSink.java index 72486d6c4e7cd..6dee5fb221e66 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataNodeSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataNodeSyncSink.java @@ -53,6 +53,10 @@ protected IoTDBSyncClientManager constructClient( final String loadTsFileStrategy, final boolean validateTsFile, final boolean shouldMarkAsPipeRequest, + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + List candidatePorts, final boolean skipIfNoPrivileges) { clientManager = new IoTDBDataNodeSyncClientManager( @@ -68,6 +72,10 @@ protected IoTDBSyncClientManager constructClient( loadTsFileStrategy, validateTsFile, shouldMarkAsPipeRequest, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, skipIfNoPrivileges); return clientManager; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java index 9e41b585ec409..0d5a2f68a8622 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java @@ -42,6 +42,8 @@ import org.apache.commons.pool2.impl.GenericKeyedObjectPool; +import java.util.List; + public class ClientPoolFactory { private static final CommonConfig conf = CommonDescriptor.getInstance().getConfig(); @@ -273,6 +275,22 @@ public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory public static class AsyncPipeDataTransferServiceClientPoolFactory implements IClientPoolFactory { + final String customSendPortStrategy; + final int minSendPortRange; + final int maxSendPortRange; + final List candidatePorts; + + public AsyncPipeDataTransferServiceClientPoolFactory( + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + List candidatePorts) { + this.customSendPortStrategy = customSendPortStrategy; + this.minSendPortRange = minSendPortRange; + this.maxSendPortRange = maxSendPortRange; + this.candidatePorts = candidatePorts; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -285,7 +303,11 @@ public GenericKeyedObjectPool cre .setRpcThriftCompressionEnabled(conf.isPipeSinkRPCThriftCompressionEnabled()) .setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber()) .build(), - ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()), + ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName(), + this.customSendPortStrategy, + this.minSendPortRange, + this.maxSendPortRange, + this.candidatePorts), new ClientPoolProperty.Builder() .setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxClientNumber()) .build() @@ -298,6 +320,23 @@ public GenericKeyedObjectPool cre public static class AsyncPipeTsFileDataTransferServiceClientPoolFactory implements IClientPoolFactory { + + final String customSendPortStrategy; + final int minSendPortRange; + final int maxSendPortRange; + final List candidatePorts; + + public AsyncPipeTsFileDataTransferServiceClientPoolFactory( + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + List candidatePorts) { + this.customSendPortStrategy = customSendPortStrategy; + this.minSendPortRange = minSendPortRange; + this.maxSendPortRange = maxSendPortRange; + this.candidatePorts = candidatePorts; + } + @Override public GenericKeyedObjectPool createClientPool( ClientManager manager) { @@ -311,7 +350,11 @@ public GenericKeyedObjectPool cre .setSelectorNumOfAsyncClientManager(conf.getPipeAsyncSinkSelectorNumber()) .setPrintLogWhenEncounterException(conf.isPrintLogWhenEncounterException()) .build(), - ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName()), + ThreadName.PIPE_ASYNC_CONNECTOR_CLIENT_POOL.getName(), + this.customSendPortStrategy, + this.minSendPortRange, + this.maxSendPortRange, + this.candidatePorts), new ClientPoolProperty.Builder() .setMaxClientNumForEachNode(conf.getPipeAsyncSinkMaxTsFileClientNumber()) .build() diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java index 36295ec8500f7..5beb6de69d7bb 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncPipeDataTransferServiceClient.java @@ -24,6 +24,7 @@ import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.client.util.IoTDBSinkPortBinder; import org.apache.iotdb.rpc.TNonblockingTransportWrapper; import org.apache.iotdb.service.rpc.thrift.IClientRPCService; @@ -36,6 +37,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.SocketChannel; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -62,13 +66,24 @@ public AsyncPipeDataTransferServiceClient( final ThriftClientProperty property, final TEndPoint endpoint, final TAsyncClientManager tClientManager, - final ClientManager clientManager) + final ClientManager clientManager, + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + final List candidatePorts) throws IOException { super( property.getProtocolFactory(), tClientManager, TNonblockingTransportWrapper.wrap( endpoint.getIp(), endpoint.getPort(), property.getConnectionTimeoutMs())); + final SocketChannel socketChannel = ((TNonblockingSocket) ___transport).getSocketChannel(); + IoTDBSinkPortBinder.bindPort( + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, + (sendPort) -> socketChannel.bind(new InetSocketAddress(sendPort))); setTimeout(property.getConnectionTimeoutMs()); this.printLogWhenEncounterException = property.isPrintLogWhenEncounterException(); this.endpoint = endpoint; @@ -208,11 +223,24 @@ public String toString() { public static class Factory extends AsyncThriftClientFactory { + final String customSendPortStrategy; + final int minSendPortRange; + final int maxSendPortRange; + final List candidatePorts; + public Factory( final ClientManager clientManager, final ThriftClientProperty thriftClientProperty, - final String threadName) { + final String threadName, + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + List candidatePorts) { super(clientManager, thriftClientProperty, threadName); + this.customSendPortStrategy = customSendPortStrategy; + this.minSendPortRange = minSendPortRange; + this.maxSendPortRange = maxSendPortRange; + this.candidatePorts = candidatePorts; } @Override @@ -230,7 +258,11 @@ public PooledObject makeObject(final TEndPoi thriftClientProperty, endPoint, tManagers[clientCnt.incrementAndGet() % tManagers.length], - clientManager)); + clientManager, + this.customSendPortStrategy, + this.minSendPortRange, + this.maxSendPortRange, + this.candidatePorts)); } @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/util/IoTDBSinkPortBinder.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/util/IoTDBSinkPortBinder.java new file mode 100644 index 0000000000000..ad04323720f41 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/util/IoTDBSinkPortBinder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.client.util; + +import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; +import org.apache.iotdb.commons.utils.function.Consumer; +import org.apache.iotdb.pipe.api.exception.PipeConnectionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class IoTDBSinkPortBinder { + + private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSinkPortBinder.class); + + // ===========================bind================================ + + public static void bindPort( + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + final List candidatePorts, + final Consumer consumer) { + final boolean isRange = + PipeSinkConstant.SINK_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY.equals( + customSendPortStrategy); + boolean portFound = false; + int index = 0; + boolean searching = isRange || !candidatePorts.isEmpty(); + while (searching) { + final int port = isRange ? minSendPortRange + index : candidatePorts.get(index); + try { + consumer.accept(port); + portFound = true; + break; + } catch (final Exception e) { + System.currentTimeMillis(); + } + index++; + searching = isRange ? port <= maxSendPortRange : candidatePorts.size() > index; + } + if (!portFound) { + final String exceptionMessage = + isRange + ? String.format( + "Failed to find an available send port within the range %d to %d.", + minSendPortRange, maxSendPortRange) + : String.format( + "Failed to find an available send port in the candidate list [%s].", + candidatePorts); + LOGGER.warn(exceptionMessage); + throw new PipeConnectionException(exceptionMessage); + } + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index ec9afce7c6f6b..fcfb7ef957cff 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -47,6 +47,28 @@ public class PipeSinkConstant { public static final String CONNECTOR_IOTDB_NODE_URLS_KEY = "connector.node-urls"; public static final String SINK_IOTDB_NODE_URLS_KEY = "sink.node-urls"; + public static final String CONNECTOR_IOTDB_SEND_PORT_MIN_KEY = "connector.send-port.min"; + public static final String SINK_IOTDB_SEND_PORT_MIN_KEY = "sink.send-port.min"; + public static final String CONNECTOR_IOTDB_SEND_PORT_MAX_KEY = "connector.send-port.max"; + public static final String SINK_IOTDB_SEND_PORT_MAX_KEY = "sink.send-port.max"; + public static final String CONNECTOR_IOTDB_SEND_PORTS_KEY = "connector.send-ports"; + public static final String SINK_IOTDB_SEND_PORT_CANDIDATE_KEY = "sink.send-ports"; + public static final String CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY = + "connector.send-port.restriction-strategy"; + public static final String SINK_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY = + "sink.send-port.restriction-strategy"; + public static final int CONNECTOR_IOTDB_SEND_PORT_MIN_VALUE = 0; + public static final int CONNECTOR_IOTDB_SEND_PORT_MAX_VALUE = 65535; + public static final String CONNECTOR_IOTDB_SEND_PORTS_CANDIDATE_VALUE = ""; + public static final String SINK_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY = "range"; + public static final String CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_CANDIDATE_STRATEGY = "candidate"; + public static final Set CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_SET = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_CANDIDATE_STRATEGY, + SINK_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY))); + public static final String SINK_IOTDB_SSL_ENABLE_KEY = "sink.ssl.enable"; public static final String SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY = "sink.ssl.trust-store-path"; public static final String SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY = "sink.ssl.trust-store-pwd"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java index 1f76f5d2453f5..44243a35a7e04 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBClientManager.java @@ -55,6 +55,14 @@ public abstract class IoTDBClientManager { // it is a DataNode receiver. The flag is useless for configNode receiver. protected boolean supportModsIfIsDataNodeReceiver = true; + protected int minSendPortRange; + + protected int maxSendPortRange; + + protected List candidatePorts; + + protected String customSendPortStrategy; + private static final int MAX_CONNECTION_TIMEOUT_MS = 24 * 60 * 60 * 1000; // 1 day private static final int FIRST_ADJUSTMENT_TIMEOUT_MS = 6 * 60 * 60 * 1000; // 6 hours protected static final AtomicInteger CONNECTION_TIMEOUT_MS = @@ -71,6 +79,10 @@ protected IoTDBClientManager( final String loadTsFileStrategy, final boolean validateTsFile, final boolean shouldMarkAsPipeRequest, + String customSendPortStrategy, + int minSendPortRange, + int maxSendPortRange, + List candidatePorts, final boolean skipIfNoPrivileges) { this.endPointList = endPointList; @@ -82,6 +94,11 @@ protected IoTDBClientManager( this.loadTsFileStrategy = loadTsFileStrategy; this.validateTsFile = validateTsFile; this.shouldMarkAsPipeRequest = shouldMarkAsPipeRequest; + + this.customSendPortStrategy = customSendPortStrategy; + this.minSendPortRange = minSendPortRange; + this.maxSendPortRange = maxSendPortRange; + this.candidatePorts = candidatePorts; this.skipIfNoPrivileges = skipIfNoPrivileges; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java index b7f42295e6cc3..cfcaefaa42f7d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClient.java @@ -22,23 +22,28 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.client.ThriftClient; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.client.util.IoTDBSinkPortBinder; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.rpc.TimeoutChangeableTFastFramedTransport; import org.apache.iotdb.rpc.TimeoutChangeableTransport; import org.apache.iotdb.service.rpc.thrift.IClientRPCService; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.thrift.TException; +import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class IoTDBSyncClient extends IClientRPCService.Client @@ -53,12 +58,57 @@ public class IoTDBSyncClient extends IClientRPCService.Client private final TEndPoint endPoint; public IoTDBSyncClient( - ThriftClientProperty property, - String ipAddress, - int port, - boolean useSSL, - String trustStore, - String trustStorePwd) + final ThriftClientProperty property, + final String ipAddress, + final int port, + final boolean useSSL, + final String trustStore, + final String trustStorePwd, + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + final List candidatePorts) + throws TTransportException { + super( + property + .getProtocolFactory() + .getProtocol( + useSSL + ? DeepCopyRpcTransportFactory.INSTANCE.getTransport( + ipAddress, + port, + property.getConnectionTimeoutMs(), + trustStore, + trustStorePwd) + : DeepCopyRpcTransportFactory.INSTANCE.getTransport( + ipAddress, port, property.getConnectionTimeoutMs()))); + this.ipAddress = ipAddress; + this.port = port; + this.endPoint = new TEndPoint(ipAddress, port); + final TTransport transport = getInputProtocol().getTransport(); + if (!transport.isOpen()) { + IoTDBSinkPortBinder.bindPort( + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, + (sendPort) -> { + final InetSocketAddress isa = new InetSocketAddress(sendPort); + ((TSocket) ((TimeoutChangeableTFastFramedTransport) transport).getSocket()) + .getSocket() + .bind(isa); + transport.open(); + }); + } + } + + public IoTDBSyncClient( + final ThriftClientProperty property, + final String ipAddress, + final int port, + final boolean useSSL, + final String trustStore, + final String trustStorePwd) throws TTransportException { super( property diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java index 76c145d0dabc2..f96da74cdbf2a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/client/IoTDBSyncClientManager.java @@ -80,6 +80,10 @@ protected IoTDBSyncClientManager( String loadTsFileStrategy, boolean validateTsFile, boolean shouldMarkAsPipeRequest, + String customSendPortStrategy, + int minSendPortRange, + int maxSendPortRange, + List candidatePorts, final boolean skipIfNoPrivileges) { super( endPoints, @@ -90,6 +94,10 @@ protected IoTDBSyncClientManager( loadTsFileStrategy, validateTsFile, shouldMarkAsPipeRequest, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, skipIfNoPrivileges); this.useSSL = useSSL; @@ -206,7 +214,11 @@ private boolean initClientAndStatus( endPoint.getPort(), useSSL, trustStorePath, - trustStorePwd)); + trustStorePwd, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts)); return true; } catch (Exception e) { endPoint2HandshakeErrorMessage.put(endPoint, e.getMessage()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java index 7d84e3bb98fca..30a3b95228058 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBAirGapSink.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.util.IoTDBSinkPortBinder; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapELanguageConstant; import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapOneByteResponse; @@ -187,12 +188,26 @@ public void handshake() throws Exception { } } - final AirGapSocket socket = new AirGapSocket(ip, port); - try { - socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs); - socket.setKeepAlive(true); - sockets.set(i, socket); + final int finalI = i; + IoTDBSinkPortBinder.bindPort( + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, + (sendPort) -> { + final AirGapSocket socket = new AirGapSocket(ip, port); + try { + socket.bind(new InetSocketAddress(sendPort)); + socket.connect(new InetSocketAddress(ip, port), handshakeTimeoutMs); + socket.setKeepAlive(true); + sockets.set(finalI, socket); + } catch (final Exception e) { + socket.close(); + throw e; + } + }); + LOGGER.info("Successfully connected to target server ip: {}, port: {}.", ip, port); failLogTimes.remove(nodeUrls.get(i)); } catch (final Exception e) { @@ -211,7 +226,7 @@ public void handshake() throws Exception { } try { - sendHandshakeReq(socket); + sendHandshakeReq(sockets.get(i)); isSocketAlive.set(i, true); } catch (Exception e) { LOGGER.warn( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java index 88a8b71775f45..f7f1a70f5a8ee 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSink.java @@ -50,6 +50,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -95,6 +96,14 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SEND_PORTS_CANDIDATE_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SEND_PORTS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SEND_PORT_MAX_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SEND_PORT_MAX_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SEND_PORT_MIN_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SEND_PORT_MIN_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_SET; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; @@ -135,6 +144,11 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_NODE_URLS_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PORT_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SEND_PORT_CANDIDATE_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SEND_PORT_MAX_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SEND_PORT_MIN_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_ID; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY; @@ -154,6 +168,9 @@ public abstract class IoTDBSink implements PipeConnector { "Exception occurred while parsing node urls from target servers: {}"; private static final String PARSE_URL_ERROR_MESSAGE = "Error occurred while parsing node urls from target servers, please check the specified 'host':'port' or 'node-urls'"; + public static final int MIN_PORT = + 0; // The minimum port number allocated to user processes by the operating system + public static final int MAX_PORT = 65535; // The maximum value for port numbers private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSink.class); @@ -165,6 +182,14 @@ public abstract class IoTDBSink implements PipeConnector { protected UserEntity userEntity; protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; + protected int minSendPortRange; + + protected int maxSendPortRange; + + protected List candidatePorts; + + protected String customSendPortStrategy; + protected String loadBalanceStrategy; protected String loadTsFileStrategy; @@ -271,6 +296,75 @@ public void validate(final PipeParameterValidator validator) throws Exception { Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY), CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); + this.customSendPortStrategy = + parameters + .getStringOrDefault( + Arrays.asList( + CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY, + SINK_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_KEY), + SINK_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY) + .trim() + .toLowerCase(); + + validator.validate( + arg -> CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_SET.contains(customSendPortStrategy), + String.format( + "send port restriction strategy should be one of %s, but got %s.", + CONNECTOR_IOTDB_SEND_PORT_RESTRICTION_STRATEGY_SET, customSendPortStrategy), + customSendPortStrategy); + + if (SINK_IOTDB_SEND_PORT_RESTRICTION_RANGE_STRATEGY.equals(customSendPortStrategy)) { + minSendPortRange = + parameters.getIntOrDefault( + Arrays.asList(CONNECTOR_IOTDB_SEND_PORT_MIN_KEY, SINK_IOTDB_SEND_PORT_MIN_KEY), + CONNECTOR_IOTDB_SEND_PORT_MIN_VALUE); + maxSendPortRange = + parameters.getIntOrDefault( + Arrays.asList(CONNECTOR_IOTDB_SEND_PORT_MAX_KEY, SINK_IOTDB_SEND_PORT_MAX_KEY), + CONNECTOR_IOTDB_SEND_PORT_MAX_VALUE); + validator.validate( + args -> (int) args[0] <= (int) args[1], + String.format( + "%s must be less than or equal to %s, but got %d > %d.", + SINK_IOTDB_SEND_PORT_MIN_KEY, + SINK_IOTDB_SEND_PORT_MAX_KEY, + minSendPortRange, + maxSendPortRange), + minSendPortRange, + maxSendPortRange); + validator.validate( + args -> (int) args[0] <= (int) args[1] && (int) args[2] >= (int) args[3], + String.format( + "Port range is invalid: %s must be >= %d and %s must be <= %d. Current values are %d and %d respectively.", + SINK_IOTDB_SEND_PORT_MIN_KEY, + MIN_PORT, + SINK_IOTDB_SEND_PORT_MAX_KEY, + MAX_PORT, + minSendPortRange, + maxSendPortRange), + MIN_PORT, + minSendPortRange, + MAX_PORT, + maxSendPortRange); + } else { + this.candidatePorts = + parseCandidatePorts( + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_IOTDB_SEND_PORTS_KEY, SINK_IOTDB_SEND_PORT_CANDIDATE_KEY), + CONNECTOR_IOTDB_SEND_PORTS_CANDIDATE_VALUE)); + validator.validate( + arg -> (int) arg > 0, + "The number of candidate ports must be greater than 0.", + candidatePorts.size()); + validator.validate( + arg -> (int) arg[0] >= MIN_PORT && (int) arg[1] <= MAX_PORT, + String.format( + "Candidate port range is invalid: Ports must be between 0 and 65535, but got minimum port: %d and maximum port: %d", + candidatePorts.get(0), candidatePorts.get(candidatePorts.size() - 1)), + candidatePorts.get(0), + candidatePorts.get(candidatePorts.size() - 1)); + } + loadBalanceStrategy = parameters .getStringOrDefault( @@ -577,6 +671,17 @@ private void checkNodeUrls(final Set nodeUrls) throws PipeParameterNo } } + private static List parseCandidatePorts(String candidate) { + if (candidate == null || candidate.isEmpty()) { + return Collections.emptyList(); + } + return Arrays.stream(candidate.split(",")) + .map(String::trim) + .map(Integer::parseInt) + .sorted() + .collect(Collectors.toList()); + } + @Override public void close() { // TODO: Not all the limiters should be closed here, but it's fine for now. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java index 75a4607a23b6b..e1bff7511fc43 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/protocol/IoTDBSslSyncSink.java @@ -141,6 +141,10 @@ public void customize( loadTsFileStrategy, loadTsFileValidation, shouldMarkAsPipeRequest, + customSendPortStrategy, + minSendPortRange, + maxSendPortRange, + candidatePorts, skipIfNoPrivileges); } @@ -159,6 +163,10 @@ protected abstract IoTDBSyncClientManager constructClient( final String loadTsFileStrategy, final boolean validateTsFile, final boolean shouldMarkAsPipeRequest, + final String customSendPortStrategy, + final int minSendPortRange, + final int maxSendPortRange, + final List candidatePorts, final boolean skipIfNoPrivileges); @Override diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/Consumer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/Consumer.java new file mode 100644 index 0000000000000..e6c84451d7907 --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/function/Consumer.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.utils.function; + +public interface Consumer { + void accept(INPUT1 var1) throws THROWABLE; +}