
Conversation

@hhhhsc701
Collaborator

No description provided.

        configure: (proxy, options) => {
          // proxy is the 'http-proxy' instance
          proxy.on("proxyReq", (proxyReq, req, res) => {
            // request headers can be modified here

Check warning

Code scanning / CodeQL

Replacement of a substring with itself (Medium)

This replaces '/api' with itself.

Copilot Autofix

AI 1 day ago

To fix the problem, the rewrite function must either be removed (if no path change is needed) or updated so that it performs an actual transformation. In most Vite setups using a /api proxy, the desired behavior is to strip the /api prefix before forwarding to the backend (so the frontend calls /api/users while the backend exposes /users); changing the call to path.replace(/^\/api/, "") makes the rewrite meaningful.

The best way to fix this without changing intended functionality is:

  • If the backend endpoints already include /api (e.g. /api/users), then remove the rewrite option entirely, since it is redundant and misleading.
  • If the backend endpoints do not include /api (common case), change the rewrite to path.replace(/^\/api/, "") so that /api/foo becomes /foo on the backend.

Given only this snippet and the proxy target at http://localhost:8080, the more conventional and likely intent is to strip /api. Concretely, in frontend/vite.config.ts at line 21, replace:

rewrite: (path) => path.replace(/^\/api/, "/api"),

with:

rewrite: (path) => path.replace(/^\/api/, ""),

No new imports or additional methods are required.
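
For intuition, the difference between the two rewrites can be seen in plain Python (an editor's illustration only; the project code is TypeScript):

import re

# The flagged rewrite replaces the "/api" prefix with itself, a no-op;
# the fix strips the prefix before the request is forwarded.
print(re.sub(r"^/api", "/api", "/api/users"))  # -> /api/users (unchanged)
print(re.sub(r"^/api", "", "/api/users"))      # -> /users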

Suggested changeset 1
frontend/vite.config.ts

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/frontend/vite.config.ts b/frontend/vite.config.ts
--- a/frontend/vite.config.ts
+++ b/frontend/vite.config.ts
@@ -18,7 +18,7 @@
         target: "http://localhost:8080", // 本地后端服务地址
         changeOrigin: true,
         secure: false,
-        rewrite: (path) => path.replace(/^\/api/, "/api"),
+        rewrite: (path) => path.replace(/^\/api/, ""),
         configure: (proxy, options) => {
           // proxy is the 'http-proxy' instance
           proxy.on("proxyReq", (proxyReq, req, res) => {
EOF
    ) -> None:
        """Scan source dataset and create dataset.jsonl"""
        target_file_path = Path(f"{FLOW_PATH}/{task_id}/dataset.jsonl")
        target_file_path.parent.mkdir(parents=True, exist_ok=True)

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.

Copilot Autofix

AI about 20 hours ago

In general, the issue should be fixed by validating and constraining the user-controlled task_id before using it in filesystem paths. A robust approach here is to (1) ensure the identifier matches a safe pattern (for example, UUID-like or alphanumeric with limited symbols) and (2) build paths using pathlib.Path and, if relevant, normalize and confirm that the resulting path stays under a known root directory. This prevents directory traversal (../ segments) and absolute path injection, while preserving existing behavior for legitimate IDs.

The best minimally invasive fix, consistent with the rest of this service, is to reuse the existing CleaningTaskValidator.check_task_id for all operations that use task_id in filesystem paths, not just delete_task. Concretely:

  • In scan_dataset, before constructing target_file_path, call self.validator.check_task_id(task_id) so that only valid task IDs are allowed to impact the path.
  • In execute_task, call self.validator.check_task_id(task_id) at the beginning, similar to delete_task.
    This relies on the already-defined validator to enforce a safe format (likely rejecting path separators, .., etc.), and avoids changing the semantics of how task directories are structured. No new imports are needed; CleaningTaskValidator is already imported, and self.validator is already used in this class.

Specifically:

  • Edit runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py.
    • Around line 296–305 in scan_dataset, insert self.validator.check_task_id(task_id) before target_file_path = Path(...).
    • Around line 395–400 in execute_task, insert self.validator.check_task_id(task_id) at the start of the method body.
      No changes are required to cleaning_task_routes.py, and no additional helper methods or external libraries are necessary.
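
For reference, a minimal sketch of what such a task-id validator could look like; the real CleaningTaskValidator.check_task_id is not shown in this PR, so the pattern and exception below are assumptions:

import re

class CleaningTaskValidator:
    # Hypothetical sketch; the actual implementation is not part of this PR.
    _TASK_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]+")

    def check_task_id(self, task_id: str) -> None:
        # Reject anything that could act as a path segment or traversal token.
        if not task_id or not self._TASK_ID_PATTERN.fullmatch(task_id):
            raise ValueError(f"Invalid task id: {task_id!r}")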

Suggested changeset 1
runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
--- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
+++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
@@ -301,6 +301,7 @@
         succeed_files: Set[str] | None = None,
     ) -> None:
         """Scan source dataset and create dataset.jsonl"""
+        self.validator.check_task_id(task_id)
         target_file_path = Path(f"{FLOW_PATH}/{task_id}/dataset.jsonl")
         target_file_path.parent.mkdir(parents=True, exist_ok=True)
 
@@ -394,6 +395,7 @@
 
     async def execute_task(self, db: AsyncSession, task_id: str) -> bool:
         """Execute task"""
+        self.validator.check_task_id(task_id)
         succeeded = await self.result_repo.find_by_instance_id(db, task_id, "COMPLETED")
         succeed_set = {res.src_file_id for res in succeeded}
 
EOF
        result = await db.execute(query, {"dataset_id": src_dataset_id})
        files = result.fetchall()

        with open(target_file_path, 'w', encoding='utf-8') as f:

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.

Copilot Autofix

AI about 20 hours ago

In general, fix this by ensuring that the task_id used to build filesystem paths is validated/normalized before use, and that the final path is constrained to a known safe root directory (FLOW_PATH). Even if task_id is conceptually a UUID, we should not rely on that without explicit checks. The safest pattern is to: (1) validate the string format (for example, allow only UUID-like or alphanumeric IDs), (2) join it to the base directory using Path, (3) resolve the path, and (4) verify that the resolved path is still under the intended base directory.

The best fix with minimal behavioral change is to introduce a small internal helper method in CleaningTaskService that converts a task_id into a safe task directory path under FLOW_PATH. This helper will: (a) optionally reuse the existing validator.check_task_id where appropriate, (b) reject IDs containing path separators, and (c) enforce that the resolved path remains under FLOW_PATH. Then we will use this helper in both scan_dataset (for dataset.jsonl) and delete_task (for the directory removal) instead of interpolating task_id directly into an f-string. This adds explicit security while preserving where files are stored and how they’re named for valid IDs.

Concretely, in runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py:

  • Add import os near the top so we can check for os.sep and os.path.altsep.
  • Add a private method _get_safe_task_path(self, task_id: str) -> Path near the other helpers (for example just after _get_runtime_config). This method will:
    • Build base_path = Path(FLOW_PATH).resolve().
    • Ensure task_id does not contain directory separators (os.sep or os.path.altsep if set).
    • Construct task_path = (base_path / task_id).resolve().
    • Verify that task_path is within base_path (e.g. task_path.is_relative_to(base_path) on Python 3.9+, or equivalent prefix check).
    • Raise BusinessError(ErrorCodes.INVALID_PARAMS, "Invalid task id") or similar on violation.
  • Update scan_dataset to obtain task_dir = self._get_safe_task_path(task_id) and then set target_file_path = task_dir / "dataset.jsonl" instead of using the f-string.
  • Update delete_task to use task_path = self._get_safe_task_path(task_id) instead of Path(f"{FLOW_PATH}/{task_id}"). We can keep the existing self.validator.check_task_id(task_id); the new check will be an extra layer.

No changes are required in cleaning_task_routes.py because the route simply passes task_id to the service; all security hardening will be done in the service layer.
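
The containment check at the core of this helper can be observed in isolation (hypothetical POSIX paths, Python 3.9+):

from pathlib import Path

base = Path("/data/flow").resolve()
print((base / "task-42").resolve().is_relative_to(base))                # True
print((base / ".." / "etc" / "passwd").resolve().is_relative_to(base))  # False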

Suggested changeset 1
runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
--- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
+++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
@@ -4,6 +4,7 @@
 import uuid
 from pathlib import Path
 from typing import List, Dict, Any, Set
+import os
 
 from sqlalchemy import text
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -293,6 +294,32 @@
             logger.error(f"Failed to parse runtime config: {e}")
             return {}
 
+    def _get_safe_task_path(self, task_id: str) -> Path:
+        """
+        Build a filesystem path for a task under the configured FLOW_PATH,
+        ensuring that the resulting path cannot escape the base directory.
+        """
+        base_path = Path(FLOW_PATH).resolve()
+
+        # Disallow path separators in task_id to prevent traversal or nested paths.
+        if os.sep in task_id or (os.path.altsep and os.path.altsep in task_id):
+            raise BusinessError(ErrorCodes.INVALID_PARAMS, f"Invalid task id: {task_id}")
+
+        task_path = (base_path / task_id).resolve()
+
+        # Ensure the resolved path is still under the base FLOW_PATH directory.
+        try:
+            # Python 3.9+: use is_relative_to if available
+            is_relative = task_path.is_relative_to(base_path)  # type: ignore[attr-defined]
+        except AttributeError:
+            # Fallback for older Python versions
+            is_relative = str(task_path).startswith(str(base_path) + os.sep)
+
+        if not is_relative:
+            raise BusinessError(ErrorCodes.INVALID_PARAMS, f"Invalid task id: {task_id}")
+
+        return task_path
+
     async def scan_dataset(
         self,
         db: AsyncSession,
@@ -301,7 +328,8 @@
         succeed_files: Set[str] | None = None,
     ) -> None:
         """Scan source dataset and create dataset.jsonl"""
-        target_file_path = Path(f"{FLOW_PATH}/{task_id}/dataset.jsonl")
+        task_dir = self._get_safe_task_path(task_id)
+        target_file_path = task_dir / "dataset.jsonl"
         target_file_path.parent.mkdir(parents=True, exist_ok=True)
 
         query = text("""
@@ -385,7 +413,7 @@
         await self.operator_instance_repo.delete_by_instance_id(db, task_id)
         await self.result_repo.delete_by_instance_id(db, task_id)
 
-        task_path = Path(f"{FLOW_PATH}/{task_id}")
+        task_path = self._get_safe_task_path(task_id)
         if task_path.exists():
             try:
                 shutil.rmtree(task_path)
EOF
        if retry_count > 0:
            log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}")

        if not log_path.exists():

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.
This path depends on a user-provided value.

Copilot Autofix

AI about 20 hours ago

In general, to fix uncontrolled-path issues, construct paths using a trusted root directory, normalize the resulting path (e.g., with Path.resolve() or os.path.realpath()), and verify that the normalized path is still within the trusted root. Also, avoid embedding user input directly into path segments if you can instead validate or constrain it (e.g., to UUIDs or numeric IDs).

For this specific code, the best fix is to (1) build log file paths by joining FLOW_PATH (a trusted base) with a single (sanitized) subdirectory for the task, (2) resolve the final path and ensure it lives under FLOW_PATH, and (3) keep existing behavior otherwise. We can also further enforce that task_id is a safe directory name via a simple regex, even though check_task_id already exists, so the path validation is explicit and local.

Concretely in cleaning_task_service.py:

  • Introduce a helper function _build_safe_log_path(task_id: str, retry_count: int) -> Path (or inline logic) that:
    • Uses Path(FLOW_PATH) / task_id / filename to build the path.
    • Normalizes with .resolve().
    • Checks that the resolved path is still inside FLOW_PATH using is_relative_to if available, or a fallback based on relative_to / prefix check.
    • Raises BusinessError(ErrorCodes.INVALID_PARAM, "Invalid task_id or retry_count for log path") (or similar) if the path would escape the root.
  • In get_task_log, replace direct string interpolation with calls to that helper and use the validated log_path for .exists() and open().
  • Optionally perform a lightweight pattern check on task_id (e.g., allow only [A-Za-z0-9_-]+) to ensure it is a safe directory name; this reinforces safety even if the root check covers most issues.

We only touch the code region around get_task_log in cleaning_task_service.py and, if needed, add a small helper method in the same class. No changes are needed in cleaning_task_routes.py.
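
As a standalone illustration of the proposed pattern check:

import re

SAFE_ID = re.compile(r"[A-Za-z0-9_\-]+")
print(bool(SAFE_ID.fullmatch("task-42")))        # True
print(bool(SAFE_ID.fullmatch("../etc/passwd")))  # False: contains separators
print(bool(SAFE_ID.fullmatch("")))               # False: empty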


Suggested changeset 1
runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
--- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
+++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
@@ -293,6 +293,39 @@
             logger.error(f"Failed to parse runtime config: {e}")
             return {}
 
+    def _build_safe_log_path(self, task_id: str, retry_count: int) -> Path:
+        """
+        Build a safe log file path under FLOW_PATH for the given task and retry count.
+
+        Ensures that the resulting path does not escape the FLOW_PATH directory.
+        """
+        # Basic sanity check on task_id to avoid path separators or traversal tokens
+        if not re.fullmatch(r"[A-Za-z0-9_\-]+", task_id):
+            raise BusinessError(ErrorCodes.INVALID_PARAM, "Invalid task_id format")
+
+        base_path = Path(FLOW_PATH)
+        if retry_count > 0:
+            candidate = base_path / task_id / f"output.log.{retry_count}"
+        else:
+            candidate = base_path / task_id / "output.log"
+
+        try:
+            resolved = candidate.resolve()
+        except FileNotFoundError:
+            # Parent directories may not exist yet; resolve base and manually join
+            resolved = (base_path.resolve() / task_id / candidate.name).resolve()
+
+        base_resolved = base_path.resolve()
+
+        # Python 3.9+ has is_relative_to; fall back to try/except for compatibility
+        try:
+            resolved.relative_to(base_resolved)
+        except ValueError:
+            # Path escapes the allowed root
+            raise BusinessError(ErrorCodes.INVALID_PARAM, "Invalid log path")
+
+        return resolved
+
     async def scan_dataset(
         self,
         db: AsyncSession,
@@ -336,9 +369,7 @@
         """Get task log"""
         self.validator.check_task_id(task_id)
 
-        log_path = Path(f"{FLOW_PATH}/{task_id}/output.log")
-        if retry_count > 0:
-            log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}")
+        log_path = self._build_safe_log_path(task_id, retry_count)
 
         if not log_path.exists():
             return []
EOF
        )
        exception_suffix_pattern = re.compile(r"\b\w+(Warning|Error|Exception)\b")

        with open(log_path, 'r', encoding='utf-8') as f:

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.
This path depends on a user-provided value.

Copilot Autofix

AI about 20 hours ago

In general, to fix uncontrolled path usage you should (1) treat the user-controlled parts as opaque names, not fragments of filesystem paths; (2) combine them with a fixed, known-safe base directory using Path/os.path.join; (3) normalize the resulting path; and (4) verify that the normalized path is still within the intended base directory before accessing the filesystem. Additionally, forbidding path separators in identifiers (like task_id) adds defense in depth.

For this specific code, the best fix without changing existing functionality is:

  • Introduce a safe base directory FLOW_PATH (if not already defined) as a Path instance or use it as such.
  • In get_task_log, construct log_path by combining the base path and task_id using Path rather than manual f-strings, then normalize and verify that the resulting path is within FLOW_PATH.
  • Continue to use retry_count in the filename (as currently) but only after the base path + task_id directory has been validated; retry_count itself is an integer and only influences the filename suffix.
  • Keep calling self.validator.check_task_id(task_id) as it may do additional checks.

Concretely, within cleaning_task_service.py:

  1. Import os so we can use os.path.commonpath (a standard library function) or, alternatively, rely on Path.resolve() and prefix checks.

  2. In get_task_log, replace the simple Path(f"{FLOW_PATH}/{task_id}/output.log") logic with:

    • Construct an initial directory: task_dir = (Path(FLOW_PATH) / task_id).resolve().
    • Verify task_dir is within FLOW_PATH: e.g. if flow_base_path not in task_dir.parents and task_dir != flow_base_path: raise BusinessError(...), or use os.path.commonpath.
    • Based on retry_count, select the file path under task_dir (task_dir / "output.log" or task_dir / f"output.log.{retry_count}").
  3. Ensure FLOW_PATH is used consistently as a Path or is wrapped by Path(FLOW_PATH) before operations. Because we’re not allowed to assume or modify where FLOW_PATH is defined, we’ll wrap it locally with Path(FLOW_PATH) and perform checks relative to that.

No changes are needed in cleaning_task_routes.py to satisfy CodeQL: we keep the route signatures identical, and all validation happens in the service.
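
The os.path.commonpath check used here behaves as follows (illustrative POSIX paths only):

import os.path

base = "/data/flow"
inside = os.path.normpath(os.path.join(base, "task-42"))
outside = os.path.normpath(os.path.join(base, "..", "etc"))
print(os.path.commonpath([base, inside]) == base)   # True: both under /data/flow
print(os.path.commonpath([base, outside]) == base)  # False: common prefix is /data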

Suggested changeset 1
runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
--- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
+++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
@@ -4,6 +4,7 @@
 import uuid
 from pathlib import Path
 from typing import List, Dict, Any, Set
+import os
 
 from sqlalchemy import text
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -336,9 +337,17 @@
         """Get task log"""
         self.validator.check_task_id(task_id)
 
-        log_path = Path(f"{FLOW_PATH}/{task_id}/output.log")
+        # Ensure that the log file path stays within the configured FLOW_PATH directory
+        flow_base_path = Path(FLOW_PATH).resolve()
+        task_dir = (flow_base_path / task_id).resolve()
+        # Verify that the resolved task_dir is within the FLOW_PATH directory
+        if os.path.commonpath([str(flow_base_path), str(task_dir)]) != str(flow_base_path):
+            logger.warning("Attempt to access log outside of FLOW_PATH: task_id=%s, resolved=%s", task_id, task_dir)
+            raise BusinessError(ErrorCodes.INVALID_PARAMS, "Invalid task_id")
+
+        log_path = task_dir / "output.log"
         if retry_count > 0:
-            log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}")
+            log_path = task_dir / f"output.log.{retry_count}"
 
         if not log_path.exists():
             return []
EOF
        await self.result_repo.delete_by_instance_id(db, task_id)

        task_path = Path(f"{FLOW_PATH}/{task_id}")
        if task_path.exists():

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.

Copilot Autofix

AI about 20 hours ago

In general, when using user-influenced data to build filesystem paths, normalize the resulting path and ensure it remains under a trusted base directory, rejecting any path that escapes this root. This prevents directory traversal attacks such as ../../.. segments or absolute paths.

For this specific case, the best fix is to:

  1. Build the task directory path by joining FLOW_PATH and task_id.
  2. Normalize/resolve that combined path.
  3. Ensure that the normalized path is still inside the normalized FLOW_PATH directory.
  4. Only then call shutil.rmtree on the safe, normalized path. Otherwise, log a warning and skip deletion (or raise a controlled error).

Concretely, in runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py inside delete_task:

  • Replace task_path = Path(f"{FLOW_PATH}/{task_id}") with something like:

    base_path = Path(FLOW_PATH).resolve()
    candidate_path = (base_path / task_id).resolve()
    if base_path in candidate_path.parents or candidate_path == base_path:
        task_path = candidate_path
        ...
    else:
        logger.warning("Refusing to delete task path outside FLOW_PATH: %s", candidate_path)

    (we’ll slightly adjust this to avoid deleting base_path itself and to keep the code compact).

  • Keep the existing existence check and shutil.rmtree call, but operate on the validated task_path. If validation fails, do not call rmtree.

No new imports beyond Path (already imported) are needed. All changes are limited to the delete_task method body around lines 380–393 in cleaning_task_service.py. The routes file (cleaning_task_routes.py) does not need changes.
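
The parents-based containment test suggested above works like this (hypothetical POSIX paths):

from pathlib import Path

base = Path("/data/flow").resolve()
ok = (base / "task-42").resolve()
bad = (base / ".." / "etc").resolve()
print(base in ok.parents)    # True: /data/flow/task-42 is under the base
print(base in bad.parents)   # False: /data/etc escaped the base
print(base in base.parents)  # False: the base itself is also refused,
                             # so rmtree can never target FLOW_PATH itself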

Suggested changeset 1
runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
--- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
+++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
@@ -385,7 +385,15 @@
         await self.operator_instance_repo.delete_by_instance_id(db, task_id)
         await self.result_repo.delete_by_instance_id(db, task_id)
 
-        task_path = Path(f"{FLOW_PATH}/{task_id}")
+        base_path = Path(FLOW_PATH).resolve()
+        candidate_path = (base_path / task_id).resolve()
+
+        # Ensure the resolved task path is a subdirectory of the configured FLOW_PATH
+        if base_path not in candidate_path.parents:
+            logger.warning(f"Refusing to delete task path outside FLOW_PATH: {candidate_path}")
+            return
+
+        task_path = candidate_path
         if task_path.exists():
             try:
                 shutil.rmtree(task_path)
EOF
        task_path = Path(f"{FLOW_PATH}/{task_id}")
        if task_path.exists():
            try:
                shutil.rmtree(task_path)

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.

Copilot Autofix

AI about 20 hours ago

In general, the fix is to ensure that any path derived from user input is (1) rooted under a trusted base directory and (2) normalized before use, and then checked to confirm it has not escaped that base directory. For destructive operations like shutil.rmtree, this is especially important to prevent an attacker from deleting arbitrary parts of the filesystem.

For this specific case, we should:

  1. Construct task_path using Path(FLOW_PATH) / task_id.
  2. Resolve it to an absolute, normalized path via .resolve().
  3. Ensure that the resolved task_path is a descendant of the resolved base path FLOW_PATH (e.g., using is_relative_to when available, or a safe fallback).
  4. If the check fails, log a warning and abort deletion (or raise an error), rather than calling shutil.rmtree.

Because we must not assume anything outside the shown snippets, we will:

  • Implement a small helper method _get_safe_task_path inside CleaningTaskService that encapsulates this logic for a given task_id.
  • Use this helper in delete_task before calling shutil.rmtree, and bail out safely if the path is invalid or outside FLOW_PATH.

We already import Path from pathlib, so no new imports are needed. The helper will use Path.resolve() and, for compatibility, a simple ancestry check based on Path.relative_to (wrapped in a try/except) instead of Path.is_relative_to (which is not available in older Python versions). All changes will be made in runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py within the class that already contains delete_task.
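
The version-portable ancestry check described above reduces to a few lines (a sketch, not the exact patch code):

from pathlib import Path

def is_under(candidate: Path, base: Path) -> bool:
    # Path.relative_to raises ValueError when candidate is not under base,
    # which also works on Python versions predating Path.is_relative_to (3.9).
    try:
        candidate.relative_to(base)
        return True
    except ValueError:
        return False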

Suggested changeset 1
runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
--- a/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
+++ b/runtime/datamate-python/app/module/cleaning/service/cleaning_task_service.py
@@ -332,13 +332,32 @@
         """Get task results"""
         return await self.result_repo.find_by_instance_id(db, task_id)
 
+    def _get_safe_task_path(self, task_id: str) -> Path:
+        """
+        Construct a safe task path under FLOW_PATH for the given task_id.
+
+        Ensures that the resolved path stays within the FLOW_PATH directory.
+        """
+        base_path = Path(FLOW_PATH).resolve()
+        # Construct path under base_path and resolve to eliminate ".." etc.
+        candidate = (base_path / task_id).resolve()
+        try:
+            # This will raise a ValueError if candidate is not under base_path
+            candidate.relative_to(base_path)
+        except ValueError:
+            raise BusinessError(ErrorCodes.INVALID_PARAMETER, f"Invalid task id path: {task_id}")
+        return candidate
+
     async def get_task_log(self, db: AsyncSession, task_id: str, retry_count: int) -> List[CleaningTaskLog]:
         """Get task log"""
         self.validator.check_task_id(task_id)
 
-        log_path = Path(f"{FLOW_PATH}/{task_id}/output.log")
+        base_path = Path(FLOW_PATH).resolve()
+        task_dir = self._get_safe_task_path(task_id)
+
+        log_path = task_dir / "output.log"
         if retry_count > 0:
-            log_path = Path(f"{FLOW_PATH}/{task_id}/output.log.{retry_count}")
+            log_path = task_dir / f"output.log.{retry_count}"
 
         if not log_path.exists():
             return []
@@ -385,7 +399,13 @@
         await self.operator_instance_repo.delete_by_instance_id(db, task_id)
         await self.result_repo.delete_by_instance_id(db, task_id)
 
-        task_path = Path(f"{FLOW_PATH}/{task_id}")
+        try:
+            task_path = self._get_safe_task_path(task_id)
+        except BusinessError as e:
+            # Invalid task_id path; log and do not attempt filesystem deletion
+            logger.warning(f"Skip deleting task path for invalid task id {task_id}: {e}")
+            return
+
         if task_path.exists():
             try:
                 shutil.rmtree(task_path)
EOF
    ) -> OperatorDto:
        """Upload an operator file and parse its metadata"""
        file_path = self._get_upload_path(file_name)
        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else None

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.

Copilot Autofix

AI about 19 hours ago

General fix strategy: constrain and validate the user-controlled file_name before using it in a filesystem path. Ensure that after normalization, the resulting path is (1) under the intended upload root directory and (2) not an absolute path or containing directory traversal. A simple and robust approach here is to treat file_name as just a filename (no directories), reject anything containing path separators, and build the full path from a safe root.

Best concrete fix in this codebase:

  1. Add a helper in OperatorService to validate and sanitize the provided file_name. This helper should:
    • Reject empty filenames.
    • Reject names containing / or \ (so no subdirectories or traversal components).
    • Optionally, strip leading/trailing whitespace.
  2. Use this helper in upload_operator before calling _get_upload_path, so that _get_upload_path receives only a safe basename and continues to work as before.
  3. Keep changes localized to operator_service.py and operator_routes.py as requested, and do not change the external API semantics: if a user passes an invalid fileName, raise a clear error (e.g., BusinessError or HTTPException) instead of silently mangling it.

Concretely:

  • In runtime/datamate-python/app/module/operator/service/operator_service.py:

    • Add a private method _validate_file_name(self, file_name: str) -> str that enforces the constraints and raises BusinessError(ErrorCodes.INVALID_PARAM) (or similar reasonable existing error code; since we cannot see them all, we’ll pick a generic parameter error that likely exists).
    • In upload_operator, call safe_file_name = self._validate_file_name(file_name) and then use safe_file_name for _get_upload_path and _get_file_type instead of the raw file_name.
  • Optionally, in runtime/datamate-python/app/module/operator/interface/operator_routes.py:

    • Keep only minimal logic, but we can also perform a very basic early check on file_name (e.g., not empty); the primary security enforcement lives in the service layer where the filesystem access occurs, which is already shared by any other callers.

This preserves existing functionality for valid filenames while preventing traversal or absolute path attacks.


Suggested changeset 1
runtime/datamate-python/app/module/operator/service/operator_service.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/runtime/datamate-python/app/module/operator/service/operator_service.py b/runtime/datamate-python/app/module/operator/service/operator_service.py
--- a/runtime/datamate-python/app/module/operator/service/operator_service.py
+++ b/runtime/datamate-python/app/module/operator/service/operator_service.py
@@ -460,13 +460,15 @@
         db: AsyncSession
     ) -> OperatorDto:
         """上传算子文件并解析元数据"""
-        file_path = self._get_upload_path(file_name)
+        # Validate and sanitize the provided file name to prevent path traversal
+        safe_file_name = self._validate_file_name(file_name)
+        file_path = self._get_upload_path(safe_file_name)
         file_size = os.path.getsize(file_path) if os.path.exists(file_path) else None
         return self.parser_holder.parse_yaml_from_archive(
-            self._get_file_type(file_name),
+            self._get_file_type(safe_file_name),
             file_path,
             YAML_PATH,
-            file_name,
+            safe_file_name,
             file_size
         )
 
@@ -615,6 +613,22 @@
         """获取文件名不含扩展名"""
         return file_name.rsplit('.', 1)[0] if '.' in file_name else file_name
 
+    def _validate_file_name(self, file_name: str) -> str:
+        """
+        校验上传的文件名,防止目录遍历等路径攻击。
+
+        仅允许简单的文件名,不允许包含路径分隔符或为空。
+        """
+        if not file_name:
+            raise BusinessError(ErrorCodes.INVALID_PARAM, "fileName is required")
+
+        # 拒绝包含路径分隔符的文件名,防止 ../../ 等目录遍历
+        if "/" in file_name or "\\" in file_name:
+            raise BusinessError(ErrorCodes.INVALID_PARAM, "invalid fileName")
+
+        # 去除首尾空白字符
+        return file_name.strip()
+
     def _get_upload_path(self, file_name: str) -> str:
         """获取上传文件路径"""
         return os.path.join(OPERATOR_BASE_PATH, UPLOAD_DIR, file_name)
EOF
    ) -> OperatorDto:
        """Upload an operator file and parse its metadata"""
        file_path = self._get_upload_path(file_name)
        file_size = os.path.getsize(file_path) if os.path.exists(file_path) else None

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression (High)

This path depends on a user-provided value.

Copilot Autofix

AI about 19 hours ago

In general, to fix uncontrolled path usage, any user-controlled path component must be validated or normalized and checked against a safe root directory before being used. For this case, file_name should be constrained to be a simple filename (no directory separators, no .., no absolute paths) and then combined with the intended upload directory. Alternatively, we can normalize the full path and enforce that it stays within OPERATOR_BASE_PATH/UPLOAD_DIR.

The best fix here without changing overall functionality is:

  1. Add a small helper in OperatorService to validate and resolve upload paths:

    • Normalize the path with os.path.normpath.
    • Build the full path from OPERATOR_BASE_PATH/UPLOAD_DIR and file_name.
    • Reject if file_name is empty, contains path traversal (..), is absolute, or if the normalized full path is not under the upload root.
    • Optionally, ensure that file_name is only a basename (no path separators).
  2. Use this safe helper in upload_operator instead of _get_upload_path and also validate file_name in the router before calling the service, returning a clear HTTP error if invalid.

Concretely:

  • In runtime/datamate-python/app/module/operator/service/operator_service.py:

    • Add a private method like _get_safe_upload_path(self, file_name: str) -> str that:

      • Uses os.path.basename to strip any directory components.
      • Joins it with OPERATOR_BASE_PATH and UPLOAD_DIR.
      • Normalizes the result and checks it starts with the normalized upload root.
      • Raises an error if the name is invalid. The project's ErrorCodes values are not shown in these snippets, so guessing a domain error code (for example ErrorCodes.INVALID_FILE_NAME) would risk inventing a code that does not exist or silently changing semantics; the safest option is to raise a built-in exception like ValueError, which the global exception middleware will handle. This does not require introducing new domain errors.
      • For minimal impact, use os.path.basename plus basic checks; this will effectively strip traversal, while still allowing existing functionality (files are normally saved as simple names generated by FileService).
    • Update upload_operator to call _get_safe_upload_path(file_name) instead of _get_upload_path(file_name).

  • In runtime/datamate-python/app/module/operator/interface/operator_routes.py:

    • Add simple validation on file_name to reject obvious invalid input early:
      • Check it does not contain path separators (/ or \) and is not empty or only whitespace.
      • If invalid, raise HTTPException(422, "invalid fileName").

These changes ensure that even if a malicious fileName is submitted, it cannot escape the intended upload directory or manipulate arbitrary paths, and this is done entirely inside the snippets we’re allowed to edit, without adding new dependencies.
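
The two building blocks of this fix behave as follows (illustrative paths only):

import os.path

# os.path.basename drops any directory components the client supplied
print(os.path.basename("../../etc/passwd"))  # passwd
print(os.path.basename("operator.tar.gz"))   # operator.tar.gz

# normpath plus a prefix check keeps the final path under the upload root
root = os.path.normpath("/data/operator/upload")
full = os.path.normpath(os.path.join(root, "passwd"))
print(full == root or full.startswith(root + os.sep))  # True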


Suggested changeset 2
runtime/datamate-python/app/module/operator/service/operator_service.py

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/runtime/datamate-python/app/module/operator/service/operator_service.py b/runtime/datamate-python/app/module/operator/service/operator_service.py
--- a/runtime/datamate-python/app/module/operator/service/operator_service.py
+++ b/runtime/datamate-python/app/module/operator/service/operator_service.py
@@ -460,7 +460,8 @@
         db: AsyncSession
     ) -> OperatorDto:
         """上传算子文件并解析元数据"""
-        file_path = self._get_upload_path(file_name)
+        # Validate and resolve the upload path to prevent path traversal
+        file_path = self._get_safe_upload_path(file_name)
         file_size = os.path.getsize(file_path) if os.path.exists(file_path) else None
         return self.parser_holder.parse_yaml_from_archive(
             self._get_file_type(file_name),
@@ -616,9 +617,34 @@
         return file_name.rsplit('.', 1)[0] if '.' in file_name else file_name
 
     def _get_upload_path(self, file_name: str) -> str:
-        """获取上传文件路径"""
+        """获取上传文件路径(不进行安全校验,内部使用 _get_safe_upload_path 优先)"""
         return os.path.join(OPERATOR_BASE_PATH, UPLOAD_DIR, file_name)
 
+    def _get_safe_upload_path(self, file_name: str) -> str:
+        """
+        获取经过安全校验的上传文件路径,防止目录遍历。
+
+        仅允许使用文件名部分,不允许包含路径分隔符或相对路径段。
+        """
+        if not file_name or not str(file_name).strip():
+            raise ValueError("file_name is empty")
+
+        # 只保留文件名部分,去掉任何路径信息
+        safe_name = os.path.basename(file_name)
+
+        # 基础上传目录
+        upload_root = os.path.join(OPERATOR_BASE_PATH, UPLOAD_DIR)
+
+        # 构造并标准化完整路径
+        full_path = os.path.normpath(os.path.join(upload_root, safe_name))
+        upload_root_norm = os.path.normpath(upload_root)
+
+        # 确保最终路径仍在上传根目录下
+        if not (full_path == upload_root_norm or full_path.startswith(upload_root_norm + os.sep)):
+            raise ValueError("invalid file_name path")
+
+        return full_path
+
     def _get_extract_path(self, file_stem: str) -> str:
         """获取解压路径"""
         return os.path.join(OPERATOR_BASE_PATH, EXTRACT_DIR, file_stem)
EOF
runtime/datamate-python/app/module/operator/interface/operator_routes.py
Outside changed files

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/runtime/datamate-python/app/module/operator/interface/operator_routes.py b/runtime/datamate-python/app/module/operator/interface/operator_routes.py
--- a/runtime/datamate-python/app/module/operator/interface/operator_routes.py
+++ b/runtime/datamate-python/app/module/operator/interface/operator_routes.py
@@ -151,9 +151,12 @@
 ):
     """上传算子"""
     file_name = request.get("fileName")
+    from fastapi import HTTPException
     if not file_name:
-        from fastapi import HTTPException
         raise HTTPException(status_code=422, detail="fileName is required")
+    # Basic validation: forbid path separators to avoid obvious directory traversal
+    if any(sep in file_name for sep in ("/", "\\")):
+        raise HTTPException(status_code=422, detail="invalid fileName")
     operator = await service.upload_operator(file_name, db)
     return StandardResponse(code="0", message="success", data=operator)
 
EOF