Description
DataJoint Version
datajoint 0.14.8 (datajoint/external.py, datajoint/s3.py)
Problem
ExternalTable.upload_filepath() only checks the external tracking table (DB) to determine whether a file already exists on S3 before uploading. When a transaction rolls back after a successful S3 upload, the DB tracking entry is lost but the file remains on S3. On retry, upload_filepath finds no DB entry and re-uploads the entire file, even though it's already there.
This is particularly painful for large files (multi-GB). In our case, a 10 GB recording.dat gets uploaded to S3 in ~5 minutes, but the DB connection times out during the enclosing transaction. Every retry re-uploads the same 10 GB file — wasting time and bandwidth — and then fails again at the same point.
Root Cause
S3 uploads are not transactional, but upload_filepath treats them as if they are by relying solely on the DB tracking table:
# external.py lines 293-315
check_hash = (self & {"hash": uuid}).fetch("contents_hash")
if check_hash.size:
    # DB entry exists → skip (correct)
    ...
else:
    # DB entry missing → always upload, even if file is already on S3
    self._upload_file(local_filepath, external_path, metadata=...)
    self.connection.query("INSERT INTO ...")

After a transaction rollback:
- S3 file exists ✓ (upload succeeded before rollback)
- DB tracking entry gone ✗ (rolled back)
- Retry: DB check finds nothing → re-uploads entire file to S3
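To make the sequence concrete, here is a minimal sketch of the failure mode. The schema name, store name, and file path are made up; it assumes an S3-backed filepath store whose stage directory contains the file, and uses DataJoint's explicit transaction controls (start_transaction / cancel_transaction) to stand in for the timeout-induced rollback:

import datajoint as dj

schema = dj.Schema("my_pipeline")        # assumes dj.config["stores"]["raw_store"] is configured
ext = schema.external["raw_store"]       # ExternalTable for that store

schema.connection.start_transaction()
ext.upload_filepath("/data/sub01/recording.dat")   # S3 PUT succeeds (minutes for a 10 GB file)
schema.connection.cancel_transaction()             # rollback: the tracking INSERT is discarded

# The S3 object is untouched by the rollback, so object storage and the tracking table
# now disagree; a retried upload_filepath() finds no DB entry and re-uploads the whole file.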
Proposed Fix
Before uploading, check S3 directly using s3.exists() + s3.get_size() (both already implemented in s3.py). If a file with matching size already exists at the expected path, skip the upload:
check_hash = (self & {"hash": uuid}).fetch("contents_hash")
if check_hash.size:
    # DB tracking entry exists — skip
    if not skip_checksum and contents_hash != check_hash[0]:
        raise DataJointError(...)
else:
    external_path = self._make_external_filepath(relative_filepath)
    already_uploaded = False
    if self.spec["protocol"] == "s3":
        if self.s3.exists(str(external_path)):
            remote_size = self.s3.get_size(str(external_path))
            if remote_size == file_size:
                already_uploaded = True
                logger.info(
                    f"File already exists on S3 with matching size, "
                    f"skipping upload: '{relative_filepath}'"
                )
    if not already_uploaded:
        self._upload_file(
            local_filepath, external_path,
            metadata={"contents_hash": str(contents_hash) if contents_hash else ""},
        )
    # Always insert the DB tracking entry
    self.connection.query("INSERT INTO ...")

For even stronger verification, the contents_hash is already stored in S3 object metadata (set at upload time on line 306). It could be checked via stat_object without downloading the file:
stat = self.s3.client.stat_object(self.bucket, str(external_path))
remote_contents_hash = stat.metadata.get("x-amz-meta-contents_hash")
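If that metadata check were adopted, the size comparison in the sketch above could prefer the stored hash whenever it is present. This is a sketch only, reusing stat, contents_hash, and file_size from the snippets above:

# Sketch: tighten already_uploaded using the stored hash when available,
# falling back to the size comparison otherwise (variables as in the snippets above).
if remote_contents_hash:
    already_uploaded = remote_contents_hash == str(contents_hash)
else:
    already_uploaded = stat.size == file_size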
Why This Is Safe
- s3.exists() and s3.get_size() are cheap stat_object calls (milliseconds)
- The external path is deterministic (derived from the relative filepath), so path + size match is a strong identity signal
- If the S3 check fails or the file doesn't exist, it falls through to the normal upload path — zero risk to existing behavior
- This only affects the else branch (no DB entry), so already-tracked files are unaffected
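For reference, a paraphrased sketch of what those two helpers amount to (assumed shape, not verbatim from s3.py; the real exception handling may differ), showing that each is a single stat_object round trip:

from minio.error import S3Error

def exists(self, name):
    # one stat_object round trip; a missing key means "does not exist"
    try:
        self.client.stat_object(self.bucket, str(name))
        return True
    except S3Error as e:
        if e.code == "NoSuchKey":
            return False
        raise

def get_size(self, name):
    # same single round trip; the object size comes back in the stat response
    return self.client.stat_object(self.bucket, str(name)).size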
Reproduction
Any dj.Imported table that inserts filepath@store attributes pointing to large files inside make():
- make() runs a long computation, then calls self.insert1(...) with a filepath@store attribute referencing a large file
- S3 upload succeeds but takes long enough that the DB connection times out
- Transaction rolls back (LostConnectionError)
- Retry: upload_filepath finds no DB entry → re-uploads the same large file → same timeout → infinite retry loop
In our pipeline, this happens with spike sorting, which produces a 10 GB recording.dat artifact.
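A minimal, hypothetical table definition that follows this pattern (the schema name, store name, parent table, and run_spike_sorting helper are all made up for illustration; the store must be configured as an S3-backed filepath store):

import datajoint as dj

# Assumed store config (illustrative values only):
# dj.config["stores"]["raw_store"] = {
#     "protocol": "s3", "endpoint": "...", "bucket": "...",
#     "access_key": "...", "secret_key": "...",
#     "location": "raw", "stage": "/data",
# }

schema = dj.Schema("ephys_demo")


def run_spike_sorting(key):
    """Hypothetical stand-in for the long spike-sorting step; writes a large
    recording.dat under the store's stage directory and returns its path."""
    ...


@schema
class Recording(dj.Manual):
    definition = """
    recording_id : int
    """


@schema
class SpikeSorting(dj.Imported):
    definition = """
    -> Recording
    ---
    recording_file : filepath@raw_store   # multi-GB artifact produced by make()
    """

    def make(self, key):
        output_path = run_spike_sorting(key)
        # populate() runs make() inside a transaction; inserting the row triggers
        # upload_filepath() for the filepath attribute. If the DB connection has
        # timed out by then, the transaction rolls back but the S3 object remains.
        self.insert1({**key, "recording_file": output_path})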