diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index d95d51e390eb..ef4a4910e1b8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -1939,6 +1939,21 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt); } + /** + * close the TsFile represented by the given resource, thread-safe + * + * @param tsFileResource TsFile to be closed + * @return a future related to the close task + */ + public Future asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) { + writeLock("asyncCloseOneTsFileProcessor"); + try { + return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor()); + } finally { + writeUnlock(); + } + } + /** * close one tsfile processor, thread-safety should be ensured by caller * @@ -1946,31 +1961,26 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq * @param tsFileProcessor tsfile processor */ public Future asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) { - // for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed. - // for unsequence tsfile, we have maintained the endTimeMap when an insertion comes. - if (closingSequenceTsFileProcessor.contains(tsFileProcessor) - || closingUnSequenceTsFileProcessor.contains(tsFileProcessor) - || tsFileProcessor.alreadyMarkedClosing()) { + if (tsFileProcessor == null) { return CompletableFuture.completedFuture(null); } - Future future; - if (sequence) { - closingSequenceTsFileProcessor.add(tsFileProcessor); - future = tsFileProcessor.asyncClose(); - if (future.isDone()) { - closingSequenceTsFileProcessor.remove(tsFileProcessor); - } + if (tsFileProcessor.getCloseFuture() != null) { + return tsFileProcessor.getCloseFuture(); + } - workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId()); - } else { - closingUnSequenceTsFileProcessor.add(tsFileProcessor); - future = tsFileProcessor.asyncClose(); - if (future.isDone()) { - closingUnSequenceTsFileProcessor.remove(tsFileProcessor); - } + Future future; + Set closingTsFileProcessors = + sequence ? closingSequenceTsFileProcessor : closingUnSequenceTsFileProcessor; + TreeMap workTsFileProcessors = + sequence ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors; - workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId()); + closingTsFileProcessors.add(tsFileProcessor); + future = tsFileProcessor.asyncClose(); + if (future.isDone()) { + closingTsFileProcessors.remove(tsFileProcessor); } + workTsFileProcessors.remove(tsFileProcessor.getTimeRangeId()); + TsFileResource resource = tsFileProcessor.getTsFileResource(); logger.info( "Async close tsfile: {}, file start time: {}, file end time: {}", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index acdac61180bf..78b33c02c0bd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -210,6 +210,8 @@ public class TsFileProcessor { private int walEntryNum = 0; + private volatile Future closeFuture; + @SuppressWarnings("squid:S107") public TsFileProcessor( String dataRegionName, @@ -1250,10 +1252,13 @@ public void syncClose() throws ExecutionException { logger.info( "Sync close file: {}, will firstly async close it", tsFileResource.getTsFile().getAbsolutePath()); - if (shouldClose) { - return; - } + try { + if (closeFuture != null) { + closeFuture.get(); + return; + } + asyncClose().get(); logger.info("Start to wait until file {} is closed", tsFileResource); // if this TsFileProcessor is closing, asyncClose().get() of this thread will return quickly, @@ -1273,6 +1278,10 @@ public Future asyncClose() { flushQueryLock.writeLock().lock(); logFlushQueryWriteLocked(); try { + if (closeFuture != null) { + return closeFuture; + } + if (logger.isDebugEnabled()) { if (workMemTable != null) { logger.debug( @@ -1293,10 +1302,6 @@ public Future asyncClose() { tsFileResource.getTsFileSize()); } } - - if (shouldClose) { - return CompletableFuture.completedFuture(null); - } // when a flush thread serves this TsFileProcessor (because the processor is submitted by // registerTsFileProcessor()), the thread will seal the corresponding TsFile and // execute other cleanup works if "shouldClose == true and flushingMemTables is empty". @@ -1315,6 +1320,7 @@ public Future asyncClose() { // flushing memTable in System module. Future future = addAMemtableIntoFlushingList(tmpMemTable); shouldClose = true; + closeFuture = future; return future; } catch (Exception e) { logger.error( @@ -1794,6 +1800,9 @@ public boolean isManagedByFlushManager() { public void setManagedByFlushManager(boolean managedByFlushManager) { this.managedByFlushManager = managedByFlushManager; + if (!managedByFlushManager) { + closeFuture = CompletableFuture.completedFuture(null); + } } /** Close this tsfile */ @@ -2384,4 +2393,12 @@ private void logFlushQueryReadUnlocked() { public String toString() { return "TsFileProcessor{" + "tsFileResource=" + tsFileResource + '}'; } + + public Future getCloseFuture() { + return closeFuture; + } + + public void setCloseFuture(Future closeFuture) { + this.closeFuture = closeFuture; + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java index c3895a058c63..0cb7143e708a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/DataRegionTest.java @@ -93,9 +93,13 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertRowNode; import static org.apache.iotdb.db.queryengine.plan.statement.StatementTestUtils.genInsertTabletNode; +import static org.junit.Assert.assertTrue; public class DataRegionTest { private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -266,7 +270,7 @@ device, new MeasurementSchema(measurementId, TSDataType.INT32))), null); Assert.assertEquals(10, queryDataSource.getSeqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -314,7 +318,7 @@ public void testRelationalTabletWriteAndSyncClose() Assert.assertEquals(1, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -362,7 +366,7 @@ public void testRelationRowWriteAndSyncClose() Assert.assertEquals(1, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -451,7 +455,7 @@ public void testIoTDBTabletWriteAndSyncClose() Assert.assertEquals(2, queryDataSource.getSeqResources().size()); Assert.assertEquals(1, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -518,7 +522,7 @@ public void testIoTDBTabletWriteAndDeleteDataRegion() times.length); dataRegion.insertTablet(insertTabletNode2); - Assert.assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0); + assertTrue(SystemInfo.getInstance().getTotalMemTableSize() > 0); dataRegion.syncDeleteDataFiles(); Assert.assertEquals(0, SystemInfo.getInstance().getTotalMemTableSize()); @@ -603,7 +607,7 @@ public void testEmptyTabletWriteAndSyncClose() Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -679,7 +683,7 @@ public void testAllMeasurementsFailedTabletWriteAndSyncClose() Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -706,10 +710,10 @@ public void testSeqAndUnSeqSyncClose() Assert.assertEquals(10, queryDataSource.getSeqResources().size()); Assert.assertEquals(10, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } for (TsFileResource resource : queryDataSource.getUnseqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -740,10 +744,10 @@ public void testAllMeasurementsFailedRecordSeqAndUnSeqSyncClose() Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } for (TsFileResource resource : queryDataSource.getUnseqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } } @@ -773,10 +777,10 @@ public void testDisableSeparateDataForInsertRowPlan() Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(20, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } for (TsFileResource resource : queryDataSource.getUnseqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } config.setEnableSeparateData(defaultValue); @@ -855,7 +859,7 @@ public void testDisableSeparateDataForInsertTablet1() Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(2, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } config.setEnableSeparateData(defaultEnableDiscard); @@ -935,7 +939,7 @@ public void testDisableSeparateDataForInsertTablet2() Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(2, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } config.setEnableSeparateData(defaultEnableDiscard); @@ -1015,7 +1019,7 @@ public void testDisableSeparateDataForInsertTablet3() Assert.assertEquals(0, queryDataSource.getSeqResources().size()); Assert.assertEquals(2, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } config.setEnableSeparateData(defaultEnableDiscard); @@ -1055,7 +1059,7 @@ tmpDeviceId, new MeasurementSchema(measurementId, TSDataType.INT32))), Assert.assertEquals(1, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } dataRegion1.syncDeleteDataFiles(); } @@ -1092,10 +1096,10 @@ tmpDeviceId, new MeasurementSchema(measurementId, TSDataType.INT32))), Assert.assertEquals(10, queryDataSource.getSeqResources().size()); Assert.assertEquals(0, queryDataSource.getUnseqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } for (TsFileResource resource : queryDataSource.getUnseqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } dataRegion1.syncDeleteDataFiles(); @@ -1160,10 +1164,10 @@ public void testMerge() Collections.singletonList(nonAlignedFullPath), device, context, null, null); Assert.assertEquals(2, queryDataSource.getSeqResources().size()); for (TsFileResource resource : queryDataSource.getSeqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } for (TsFileResource resource : queryDataSource.getUnseqResources()) { - Assert.assertTrue(resource.isClosed()); + assertTrue(resource.isClosed()); } IoTDBDescriptor.getInstance() .getConfig() @@ -1232,7 +1236,7 @@ public void testDeleteStorageGroupWhenCompacting() throws Exception { + CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX); Assert.assertFalse(logFile.exists()); Assert.assertFalse(CommonDescriptor.getInstance().getConfig().isReadOnly()); - Assert.assertTrue(dataRegion.getTsFileManager().isAllowCompaction()); + assertTrue(dataRegion.getTsFileManager().isAllowCompaction()); } finally { new CompactionConfigRestorer().restoreCompactionConfig(); } @@ -1392,10 +1396,10 @@ public void testDeleteDataNotInFile() for (int i = 0; i < dataRegion.getSequenceFileList().size(); i++) { TsFileResource resource = dataRegion.getSequenceFileList().get(i); if (i == 1) { - Assert.assertTrue(resource.anyModFileExists()); + assertTrue(resource.anyModFileExists()); Assert.assertEquals(2, resource.getAllModEntries().size()); } else if (i == 3) { - Assert.assertTrue(resource.anyModFileExists()); + assertTrue(resource.anyModFileExists()); Assert.assertEquals(1, resource.getAllModEntries().size()); } else { Assert.assertFalse(resource.anyModFileExists()); @@ -1489,7 +1493,7 @@ public void testDeleteDataInSeqFlushingMemtable() dataRegion.deleteByDevice(new MeasurementPath("root.vehicle.d0.s0"), deleteDataNode4); dataRegion.syncCloseAllWorkingTsFileProcessors(); - Assert.assertTrue(tsFileResource.anyModFileExists()); + assertTrue(tsFileResource.anyModFileExists()); Assert.assertEquals(3, tsFileResource.getAllModEntries().size()); } @@ -1584,7 +1588,7 @@ public void testDeleteDataInUnSeqFlushingMemtable() dataRegion.deleteByDevice(new MeasurementPath("root.vehicle.d0.s0"), deleteDataNode12); dataRegion.syncCloseAllWorkingTsFileProcessors(); - Assert.assertTrue(tsFileResource.anyModFileExists()); + assertTrue(tsFileResource.anyModFileExists()); Assert.assertEquals(3, tsFileResource.getAllModEntries().size()); } @@ -1686,7 +1690,7 @@ public void testDeleteDataDirectlySeqWriteModsOrDeleteFiles() new DeleteDataNode(new PlanNodeId("1"), Collections.singletonList(path), 50, 100); deleteDataNode1.setSearchIndex(0); dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), deleteDataNode1); - Assert.assertTrue(tsFileResource.getTsFile().exists()); + assertTrue(tsFileResource.getTsFile().exists()); Assert.assertFalse(tsFileResource.anyModFileExists()); dataRegion.syncCloseAllWorkingTsFileProcessors(); @@ -1696,8 +1700,8 @@ public void testDeleteDataDirectlySeqWriteModsOrDeleteFiles() new DeleteDataNode(new PlanNodeId("2"), Collections.singletonList(path), 100, 120); deleteDataNode2.setSearchIndex(0); dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), deleteDataNode2); - Assert.assertTrue(tsFileResource.getTsFile().exists()); - Assert.assertTrue(tsFileResource.anyModFileExists()); + assertTrue(tsFileResource.getTsFile().exists()); + assertTrue(tsFileResource.anyModFileExists()); // delete data in closed file, and time all match DeleteDataNode deleteDataNode3 = @@ -1727,8 +1731,8 @@ public void testDeleteDataDirectlyUnseqWriteModsOrDeleteFiles() dataRegion.syncCloseWorkingTsFileProcessors(true); TsFileResource tsFileResourceUnSeq = dataRegion.getTsFileManager().getTsFileList(false).get(0); - Assert.assertTrue(tsFileResourceSeq.getTsFile().exists()); - Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists()); + assertTrue(tsFileResourceSeq.getTsFile().exists()); + assertTrue(tsFileResourceUnSeq.getTsFile().exists()); // already closed, will have a mods file. MeasurementPath path = new MeasurementPath("root.vehicle.d0.**"); @@ -1743,9 +1747,9 @@ public void testDeleteDataDirectlyUnseqWriteModsOrDeleteFiles() dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), deleteDataNode2); // delete data in mem table, there is no mods - Assert.assertTrue(tsFileResourceSeq.getTsFile().exists()); - Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists()); - Assert.assertTrue(tsFileResourceSeq.anyModFileExists()); + assertTrue(tsFileResourceSeq.getTsFile().exists()); + assertTrue(tsFileResourceUnSeq.getTsFile().exists()); + assertTrue(tsFileResourceSeq.anyModFileExists()); Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists()); dataRegion.syncCloseAllWorkingTsFileProcessors(); @@ -1753,8 +1757,8 @@ public void testDeleteDataDirectlyUnseqWriteModsOrDeleteFiles() new DeleteDataNode(new PlanNodeId("3"), Collections.singletonList(path), 40, 80); deleteDataNode3.setSearchIndex(0); dataRegion.deleteDataDirectly(new MeasurementPath("root.vehicle.d0.**"), deleteDataNode3); - Assert.assertTrue(tsFileResourceUnSeq.getTsFile().exists()); - Assert.assertTrue(tsFileResourceUnSeq.anyModFileExists()); + assertTrue(tsFileResourceUnSeq.getTsFile().exists()); + assertTrue(tsFileResourceUnSeq.anyModFileExists()); // seq file and unseq file have data file and mod file now, // this deletion will remove data file and mod file. @@ -1772,4 +1776,21 @@ public void testDeleteDataDirectlyUnseqWriteModsOrDeleteFiles() Assert.assertFalse(tsFileResourceSeq.anyModFileExists()); Assert.assertFalse(tsFileResourceUnSeq.anyModFileExists()); } + + @Test + public void testFlushSpecifiedResource() + throws IllegalPathException, WriteProcessException, ExecutionException, InterruptedException { + for (int j = 100; j < 200; j++) { + TSRecord record = new TSRecord(deviceId, j); + record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j))); + dataRegion.insert(buildInsertRowNodeByTSRecord(record)); + } + TsFileResource tsFileResourceSeq = dataRegion.getTsFileManager().getTsFileList(true).get(0); + Future future = dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq); + Future future2 = dataRegion.asyncCloseOneTsFileProcessor(tsFileResourceSeq); + assertTrue(future == future2 || future2 instanceof CompletableFuture); + + future.get(); + assertTrue(tsFileResourceSeq.isClosed()); + } }