feat/clean #329
base: main

Conversation
```
# Conflicts:
#   frontend/src/pages/OperatorMarket/Home/OperatorMarket.tsx
#   frontend/src/pages/OperatorMarket/operator.const.tsx
```
```ts
configure: (proxy, options) => {
  // proxy is an instance of 'http-proxy'
  proxy.on("proxyReq", (proxyReq, req, res) => {
    // request headers can be modified here
```
Check warning: Code scanning / CodeQL
Replacement of a substring with itself (Medium)

Copilot Autofix (AI, 1 day ago)
To fix the problem, the `rewrite` function must either be removed (if no path change is needed) or updated so that it performs an actual transformation. In most Vite setups using a `/api` proxy, the desired behavior is to strip the `/api` prefix before forwarding to the backend (so the frontend calls `/api/users` while the backend exposes `/users`). That would turn `path.replace(/^\/api/, "")` into a meaningful rewrite.

The best way to fix this without changing intended functionality is:

- If the backend endpoints already include `/api` (e.g. `/api/users`), remove the `rewrite` option entirely, since it is redundant and misleading.
- If the backend endpoints do not include `/api` (the common case), change the rewrite to `path.replace(/^\/api/, "")` so that `/api/foo` becomes `/foo` on the backend.

Given only this snippet and the proxy target at http://localhost:8080, the more conventional and likely intent is to strip `/api`. Concretely, in frontend/vite.config.ts at line 21, replace:

`rewrite: (path) => path.replace(/^\/api/, "/api"),`

with:

`rewrite: (path) => path.replace(/^\/api/, ""),`

No new imports or additional methods are required.
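For illustration, a minimal Python sketch of the no-op the warning flags (the actual config is TypeScript; `re.sub` stands in for the regex-based `path.replace`):

```python
import re

path = "/api/users"

# The original rewrite replaces the "/api" prefix with itself, a no-op,
# which is exactly what CodeQL's "replacement of a substring with itself"
# warning detects.
print(re.sub(r"^/api", "/api", path))  # -> /api/users (unchanged)

# The fixed rewrite strips the prefix, so the backend sees /users.
print(re.sub(r"^/api", "", path))      # -> /users
```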
```diff
@@ -18,7 +18,7 @@
       target: "http://localhost:8080", // local backend service address
       changeOrigin: true,
       secure: false,
-      rewrite: (path) => path.replace(/^\/api/, "/api"),
+      rewrite: (path) => path.replace(/^\/api/, ""),
       configure: (proxy, options) => {
         // proxy is an instance of 'http-proxy'
         proxy.on("proxyReq", (proxyReq, req, res) => {
```
```python
) -> None:
    """Scan source dataset and create dataset.jsonl"""
    target_file_path = Path(f"{FLOW_PATH}/{task_id}/dataset.jsonl")
    target_file_path.parent.mkdir(parents=True, exist_ok=True)
```
Check failure: Code scanning / CodeQL
Uncontrolled data used in path expression (High): user-provided value

Copilot Autofix (AI, about 20 hours ago)
In general, the issue should be fixed by validating and constraining the user-controlled `task_id` before using it in filesystem paths. A robust approach here is to (1) ensure the identifier matches a safe pattern (for example, UUID-like or alphanumeric with limited symbols) and (2) build paths using `pathlib.Path` and, if relevant, normalize and confirm that the resulting path stays under a known root directory. This prevents directory traversal (`../` segments) and absolute path injection, while preserving existing behavior for legitimate IDs.

The best minimally invasive fix, consistent with the rest of this service, is to reuse the existing `CleaningTaskValidator.check_task_id` for all operations that use `task_id` in filesystem paths, not just `delete_task`. Concretely:

- In `scan_dataset`, before constructing `target_file_path`, call `self.validator.check_task_id(task_id)` so that only valid task IDs are allowed to impact the path.
- In `execute_task`, call `self.validator.check_task_id(task_id)` at the beginning, similar to `delete_task`.

This relies on the already-defined validator to enforce a safe format (likely rejecting path separators, `..`, etc.; a sketch of such a validator follows below), and avoids changing the semantics of how task directories are structured. No new imports are needed; `CleaningTaskValidator` is already imported, and `self.validator` is already used in this class.

Specifically:

- Edit runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py.
- Around lines 296–305 in `scan_dataset`, insert `self.validator.check_task_id(task_id)` before `target_file_path = Path(...)`.
- Around lines 395–400 in `execute_task`, insert `self.validator.check_task_id(task_id)` at the start of the method body.

No changes are required to cleaning_task_routes.py, and no additional helper methods or external libraries are necessary.
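The validator itself is not shown in these snippets; a minimal sketch of what a `check_task_id` of this kind might look like, assuming UUID-like IDs (the pattern, function shape, and error type are all illustrative):

```python
import re

# Assumed ID shape: letters, digits, and dashes, as in a UUID. The real
# validator in this repo may differ; the point is to reject anything
# that could influence a filesystem path.
_TASK_ID_RE = re.compile(r"^[A-Za-z0-9-]{1,64}$")

def check_task_id(task_id: str) -> None:
    """Reject IDs containing separators, dots, or other path-relevant characters."""
    if not task_id or not _TASK_ID_RE.fullmatch(task_id):
        # The service presumably raises its own BusinessError here;
        # ValueError keeps this sketch self-contained.
        raise ValueError(f"Invalid task id: {task_id}")
```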
```diff
@@ -301,6 +301,7 @@
         succeed_files: Set[str] | None = None,
     ) -> None:
         """Scan source dataset and create dataset.jsonl"""
+        self.validator.check_task_id(task_id)
         target_file_path = Path(f"{FLOW_PATH}/{task_id}/dataset.jsonl")
         target_file_path.parent.mkdir(parents=True, exist_ok=True)

@@ -394,6 +395,7 @@

     async def execute_task(self, db: AsyncSession, task_id: str) -> bool:
         """Execute task"""
+        self.validator.check_task_id(task_id)
         succeeded = await self.result_repo.find_by_instance_id(db, task_id, "COMPLETED")
         succeed_set = {res.src_file_id for res in succeeded}
```
```python
        result = await db.execute(query, {"dataset_id": src_dataset_id})
        files = result.fetchall()

        with open(target_file_path, 'w', encoding='utf-8') as f:
```
Check failure: Code scanning / CodeQL
Uncontrolled data used in path expression (High): user-provided value

Copilot Autofix (AI, about 20 hours ago)
In general, fix this by ensuring that the `task_id` used to build filesystem paths is validated/normalized before use, and that the final path is constrained to a known safe root directory (`FLOW_PATH`). Even if `task_id` is conceptually a UUID, we should not rely on that without explicit checks. The safest pattern is to: (1) validate the string format (for example, allow only UUID-like or alphanumeric IDs), (2) join it to the base directory using `Path`, (3) resolve the path, and (4) verify that the resolved path is still under the intended base directory.

The best fix with minimal behavioral change is to introduce a small internal helper method in `CleaningTaskService` that converts a `task_id` into a safe task directory path under `FLOW_PATH`. This helper will: (a) optionally reuse the existing `validator.check_task_id` where appropriate, (b) reject IDs containing path separators, and (c) enforce that the resolved path remains under `FLOW_PATH`. Then we will use this helper in both `scan_dataset` (for dataset.jsonl) and `delete_task` (for the directory removal) instead of interpolating `task_id` directly into an f-string. This adds explicit security while preserving where files are stored and how they are named for valid IDs.

Concretely, in runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py:

- Add `import os` near the top so we can check for `os.sep` and `os.path.altsep`.
- Add a private method `_get_safe_task_path(self, task_id: str) -> Path` near the other helpers (for example, just after `_get_runtime_config`). This method will:
  - Build `base_path = Path(FLOW_PATH).resolve()`.
  - Ensure `task_id` does not contain directory separators (`os.sep`, or `os.path.altsep` if set).
  - Construct `task_path = (base_path / task_id).resolve()`.
  - Verify that `task_path` is within `base_path` (e.g. `task_path.is_relative_to(base_path)` on Python 3.9+, or an equivalent prefix check).
  - Raise `BusinessError(ErrorCodes.INVALID_PARAMS, "Invalid task id")` or similar on violation.
- Update `scan_dataset` to obtain `task_dir = self._get_safe_task_path(task_id)` and then set `target_file_path = task_dir / "dataset.jsonl"` instead of using the f-string.
- Update `delete_task` to use `task_path = self._get_safe_task_path(task_id)` instead of `Path(f"{FLOW_PATH}/{task_id}")`. We can keep the existing `self.validator.check_task_id(task_id)`; the new check will be an extra layer.

No changes are required in cleaning_task_routes.py because the route simply passes `task_id` to the service; all security hardening will be done in the service layer.
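A quick illustration (base path is a placeholder) of why the helper needs both checks: resolve-plus-containment catches `..` traversal, while the separator check stops an ID from silently addressing a nested directory:

```python
from pathlib import Path

# Stand-in for FLOW_PATH; is_relative_to requires Python 3.9+.
base = Path("/data/flows").resolve()

for task_id in ("123e4567", "../../etc", "nested/dir"):
    candidate = (base / task_id).resolve()
    print(f"{task_id!r}: {candidate.is_relative_to(base)}")

# '123e4567': True
# '../../etc': False      (caught by the containment check)
# 'nested/dir': True      (still under base, so only the separator
#                          check on task_id rejects it)
```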
```diff
@@ -4,6 +4,7 @@
 import uuid
 from pathlib import Path
 from typing import List, Dict, Any, Set
+import os

 from sqlalchemy import text
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -293,6 +294,32 @@
             logger.error(f"Failed to parse runtime config: {e}")
             return {}

+    def _get_safe_task_path(self, task_id: str) -> Path:
+        """
+        Build a filesystem path for a task under the configured FLOW_PATH,
+        ensuring that the resulting path cannot escape the base directory.
+        """
+        base_path = Path(FLOW_PATH).resolve()
+
+        # Disallow path separators in task_id to prevent traversal or nested paths.
+        if os.sep in task_id or (os.path.altsep and os.path.altsep in task_id):
+            raise BusinessError(ErrorCodes.INVALID_PARAMS, f"Invalid task id: {task_id}")
+
+        task_path = (base_path / task_id).resolve()
+
+        # Ensure the resolved path is still under the base FLOW_PATH directory.
+        try:
+            # Python 3.9+: use is_relative_to if available
+            is_relative = task_path.is_relative_to(base_path)  # type: ignore[attr-defined]
+        except AttributeError:
+            # Fallback for older Python versions
+            is_relative = str(task_path).startswith(str(base_path) + os.sep)
+
+        if not is_relative:
+            raise BusinessError(ErrorCodes.INVALID_PARAMS, f"Invalid task id: {task_id}")
+
+        return task_path
+
     async def scan_dataset(
         self,
         db: AsyncSession,
@@ -301,7 +328,8 @@
         succeed_files: Set[str] | None = None,
     ) -> None:
         """Scan source dataset and create dataset.jsonl"""
-        target_file_path = Path(f"{FLOW_PATH}/{task_id}/dataset.jsonl")
+        task_dir = self._get_safe_task_path(task_id)
+        target_file_path = task_dir / "dataset.jsonl"
         target_file_path.parent.mkdir(parents=True, exist_ok=True)

         query = text("""
@@ -385,7 +413,7 @@
         await self.operator_instance_repo.delete_by_instance_id(db, task_id)
         await self.result_repo.delete_by_instance_id(db, task_id)

-        task_path = Path(f"{FLOW_PATH}/{task_id}")
+        task_path = self._get_safe_task_path(task_id)
         if task_path.exists():
             try:
                 shutil.rmtree(task_path)
```
```python
        if retry_count > 0:
            log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}")

        if not log_path.exists():
```
Check failure: Code scanning / CodeQL
Uncontrolled data used in path expression (High): this path depends on a user-provided value

Copilot Autofix (AI, about 20 hours ago)
In general, to fix uncontrolled-path issues, construct paths using a trusted root directory, normalize the resulting path (e.g., with `Path.resolve()` or `os.path.realpath()`), and verify that the normalized path is still within the trusted root. Also, avoid embedding user input directly into path segments if you can instead validate or constrain it (e.g., to UUIDs or numeric IDs).

For this specific code, the best fix is to (1) build log file paths by joining `FLOW_PATH` (a trusted base) with a single (sanitized) subdirectory for the task, (2) resolve the final path and ensure it lives under `FLOW_PATH`, and (3) keep existing behavior otherwise. We can also further enforce that `task_id` is a safe directory name via a simple regex, even though `check_task_id` already exists, so the path validation is explicit and local.

Concretely, in cleaning_task_service.py:

- Introduce a helper function `_build_safe_log_path(task_id: str, retry_count: int) -> Path` (or inline logic) that:
  - Uses `Path(FLOW_PATH) / task_id / filename` to build the path.
  - Normalizes with `.resolve()`.
  - Checks that the resolved path is still inside `FLOW_PATH`, using `is_relative_to` if available or a fallback based on `relative_to` / a prefix check.
  - Raises `BusinessError(ErrorCodes.INVALID_PARAM, "Invalid task_id or retry_count for log path")` (or similar) if the path would escape the root.
- In `get_task_log`, replace the direct string interpolation with calls to that helper and use the validated `log_path` for `.exists()` and `open()`.
- Optionally, perform a lightweight pattern check on `task_id` (e.g., allow only `[A-Za-z0-9_-]+`) to ensure it is a safe directory name; this reinforces safety even if the root check covers most issues.

We only touch the code region around `get_task_log` in cleaning_task_service.py and, if needed, add a small helper method in the same class. No changes are needed in cleaning_task_routes.py.
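The `relative_to` fallback mentioned above can serve as the containment check on Python versions that predate `Path.is_relative_to`; a small self-contained sketch (the base path is a placeholder):

```python
from pathlib import Path

base = Path("/data/flows").resolve()  # stand-in for FLOW_PATH

def is_under(candidate: Path, root: Path) -> bool:
    # relative_to raises ValueError when candidate lies outside root,
    # which makes it a containment check that works before Python 3.9
    # (where Path.is_relative_to was added).
    try:
        candidate.resolve().relative_to(root)
        return True
    except ValueError:
        return False

print(is_under(base / "task-1" / "output.log", base))  # True
print(is_under(base / ".." / "etc" / "passwd", base))  # False
```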
```diff
@@ -293,6 +293,39 @@
             logger.error(f"Failed to parse runtime config: {e}")
             return {}

+    def _build_safe_log_path(self, task_id: str, retry_count: int) -> Path:
+        """
+        Build a safe log file path under FLOW_PATH for the given task and retry count.
+
+        Ensures that the resulting path does not escape the FLOW_PATH directory.
+        """
+        # Basic sanity check on task_id to avoid path separators or traversal tokens
+        if not re.fullmatch(r"[A-Za-z0-9_\-]+", task_id):
+            raise BusinessError(ErrorCodes.INVALID_PARAM, "Invalid task_id format")
+
+        base_path = Path(FLOW_PATH)
+        if retry_count > 0:
+            candidate = base_path / task_id / f"output.log.{retry_count}"
+        else:
+            candidate = base_path / task_id / "output.log"
+
+        try:
+            resolved = candidate.resolve()
+        except FileNotFoundError:
+            # Parent directories may not exist yet; resolve base and manually join
+            resolved = (base_path.resolve() / task_id / candidate.name).resolve()
+
+        base_resolved = base_path.resolve()
+
+        # Python 3.9+ has is_relative_to; fall back to try/except for compatibility
+        try:
+            resolved.relative_to(base_resolved)
+        except ValueError:
+            # Path escapes the allowed root
+            raise BusinessError(ErrorCodes.INVALID_PARAM, "Invalid log path")
+
+        return resolved
+
     async def scan_dataset(
         self,
         db: AsyncSession,
@@ -336,9 +369,7 @@
         """Get task log"""
         self.validator.check_task_id(task_id)

-        log_path = Path(f"{FLOW_PATH}/{task_id}/output.log")
-        if retry_count > 0:
-            log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}")
+        log_path = self._build_safe_log_path(task_id, retry_count)

         if not log_path.exists():
             return []
```
```python
        )
        exception_suffix_pattern = re.compile(r"\b\w+(Warning|Error|Exception)\b")

        with open(log_path, 'r', encoding='utf-8') as f:
```
Check failure: Code scanning / CodeQL
Uncontrolled data used in path expression (High): this path depends on a user-provided value

Copilot Autofix (AI, about 20 hours ago)
In general, to fix uncontrolled path usage you should (1) treat the user-controlled parts as opaque names, not fragments of filesystem paths; (2) combine them with a fixed, known-safe base directory using `Path` / `os.path.join`; (3) normalize the resulting path; and (4) verify that the normalized path is still within the intended base directory before accessing the filesystem. Additionally, forbidding path separators in identifiers (like `task_id`) adds defense in depth.

For this specific code, the best fix without changing overall functionality is:

- Introduce a safe base directory `FLOW_PATH` (if not already defined) as a `Path` instance, or use it as such.
- In `get_task_log`, construct `log_path` by combining the base path and `task_id` using `Path` rather than manual f-strings, then normalize and verify that the resulting path is within `FLOW_PATH`.
- Continue to use `retry_count` in the filename (as currently), but only after the base path plus `task_id` directory has been validated; `retry_count` itself is an integer and only influences the filename suffix.
- Keep calling `self.validator.check_task_id(task_id)` as it may do additional checks.

Concretely, within cleaning_task_service.py:

- Import `os` so we can use `os.path.commonpath` (a standard library function) or, alternatively, rely on `Path.resolve()` and prefix checks.
- In `get_task_log`, replace the simple `Path(f"{FLOW_PATH}/{task_id}/output.log")` logic with:
  - Construct an initial directory: `task_dir = (Path(FLOW_PATH) / task_id).resolve()`.
  - Verify `task_dir` is within `FLOW_PATH`: e.g. `if flow_base_path not in task_dir.parents and task_dir != flow_base_path: raise BusinessError(...)` (where `flow_base_path` is the resolved `Path` form of `FLOW_PATH`), or use `os.path.commonpath`.
  - Based on `retry_count`, select the file path under `task_dir` (`task_dir / "output.log"` or `task_dir / f"output.log.{retry_count}"`).
- Ensure `FLOW_PATH` is used consistently as a `Path`, or is wrapped by `Path(FLOW_PATH)` before operations. Because we are not allowed to assume or modify where `FLOW_PATH` is defined, we will wrap it locally with `Path(FLOW_PATH)` and perform checks relative to that.

No changes are needed in cleaning_task_routes.py to satisfy CodeQL: we keep the route signatures identical, and all validation happens in the service.
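The `os.path.commonpath` check used in the suggested diff can be illustrated in isolation (the base directory is a placeholder):

```python
import os

flow_base = os.path.realpath("/data/flows")  # stand-in for FLOW_PATH

def task_dir_is_inside(task_id: str) -> bool:
    # commonpath of [base, candidate] equals base only when candidate
    # sits at or below base; any traversal shifts the common prefix
    # upward (e.g. to "/"), so the comparison fails.
    candidate = os.path.realpath(os.path.join(flow_base, task_id))
    return os.path.commonpath([flow_base, candidate]) == flow_base

print(task_dir_is_inside("task-1"))     # True
print(task_dir_is_inside("../../etc"))  # False
```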
```diff
@@ -4,6 +4,7 @@
 import uuid
 from pathlib import Path
 from typing import List, Dict, Any, Set
+import os

 from sqlalchemy import text
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -336,9 +337,17 @@
         """Get task log"""
         self.validator.check_task_id(task_id)

-        log_path = Path(f"{FLOW_PATH}/{task_id}/output.log")
+        # Ensure that the log file path stays within the configured FLOW_PATH directory
+        flow_base_path = Path(FLOW_PATH).resolve()
+        task_dir = (flow_base_path / task_id).resolve()
+        # Verify that the resolved task_dir is within the FLOW_PATH directory
+        if os.path.commonpath([str(flow_base_path), str(task_dir)]) != str(flow_base_path):
+            logger.warning("Attempt to access log outside of FLOW_PATH: task_id=%s, resolved=%s", task_id, task_dir)
+            raise BusinessError(ErrorCodes.INVALID_PARAMS, "Invalid task_id")
+
+        log_path = task_dir / "output.log"
         if retry_count > 0:
-            log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}")
+            log_path = task_dir / f"output.log.{retry_count}"

         if not log_path.exists():
             return []
```
```python
        await self.result_repo.delete_by_instance_id(db, task_id)

        task_path = Path(f"{FLOW_PATH}/{task_id}")
        if task_path.exists():
```
Check failure: Code scanning / CodeQL
Uncontrolled data used in path expression (High): user-provided value

Copilot Autofix (AI, about 20 hours ago)
In general, when using user-influenced data to build filesystem paths, normalize the resulting path and ensure it remains under a trusted base directory, rejecting any path that escapes this root. This prevents directory traversal attacks such as `../../..` segments or absolute paths.

For this specific case, the best fix is to:

- Build the task directory path by joining `FLOW_PATH` and `task_id`.
- Normalize/resolve that combined path.
- Ensure that the normalized path is still inside the normalized `FLOW_PATH` directory.
- Only then call `shutil.rmtree` on the safe, normalized path. Otherwise, log a warning and skip deletion (or raise a controlled error).

Concretely, in runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py inside `delete_task`:

- Replace `task_path = Path(f"{FLOW_PATH}/{task_id}")` with something like:

  ```python
  base_path = Path(FLOW_PATH).resolve()
  candidate_path = (base_path / task_id).resolve()
  if base_path in candidate_path.parents or candidate_path == base_path:
      task_path = candidate_path
      ...
  else:
      logger.warning("Refusing to delete task path outside FLOW_PATH: %s", candidate_path)
  ```

  (we'll slightly adjust this to avoid deleting `base_path` itself and to keep the code compact).
- Keep the existing existence check and `shutil.rmtree` call, but operate on the validated `task_path`. If validation fails, do not call `rmtree`.

No new imports beyond `Path` (already imported) are needed. All changes are limited to the `delete_task` method body around lines 380–393 in cleaning_task_service.py. The routes file (cleaning_task_routes.py) does not need changes.
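A short demonstration (base path is a placeholder) of the `parents`-based ancestry check this relies on, including the property that it never selects the base directory itself for deletion:

```python
from pathlib import Path

base = Path("/data/flows").resolve()  # stand-in for FLOW_PATH

print(base in (base / "task-1").resolve().parents)  # True: safe to delete
print(base in (base / "..").resolve().parents)      # False: traversal rejected
print(base in base.parents)                         # False: a path is not its own
                                                    # parent, so rmtree can never be
                                                    # pointed at the base directory
```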
```diff
@@ -385,7 +385,15 @@
         await self.operator_instance_repo.delete_by_instance_id(db, task_id)
         await self.result_repo.delete_by_instance_id(db, task_id)

-        task_path = Path(f"{FLOW_PATH}/{task_id}")
+        base_path = Path(FLOW_PATH).resolve()
+        candidate_path = (base_path / task_id).resolve()
+
+        # Ensure the resolved task path is a subdirectory of the configured FLOW_PATH
+        if base_path not in candidate_path.parents:
+            logger.warning(f"Refusing to delete task path outside FLOW_PATH: {candidate_path}")
+            return
+
+        task_path = candidate_path
         if task_path.exists():
             try:
                 shutil.rmtree(task_path)
```
```python
        task_path = Path(f"{FLOW_PATH}/{task_id}")
        if task_path.exists():
            try:
                shutil.rmtree(task_path)
```
Check failure: Code scanning / CodeQL
Uncontrolled data used in path expression (High): user-provided value

Copilot Autofix (AI, about 20 hours ago)
In general, the fix is to ensure that any path derived from user input is (1) rooted under a trusted base directory and (2) normalized before use, and then checked to confirm it has not escaped that base directory. For destructive operations like `shutil.rmtree`, this is especially important to prevent an attacker from deleting arbitrary parts of the filesystem.

For this specific case, we should:

- Construct `task_path` using `Path(FLOW_PATH) / task_id`.
- Resolve it to an absolute, normalized path via `.resolve()`.
- Ensure that the resolved `task_path` is a descendant of the resolved base path `FLOW_PATH` (e.g., using `is_relative_to` when available, or a safe fallback).
- If the check fails, log a warning and abort deletion (or raise an error), rather than calling `shutil.rmtree`.

Because we must not assume anything outside the shown snippets, we will:

- Implement a small helper method `_get_safe_task_path` inside `CleaningTaskService` that encapsulates this logic for a given `task_id`.
- Use this helper in `delete_task` before calling `shutil.rmtree`, and bail out safely if the path is invalid or outside `FLOW_PATH`.

We already import `Path` from pathlib, so no new imports are needed. The helper will use `Path.resolve()` and, for compatibility, a simple ancestry check based on `Path.relative_to` (wrapped in a try/except) instead of `Path.is_relative_to` (which is not available in older Python versions). All changes will be made in runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py within the class that already contains `delete_task`.
```diff
@@ -332,13 +332,32 @@
         """Get task results"""
         return await self.result_repo.find_by_instance_id(db, task_id)

+    def _get_safe_task_path(self, task_id: str) -> Path:
+        """
+        Construct a safe task path under FLOW_PATH for the given task_id.
+
+        Ensures that the resolved path stays within the FLOW_PATH directory.
+        """
+        base_path = Path(FLOW_PATH).resolve()
+        # Construct path under base_path and resolve to eliminate ".." etc.
+        candidate = (base_path / task_id).resolve()
+        try:
+            # This will raise a ValueError if candidate is not under base_path
+            candidate.relative_to(base_path)
+        except ValueError:
+            raise BusinessError(ErrorCodes.INVALID_PARAMETER, f"Invalid task id path: {task_id}")
+        return candidate
+
     async def get_task_log(self, db: AsyncSession, task_id: str, retry_count: int) -> List[CleaningTaskLog]:
         """Get task log"""
         self.validator.check_task_id(task_id)

-        log_path = Path(f"{FLOW_PATH}/{task_id}/output.log")
+        base_path = Path(FLOW_PATH).resolve()
+        task_dir = self._get_safe_task_path(task_id)
+
+        log_path = task_dir / "output.log"
         if retry_count > 0:
-            log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}")
+            log_path = task_dir / f"output.log.{retry_count}"

         if not log_path.exists():
             return []
@@ -385,7 +399,13 @@
         await self.operator_instance_repo.delete_by_instance_id(db, task_id)
         await self.result_repo.delete_by_instance_id(db, task_id)

-        task_path = Path(f"{FLOW_PATH}/{task_id}")
+        try:
+            task_path = self._get_safe_task_path(task_id)
+        except BusinessError as e:
+            # Invalid task_id path; log and do not attempt filesystem deletion
+            logger.warning(f"Skip deleting task path for invalid task id {task_id}: {e}")
+            return
+
         if task_path.exists():
             try:
                 shutil.rmtree(task_path)
```
```python
) -> OperatorDto:
    """Upload operator file and parse metadata"""
    file_path = self._get_upload_path(file_name)
    file_size = os.path.getsize(file_path) if os.path.exists(file_path) else None
```
Check failure: Code scanning / CodeQL
Uncontrolled data used in path expression (High): user-provided value

Copilot Autofix (AI, about 19 hours ago)
General fix strategy: constrain and validate the user-controlled `file_name` before using it in a filesystem path. Ensure that after normalization the resulting path is (1) under the intended upload root directory and (2) not an absolute path or one containing directory traversal. A simple and robust approach here is to treat `file_name` as just a filename (no directories), reject anything containing path separators, and build the full path from a safe root.

Best concrete fix in this codebase:

- Add a helper in `OperatorService` to validate and sanitize the provided `file_name`. This helper should:
  - Reject empty filenames.
  - Reject names containing `/` or `\` (so no subdirectories or traversal components).
  - Optionally, strip leading/trailing whitespace.
- Use this helper in `upload_operator` before calling `_get_upload_path`, so that `_get_upload_path` receives only a safe basename and continues to work as before.
- Keep changes localized to operator_service.py and operator_routes.py as requested, and do not change the external API semantics: if a user passes an invalid `fileName`, raise a clear error (e.g., `BusinessError` or `HTTPException`) instead of silently mangling it.

Concretely:

- In runtime/datamate-python/app/module/operator/service/operator_service.py:
  - Add a private method `_validate_file_name(self, file_name: str) -> str` that enforces the constraints and raises `BusinessError(ErrorCodes.INVALID_PARAM)` (or a similar, reasonable existing error code; since we cannot see them all, we pick a generic parameter error that likely exists).
  - In `upload_operator`, call `safe_file_name = self._validate_file_name(file_name)` and then use `safe_file_name` for `_get_upload_path` and `_get_file_type` instead of the raw `file_name`.
- Optionally, in runtime/datamate-python/app/module/operator/interface/operator_routes.py:
  - Keep only minimal logic, but we can also perform a very basic early check on `file_name` (e.g., not empty); the primary security enforcement lives in the service layer where the filesystem access occurs, which is already shared by any other callers.

This preserves existing functionality for valid filenames while preventing traversal or absolute path attacks.
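The constraint described above is small enough to state as a standalone predicate; a sketch (the function name is illustrative) showing which names pass:

```python
def is_safe_file_name(file_name: str) -> bool:
    # Mirrors the rules above: a bare, non-empty basename containing
    # no path separators of either flavor.
    name = file_name.strip()
    return bool(name) and "/" not in name and "\\" not in name

for candidate in ("op-pack.tar.gz", "../op.tar.gz", "dir\\evil.zip", "  "):
    print(f"{candidate!r}: {is_safe_file_name(candidate)}")
# Only 'op-pack.tar.gz' is accepted; the other three are rejected.
```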
```diff
@@ -460,13 +460,15 @@
         db: AsyncSession
     ) -> OperatorDto:
         """Upload operator file and parse metadata"""
-        file_path = self._get_upload_path(file_name)
+        # Validate and sanitize the provided file name to prevent path traversal
+        safe_file_name = self._validate_file_name(file_name)
+        file_path = self._get_upload_path(safe_file_name)
         file_size = os.path.getsize(file_path) if os.path.exists(file_path) else None
         return self.parser_holder.parse_yaml_from_archive(
-            self._get_file_type(file_name),
+            self._get_file_type(safe_file_name),
             file_path,
             YAML_PATH,
-            file_name,
+            safe_file_name,
             file_size
         )

@@ -615,6 +613,22 @@
         """Get file name without extension"""
         return file_name.rsplit('.', 1)[0] if '.' in file_name else file_name

+    def _validate_file_name(self, file_name: str) -> str:
+        """
+        Validate the uploaded file name to guard against directory traversal and similar path attacks.
+
+        Only simple file names are allowed: empty names and names containing path separators are rejected.
+        """
+        if not file_name:
+            raise BusinessError(ErrorCodes.INVALID_PARAM, "fileName is required")
+
+        # Reject file names containing path separators to block ../../ style traversal
+        if "/" in file_name or "\\" in file_name:
+            raise BusinessError(ErrorCodes.INVALID_PARAM, "invalid fileName")
+
+        # Strip leading/trailing whitespace
+        return file_name.strip()
+
     def _get_upload_path(self, file_name: str) -> str:
         """Get upload file path"""
         return os.path.join(OPERATOR_BASE_PATH, UPLOAD_DIR, file_name)
```
```python
) -> OperatorDto:
    """Upload operator file and parse metadata"""
    file_path = self._get_upload_path(file_name)
    file_size = os.path.getsize(file_path) if os.path.exists(file_path) else None
```
Check failure: Code scanning / CodeQL
Uncontrolled data used in path expression (High): user-provided value

Copilot Autofix (AI, about 19 hours ago)
In general, to fix uncontrolled path usage, any user-controlled path component must be validated or normalized and checked against a safe root directory before being used. For this case, `file_name` should be constrained to be a simple filename (no directory separators, no `..`, no absolute paths) and then combined with the intended upload directory. Alternatively, we can normalize the full path and enforce that it stays within `OPERATOR_BASE_PATH/UPLOAD_DIR`.

The best fix here without changing overall functionality is:

- Add a small helper in `OperatorService` to validate and resolve upload paths:
  - Normalize the path with `os.path.normpath`.
  - Build the full path from `OPERATOR_BASE_PATH/UPLOAD_DIR` and `file_name`.
  - Reject if `file_name` is empty, contains path traversal (`..`), is absolute, or if the normalized full path is not under the upload root.
  - Optionally, ensure that `file_name` is only a basename (no path separators).
- Use this safe helper in `upload_operator` instead of `_get_upload_path`, and also validate `file_name` in the router before calling the service, returning a clear HTTP error if invalid.

Concretely:

- In runtime/datamate-python/app/module/operator/service/operator_service.py:
  - Add a private method like `_get_safe_upload_path(self, file_name: str) -> str` that:
    - Uses `os.path.basename` to strip any directory components.
    - Joins it with `OPERATOR_BASE_PATH` and `UPLOAD_DIR`.
    - Normalizes the result and checks that it starts with the normalized upload root.
    - Raises an error if invalid. Rather than guessing at a domain error code (the shown snippets do not include a suitable `ErrorCodes` entry, and reusing an unrelated one would change semantics), raise a built-in `ValueError`, which the global exception middleware will handle; this avoids introducing new domain errors.
    - For minimal impact, uses `os.path.basename` plus basic checks; this effectively strips traversal while still allowing existing functionality (files are normally saved under simple names generated by `FileService`).
  - Update `upload_operator` to call `_get_safe_upload_path(file_name)` instead of `_get_upload_path(file_name)`.
- In runtime/datamate-python/app/module/operator/interface/operator_routes.py:
  - Add simple validation on `file_name` to reject obviously invalid input early:
    - Check that it does not contain path separators (`/` or `\`) and is not empty or only whitespace.
    - If invalid, raise `HTTPException(422, "invalid fileName")`.

These changes ensure that even if a malicious `fileName` is submitted, it cannot escape the intended upload directory or manipulate arbitrary paths, and this is done entirely inside the snippets we are allowed to edit, without adding new dependencies.
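How the two sanitation steps in the suggested helper behave on hostile inputs can be seen in isolation (the upload root is a placeholder; shown on a POSIX layout, where `os.path.basename` does not split on `\`, which is why the route-level separator check still matters):

```python
import os

upload_root = "/data/operators/upload"  # stand-in for OPERATOR_BASE_PATH/UPLOAD_DIR

for raw in ("pkg.tar.gz", "../../etc/passwd", "/etc/shadow", "sub/dir.zip"):
    name = os.path.basename(raw)                        # drop directory components
    full = os.path.normpath(os.path.join(upload_root, name))
    print(f"{raw!r:>20} -> {full}")

# Every input collapses to a single file directly under upload_root.
```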
```diff
@@ -460,7 +460,8 @@
         db: AsyncSession
     ) -> OperatorDto:
         """Upload operator file and parse metadata"""
-        file_path = self._get_upload_path(file_name)
+        # Validate and resolve the upload path to prevent path traversal
+        file_path = self._get_safe_upload_path(file_name)
         file_size = os.path.getsize(file_path) if os.path.exists(file_path) else None
         return self.parser_holder.parse_yaml_from_archive(
             self._get_file_type(file_name),
@@ -616,9 +617,34 @@
         return file_name.rsplit('.', 1)[0] if '.' in file_name else file_name

     def _get_upload_path(self, file_name: str) -> str:
-        """Get upload file path"""
+        """Get upload file path (no safety checks; internal callers should prefer _get_safe_upload_path)"""
         return os.path.join(OPERATOR_BASE_PATH, UPLOAD_DIR, file_name)

+    def _get_safe_upload_path(self, file_name: str) -> str:
+        """
+        Get a validated upload file path that prevents directory traversal.
+
+        Only the file name portion is used; path separators and relative path segments are not allowed.
+        """
+        if not file_name or not str(file_name).strip():
+            raise ValueError("file_name is empty")
+
+        # Keep only the file name portion, stripping any path information
+        safe_name = os.path.basename(file_name)
+
+        # Base upload directory
+        upload_root = os.path.join(OPERATOR_BASE_PATH, UPLOAD_DIR)
+
+        # Build and normalize the full path
+        full_path = os.path.normpath(os.path.join(upload_root, safe_name))
+        upload_root_norm = os.path.normpath(upload_root)
+
+        # Ensure the final path is still under the upload root
+        if not (full_path == upload_root_norm or full_path.startswith(upload_root_norm + os.sep)):
+            raise ValueError("invalid file_name path")
+
+        return full_path
+
     def _get_extract_path(self, file_stem: str) -> str:
         """Get extract path"""
         return os.path.join(OPERATOR_BASE_PATH, EXTRACT_DIR, file_stem)
```
```diff
@@ -151,9 +151,12 @@
 ):
     """Upload operator"""
     file_name = request.get("fileName")
+    from fastapi import HTTPException
     if not file_name:
-        from fastapi import HTTPException
         raise HTTPException(status_code=422, detail="fileName is required")
+    # Basic check: forbid path separators to block obvious directory traversal
+    if any(sep in file_name for sep in ("/", "\\")):
+        raise HTTPException(status_code=422, detail="invalid fileName")
     operator = await service.upload_operator(file_name, db)
     return StandardResponse(code="0", message="success", data=operator)
```