Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Potential fix for code scanning alert no. 5: Uncontrolled data used in path expression #1

Merged
merged 2 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions src/analyzer/code_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import logging
import os
import re
from datetime import datetime
from functools import lru_cache
from typing import Any, Dict, List, Set
from pathlib import Path
from typing import Any, Dict, List, Set

import git

Expand Down Expand Up @@ -64,21 +65,30 @@ def __init__(self) -> None:

async def analyze_code(self, code_content: str, filename: str) -> VulnerabilityReport:
"""
Analyze a single file for security vulnerabilities with improved validation
Analyze a single file for security vulnerabilities with improved validation.
Empty files are ignored and return an empty report.

Args:
code_content: The code content to analyze
filename: The name of the file

Returns:
VulnerabilityReport: The vulnerability report

Raises:
ValueError: If code content is empty or invalid
VulnerabilityReport: The vulnerability report, empty for empty files
"""

if not code_content or not filename:
raise ValueError("Code content and filename are required")
# Check for empty file
if not code_content or not isinstance(code_content, str) or code_content.isspace():
logging.info(f"Skipping empty file: {filename}")
return VulnerabilityReport(
file_name=filename,
vulnerabilities=[],
chained_vulnerabilities=[],
timestamp=datetime.now()
)

# Validate filename
if not filename or not isinstance(filename, str) or filename.isspace():
raise ValueError("Filename must be a non-empty string")

# Parse the code to get relevant information
try:
Expand Down Expand Up @@ -370,14 +380,14 @@ def _process_ai_response(self, analysis_result: Dict[str, Any]) -> List[Vulnerab
'CROSS_SITE_SCRIPT': VulnerabilityType.CROSS_SITE_SCRIPTING,
'SQL_INJECTION_VULNERABILITY': VulnerabilityType.SQL_INJECTION,
'SQLI': VulnerabilityType.SQL_INJECTION,
'RCE': VulnerabilityType.REMOTE_CODE_EXECUTION_RCE,
'REMOTE_CODE_EXEC': VulnerabilityType.REMOTE_CODE_EXECUTION_RCE,
'RCE': VulnerabilityType.REMOTE_CODE_EXECUTION,
'REMOTE_CODE_EXEC': VulnerabilityType.REMOTE_CODE_EXECUTION,
'COMMAND_EXEC': VulnerabilityType.OS_COMMAND_INJECTION,
'OS_COMMAND_EXEC': VulnerabilityType.OS_COMMAND_INJECTION,
'PATH_TRAVERSAL_VULNERABILITY': VulnerabilityType.PATH_TRAVERSAL,
'DIRECTORY_TRAVERSAL': VulnerabilityType.PATH_TRAVERSAL,
'IDOR': VulnerabilityType.INSECURE_DIRECT_OBJECT_REFERENCE_IDOR,
'DIRECT_OBJECT_REFERENCE': VulnerabilityType.INSECURE_DIRECT_OBJECT_REFERENCE_IDOR,
'IDOR': VulnerabilityType.INSECURE_DIRECT_OBJECT_REFERENCE,
'DIRECT_OBJECT_REFERENCE': VulnerabilityType.INSECURE_DIRECT_OBJECT_REFERENCE,
}

original_type = vuln_data['type']
Expand Down Expand Up @@ -909,8 +919,23 @@ async def _fetch_repository(self, repo_url: str, branch: str) -> str:
str: The local path to the cloned or fetched repository
"""

# Validate the repo_url format
if not re.match(r'^https?://', repo_url):
raise ValueError("Invalid repository URL")

repo_name = repo_url.split('/')[-1].replace('.git', '')
repo_path = os.path.join('/tmp', repo_name) # Temporary directory for the repo

# Get system-appropriate temporary directory
if os.name == 'nt': # Windows
temp_dir = os.path.join(os.environ.get('TEMP') or os.environ.get('TMP') or 'C:\\Windows\\Temp')
else: # Unix-like systems
temp_dir = '/tmp'

repo_path = os.path.normpath(os.path.join(temp_dir, repo_name))

# Ensure the repo_path is within the temp directory
if not os.path.commonprefix([temp_dir, repo_path]) == temp_dir:
raise ValueError("Invalid repository path")

if os.path.exists(repo_path):
# If the repository already exists, fetch the latest changes
Expand Down
2 changes: 1 addition & 1 deletion src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,4 @@ async def health_check() -> dict:

def main():
# Run the FastAPI app
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)
1 change: 1 addition & 0 deletions src/models/vulnerability.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class VulnerabilityType(str, Enum):
INJECTION_FLAW = "INJECTION_FLAW"
SECURE_COOKIE = "SECURE_COOKIE"
INSECURE_CONFIGURATION_SETTING = "INSECURE_CONFIGURATION_SETTING"
INFORMATION_EXPOSURE_THROUGH_ERROR_MESSAGES = "INFORMATION_EXPOSURE_THROUGH_ERROR_MESSAGES"

def is_related_to(self, other: 'VulnerabilityType') -> bool:
"""
Expand Down
Loading