diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java index 743a05d37775a..c9c41f5b993ab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java @@ -74,6 +74,11 @@ protected boolean init(SettableFuture blockedFuture) { return true; } + @Override + public boolean isInit() { + return init; + } + /** * Init seq file list and unseq file list in {@link * org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and set it into each diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java index 690830a9b38ef..ad5eba152292e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java @@ -214,7 +214,7 @@ private boolean isFinishedInternal() { finished = state.get() != State.ALIVE || driverContext.isDone() - || root.isFinished() + || (isInit() && root.isFinished()) || sink.isClosed(); } catch (Exception e) { throw new RuntimeException(e); @@ -225,6 +225,8 @@ private boolean isFinishedInternal() { return finished; } + abstract boolean isInit(); + @SuppressWarnings({"squid:S1181", "squid:S112"}) private ListenableFuture processInternal() { long startTimeNanos = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java index b507aa1c7487d..e3230b7ff7f33 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java @@ -42,6 +42,11 @@ protected boolean init(SettableFuture blockedFuture) { return true; } + @Override + boolean isInit() { + return true; + } + @Override protected void releaseResource() { driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java index 0af5098d4ba07..c24fd3dac57d3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java @@ -35,7 +35,9 @@ import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState; import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine; import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator; +import org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator; import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator; import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger; import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator; @@ -254,4 +256,120 @@ public void batchTest() { instanceNotificationExecutor.shutdown(); } } + + @Test + public void testCallIsFinishedBeforeDataSourcePrepared() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + IFullPath measurementPath1 = + new NonAlignedFullPath( + IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + ".device0"), + new MeasurementSchema("sensor0", TSDataType.INT32)); + Set allSensors = new HashSet<>(); + allSensors.add("sensor0"); + allSensors.add("sensor1"); + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + DataRegion dataRegion = Mockito.mock(DataRegion.class); + Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + fragmentInstanceContext.setDataRegion(dataRegion); + DataDriverContext driverContext = new DataDriverContext(fragmentInstanceContext, 0); + PlanNodeId planNodeId1 = new PlanNodeId("1"); + driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName()); + PlanNodeId planNodeId2 = new PlanNodeId("2"); + driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName()); + driverContext.addOperatorContext( + 3, new PlanNodeId("3"), FullOuterTimeJoinOperator.class.getSimpleName()); + driverContext.addOperatorContext(4, new PlanNodeId("4"), LimitOperator.class.getSimpleName()); + + SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder(); + scanOptionsBuilder.withAllSensors(allSensors); + SeriesScanOperator seriesScanOperator1 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(0), + planNodeId1, + measurementPath1, + Ordering.ASC, + scanOptionsBuilder.build()); + driverContext.addSourceOperator(seriesScanOperator1); + driverContext.addPath(measurementPath1); + seriesScanOperator1 + .getOperatorContext() + .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + IFullPath measurementPath2 = + new NonAlignedFullPath( + IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + ".device0"), + new MeasurementSchema("sensor1", TSDataType.INT32)); + SeriesScanOperator seriesScanOperator2 = + new SeriesScanOperator( + driverContext.getOperatorContexts().get(1), + planNodeId2, + measurementPath2, + Ordering.ASC, + scanOptionsBuilder.build()); + driverContext.addSourceOperator(seriesScanOperator2); + driverContext.addPath(measurementPath2); + + seriesScanOperator2 + .getOperatorContext() + .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + LeftOuterTimeJoinOperator timeJoinOperator = + new LeftOuterTimeJoinOperator( + driverContext.getOperatorContexts().get(2), + seriesScanOperator1, + 1, + seriesScanOperator2, + Arrays.asList(TSDataType.INT32, TSDataType.INT32), + new AscTimeComparator()); + SingleDeviceViewOperator fakeOperator = + new SingleDeviceViewOperator( + driverContext.getOperatorContexts().get(3), + "d1", + timeJoinOperator, + Arrays.asList(0), + Arrays.asList(TSDataType.INT32, TSDataType.INT32)); + fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS)); + + fragmentInstanceContext.setSourcePaths(driverContext.getPaths()); + String deviceId = DATA_DRIVER_TEST_SG + ".device0"; + Mockito.when( + dataRegion.query( + eq(driverContext.getPaths()), + eq(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId)), + eq(fragmentInstanceContext), + Mockito.isNull(), + Mockito.isNull(), + Mockito.anyLong())) + .thenReturn(null); + fragmentInstanceContext.initQueryDataSource(driverContext.getPaths()); + fragmentInstanceContext.initializeNumOfDrivers(1); + + StubSink stubSink = new StubSink(fragmentInstanceContext); + driverContext.setSink(stubSink); + IDriver dataDriver = null; + try { + dataDriver = new DataDriver(fakeOperator, driverContext, 0); + assertEquals( + fragmentInstanceContext.getId(), dataDriver.getDriverTaskId().getFragmentInstanceId()); + assertFalse(dataDriver.isFinished()); + } finally { + if (dataDriver != null) { + dataDriver.close(); + } + } + } catch (QueryProcessException e) { + e.printStackTrace(); + fail(); + } finally { + instanceNotificationExecutor.shutdown(); + } + } }