- 필수 기능
- 시작하기
- Glossary
- 표준 속성
- Guides
- Agent
- 통합
- 개방형텔레메트리
- 개발자
- Administrator's Guide
- API
- Datadog Mobile App
- CoScreen
- Cloudcraft
- 앱 내
- 서비스 관리
- 인프라스트럭처
- 애플리케이션 성능
- APM
- Continuous Profiler
- 스팬 시각화
- 데이터 스트림 모니터링
- 데이터 작업 모니터링
- 디지털 경험
- 소프트웨어 제공
- 보안
- AI Observability
- 로그 관리
- 관리
",t};e.buildCustomizationMenuUi=t;function n(e){let t='
",t}function s(e){let n=e.filter.currentValue||e.filter.defaultValue,t='${e.filter.label}
`,e.filter.options.forEach(s=>{let o=s.id===n;t+=``}),t+="${e.filter.label}
`,t+=`ID: python-flask/os-popen-command-injection
Language: Python
Severity: Error
Category: Security
CWE: 78
This rule identifies potential OS Command Injection vulnerabilities. These occur when user-supplied data from a POST request’s JSON body, such as input from request.get_json()
or data.get("<key>")
, is passed directly to the os.popen()
function without sanitization or validation. This vulnerability allows an attacker to craft a JSON payload containing malicious commands. The server then executes these commands with the privileges of the running application, which can lead to unauthorized server access, data breaches, or denial of service.
To prevent OS Command Injection vulnerabilities when handling user input for shell commands:
os.popen()
with user input. Prefer using the subprocess
module. Pass commands and arguments as a list (for example, subprocess.run(["command", "arg1", "arg2"], shell=False)
). This method avoids shell interpretation of the command string.shlex.quote()
for any part of the command that originates from user input.import os
from flask import Flask, request
app = Flask(__name__)
@app.route("/")
def index():
return "Flask app is running. Add your first vulnerability!"
@app.route("/run", methods=["POST"])
def run_command():
data = request.get_json()
command = data.get("command") if data else None
if not command:
return "Missing 'command' in request body", 400
stream = os.popen(command)
output = stream.read()
return f"<pre>{output}</pre>"
if __name__ == "__main__":
app.run(host="0.0.0.0", port=80)
import os
import shlex
from flask import Flask, request
import subprocess
app = Flask(__name__)
@app.route("/run_safe_shlex", methods=["POST"])
def run_command_safe_shlex():
data = request.get_json()
user_command = data.get("command") if data else None
if not user_command:
return "Missing 'command' in request body", 400
# Sanitize using shlex.quote for commands meant for the shell
# This is safer but still relies on os.popen, ideally move to subprocess
safe_arg = shlex.quote(user_command)
# Example: using it as an argument to a fixed command like echo
stream = os.popen(f"echo {safe_arg}") # user_command is now an argument
output = stream.read()
return f"<pre>{output}</pre>"
@app.route("/run_safe_whitelist", methods=["POST"])
def run_command_safe_whitelist():
data = request.get_json()
action = data.get("action") if data else None
if not action:
return "Missing 'action' in request body", 400
actual_command = ""
if action == "list_users":
actual_command = "whoami" # Fixed, safe command
elif action == "show_date":
actual_command = "date" # Fixed, safe command
else:
return "Invalid action", 400
stream = os.popen(actual_command) # Executes a whitelisted, fixed command
output = stream.read()
return f"<pre>{output}</pre>"
@app.route("/run_safe_subprocess", methods=["POST"])
def run_command_safe_subprocess():
data = request.get_json()
command_parts = data.get("command_array") if data else None # Expecting ["ls", "-l", "/tmp"]
if not command_parts or not isinstance(command_parts, list):
return "Missing 'command_array' (list) in request body", 400
# Using subprocess with a list of arguments and shell=False is safest
try:
# Whitelist allowed executable
if command_parts[0] not in ["ls", "echo", "whoami", "date"]:
return "Command not allowed", 400
result = subprocess.run(command_parts, capture_output=True, text=True, check=True, shell=False)
return f"<pre>{result.stdout}</pre>"
except subprocess.CalledProcessError as e:
return f"Error: {e.stderr}", 500
except FileNotFoundError:
return f"Command not found: {command_parts[0]}", 400
# This should not be flagged as command_from_config is not from request.get_json()
@app.route("/run_config_command", methods=["POST"])
def run_config_command():
# data = request.get_json() # Input not used for command
command_from_config = "ls -l /etc"
stream = os.popen(command_from_config)
output = stream.read()
return f"<pre>{output}</pre>"