Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1939,38 +1939,48 @@ private String getNewTsFileName(long time, long version, int mergeCnt, int unseq
return TsFileNameGenerator.generateNewTsFileName(time, version, mergeCnt, unseqCompactionCnt);
}

/**
* close the TsFile represented by the given resource, thread-safe
*
* @param tsFileResource TsFile to be closed
* @return a future related to the close task
*/
public Future<?> asyncCloseOneTsFileProcessor(TsFileResource tsFileResource) {
writeLock("asyncCloseOneTsFileProcessor");
try {
return asyncCloseOneTsFileProcessor(tsFileResource.isSeq(), tsFileResource.getProcessor());
} finally {
writeUnlock();
}
}

/**
* close one tsfile processor, thread-safety should be ensured by caller
*
* @param sequence whether this tsfile processor is sequence or not
* @param tsFileProcessor tsfile processor
*/
public Future<?> asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
// for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
// for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
if (closingSequenceTsFileProcessor.contains(tsFileProcessor)
|| closingUnSequenceTsFileProcessor.contains(tsFileProcessor)
|| tsFileProcessor.alreadyMarkedClosing()) {
if (tsFileProcessor == null) {
return CompletableFuture.completedFuture(null);
}
Future<?> future;
if (sequence) {
closingSequenceTsFileProcessor.add(tsFileProcessor);
future = tsFileProcessor.asyncClose();
if (future.isDone()) {
closingSequenceTsFileProcessor.remove(tsFileProcessor);
}
if (tsFileProcessor.getCloseFuture() != null) {
return tsFileProcessor.getCloseFuture();
}

workSequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
} else {
closingUnSequenceTsFileProcessor.add(tsFileProcessor);
future = tsFileProcessor.asyncClose();
if (future.isDone()) {
closingUnSequenceTsFileProcessor.remove(tsFileProcessor);
}
Future<?> future;
Set<TsFileProcessor> closingTsFileProcessors =
sequence ? closingSequenceTsFileProcessor : closingUnSequenceTsFileProcessor;
TreeMap<Long, TsFileProcessor> workTsFileProcessors =
sequence ? workSequenceTsFileProcessors : workUnsequenceTsFileProcessors;

workUnsequenceTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());
closingTsFileProcessors.add(tsFileProcessor);
future = tsFileProcessor.asyncClose();
if (future.isDone()) {
closingTsFileProcessors.remove(tsFileProcessor);
}
workTsFileProcessors.remove(tsFileProcessor.getTimeRangeId());

TsFileResource resource = tsFileProcessor.getTsFileResource();
logger.info(
"Async close tsfile: {}, file start time: {}, file end time: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ public class TsFileProcessor {

private int walEntryNum = 0;

private volatile Future<?> closeFuture;

@SuppressWarnings("squid:S107")
public TsFileProcessor(
String dataRegionName,
Expand Down Expand Up @@ -1250,10 +1252,13 @@ public void syncClose() throws ExecutionException {
logger.info(
"Sync close file: {}, will firstly async close it",
tsFileResource.getTsFile().getAbsolutePath());
if (shouldClose) {
return;
}

try {
if (closeFuture != null) {
closeFuture.get();
return;
}

asyncClose().get();
logger.info("Start to wait until file {} is closed", tsFileResource);
// if this TsFileProcessor is closing, asyncClose().get() of this thread will return quickly,
Expand All @@ -1273,6 +1278,10 @@ public Future<?> asyncClose() {
flushQueryLock.writeLock().lock();
logFlushQueryWriteLocked();
try {
if (closeFuture != null) {
return closeFuture;
}

if (logger.isDebugEnabled()) {
if (workMemTable != null) {
logger.debug(
Expand All @@ -1293,10 +1302,6 @@ public Future<?> asyncClose() {
tsFileResource.getTsFileSize());
}
}

if (shouldClose) {
return CompletableFuture.completedFuture(null);
}
// when a flush thread serves this TsFileProcessor (because the processor is submitted by
// registerTsFileProcessor()), the thread will seal the corresponding TsFile and
// execute other cleanup works if "shouldClose == true and flushingMemTables is empty".
Expand All @@ -1315,6 +1320,7 @@ public Future<?> asyncClose() {
// flushing memTable in System module.
Future<?> future = addAMemtableIntoFlushingList(tmpMemTable);
shouldClose = true;
closeFuture = future;
return future;
} catch (Exception e) {
logger.error(
Expand Down Expand Up @@ -1794,6 +1800,9 @@ public boolean isManagedByFlushManager() {

public void setManagedByFlushManager(boolean managedByFlushManager) {
this.managedByFlushManager = managedByFlushManager;
if (!managedByFlushManager) {
closeFuture = CompletableFuture.completedFuture(null);
}
}

/** Close this tsfile */
Expand Down Expand Up @@ -2384,4 +2393,12 @@ private void logFlushQueryReadUnlocked() {
public String toString() {
return "TsFileProcessor{" + "tsFileResource=" + tsFileResource + '}';
}

public Future<?> getCloseFuture() {
return closeFuture;
}

public void setCloseFuture(Future<?> closeFuture) {
this.closeFuture = closeFuture;
}
}
Loading
Loading