Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ protected boolean init(SettableFuture<?> blockedFuture) {
return true;
}

@Override
public boolean isInit() {
return init;
}

/**
* Init seq file list and unseq file list in {@link
* org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and set it into each
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private boolean isFinishedInternal() {
finished =
state.get() != State.ALIVE
|| driverContext.isDone()
|| root.isFinished()
|| (isInit() && root.isFinished())
|| sink.isClosed();
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -225,6 +225,8 @@ private boolean isFinishedInternal() {
return finished;
}

abstract boolean isInit();

@SuppressWarnings({"squid:S1181", "squid:S112"})
private ListenableFuture<?> processInternal() {
long startTimeNanos = System.nanoTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ protected boolean init(SettableFuture<?> blockedFuture) {
return true;
}

@Override
boolean isInit() {
return true;
}

@Override
protected void releaseResource() {
driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
Expand Down Expand Up @@ -254,4 +256,120 @@ public void batchTest() {
instanceNotificationExecutor.shutdown();
}
}

@Test
public void testCallIsFinishedBeforeDataSourcePrepared() {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
IFullPath measurementPath1 =
new NonAlignedFullPath(
IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + ".device0"),
new MeasurementSchema("sensor0", TSDataType.INT32));
Set<String> allSensors = new HashSet<>();
allSensors.add("sensor0");
allSensors.add("sensor1");
QueryId queryId = new QueryId("stub_query");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
DataRegion dataRegion = Mockito.mock(DataRegion.class);
Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true);
FragmentInstanceContext fragmentInstanceContext =
createFragmentInstanceContext(instanceId, stateMachine);
fragmentInstanceContext.setDataRegion(dataRegion);
DataDriverContext driverContext = new DataDriverContext(fragmentInstanceContext, 0);
PlanNodeId planNodeId1 = new PlanNodeId("1");
driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
PlanNodeId planNodeId2 = new PlanNodeId("2");
driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
driverContext.addOperatorContext(
3, new PlanNodeId("3"), FullOuterTimeJoinOperator.class.getSimpleName());
driverContext.addOperatorContext(4, new PlanNodeId("4"), LimitOperator.class.getSimpleName());

SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
scanOptionsBuilder.withAllSensors(allSensors);
SeriesScanOperator seriesScanOperator1 =
new SeriesScanOperator(
driverContext.getOperatorContexts().get(0),
planNodeId1,
measurementPath1,
Ordering.ASC,
scanOptionsBuilder.build());
driverContext.addSourceOperator(seriesScanOperator1);
driverContext.addPath(measurementPath1);
seriesScanOperator1
.getOperatorContext()
.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));

IFullPath measurementPath2 =
new NonAlignedFullPath(
IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + ".device0"),
new MeasurementSchema("sensor1", TSDataType.INT32));
SeriesScanOperator seriesScanOperator2 =
new SeriesScanOperator(
driverContext.getOperatorContexts().get(1),
planNodeId2,
measurementPath2,
Ordering.ASC,
scanOptionsBuilder.build());
driverContext.addSourceOperator(seriesScanOperator2);
driverContext.addPath(measurementPath2);

seriesScanOperator2
.getOperatorContext()
.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));

LeftOuterTimeJoinOperator timeJoinOperator =
new LeftOuterTimeJoinOperator(
driverContext.getOperatorContexts().get(2),
seriesScanOperator1,
1,
seriesScanOperator2,
Arrays.asList(TSDataType.INT32, TSDataType.INT32),
new AscTimeComparator());
SingleDeviceViewOperator fakeOperator =
new SingleDeviceViewOperator(
driverContext.getOperatorContexts().get(3),
"d1",
timeJoinOperator,
Arrays.asList(0),
Arrays.asList(TSDataType.INT32, TSDataType.INT32));
fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));

fragmentInstanceContext.setSourcePaths(driverContext.getPaths());
String deviceId = DATA_DRIVER_TEST_SG + ".device0";
Mockito.when(
dataRegion.query(
eq(driverContext.getPaths()),
eq(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId)),
eq(fragmentInstanceContext),
Mockito.isNull(),
Mockito.isNull(),
Mockito.anyLong()))
.thenReturn(null);
fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
fragmentInstanceContext.initializeNumOfDrivers(1);

StubSink stubSink = new StubSink(fragmentInstanceContext);
driverContext.setSink(stubSink);
IDriver dataDriver = null;
try {
dataDriver = new DataDriver(fakeOperator, driverContext, 0);
assertEquals(
fragmentInstanceContext.getId(), dataDriver.getDriverTaskId().getFragmentInstanceId());
assertFalse(dataDriver.isFinished());
} finally {
if (dataDriver != null) {
dataDriver.close();
}
}
} catch (QueryProcessException e) {
e.printStackTrace();
fail();
} finally {
instanceNotificationExecutor.shutdown();
}
}
}
Loading