97 lines
3.3 KiB
Python
97 lines
3.3 KiB
Python
from fastapi import FastAPI, Request, Response
|
||
import redis
|
||
import httpx
|
||
import os
|
||
import logging
|
||
|
||
# Default configuration
|
||
CONFIG = {
|
||
"REDIS_HOST": "127.0.0.1",
|
||
"REDIS_PORT": "6379",
|
||
"REDIS_DB": "2",
|
||
"MINIO_ENDPOINT": "http://localhost:9000",
|
||
"LOG_FILE": "/var/log/minio_quota.log"
|
||
}
|
||
|
||
# Load config from /etc/minio_quota.conf if available
|
||
CONFIG_FILE = "/etc/minio_quota.conf"
|
||
if os.path.exists(CONFIG_FILE):
|
||
with open(CONFIG_FILE, "r") as f:
|
||
for line in f:
|
||
if "=" in line:
|
||
key, value = line.strip().split("=", 1)
|
||
CONFIG[key.strip()] = value.strip()
|
||
|
||
# Read configuration with priority:
|
||
REDIS_HOST = os.getenv("REDIS_HOST", CONFIG.get("REDIS_HOST"))
|
||
REDIS_PORT = os.getenv("REDIS_PORT", CONFIG.get("REDIS_PORT"))
|
||
REDIS_DB = int(os.getenv("REDIS_DB", CONFIG.get("REDIS_DB")))
|
||
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", CONFIG.get("MINIO_ENDPOINT"))
|
||
LOG_FILE = os.getenv("LOG_FILE", CONFIG.get("LOG_FILE"))
|
||
|
||
# Set up logging
|
||
handlers = [logging.StreamHandler()] # Always log to stdout
|
||
if LOG_FILE and LOG_FILE.strip(): # Log to file only if LOG_FILE is set and not empty
|
||
handlers.append(logging.FileHandler(LOG_FILE))
|
||
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format="%(asctime)s - %(levelname)s - %(message)s",
|
||
handlers=handlers
|
||
)
|
||
|
||
# Initialize FastAPI and Redis client
|
||
app = FastAPI()
|
||
redis_client = redis.Redis(host=REDIS_HOST, port=int(REDIS_PORT), db=REDIS_DB)
|
||
|
||
@app.api_route("/{path:path}", methods=["GET", "PUT", "POST", "DELETE"])
|
||
async def proxy_request(path: str, request: Request):
|
||
username = path.split("/")[0].split("-")[0]
|
||
logging.info(f"Received {request.method} request for: {path}")
|
||
|
||
if username:
|
||
quota_exceeded = redis_client.get(f"quota_exceeded:{username}")
|
||
if quota_exceeded:
|
||
logging.warning(f"Quota exceeded for user: {username}, blocking request.")
|
||
error_xml = f"""<?xml version="1.0" encoding="UTF-8"?>
|
||
<Error>
|
||
<Code>QuotaExceeded</Code>
|
||
<Message>User has exceeded storage quota.</Message>
|
||
<Resource>/{path}</Resource>
|
||
<RequestId>request-id-12345</RequestId>
|
||
</Error>"""
|
||
return Response(content=error_xml, status_code=403, media_type="application/xml")
|
||
|
||
# Proxy request to MinIO
|
||
minio_url = f"{MINIO_ENDPOINT}/{path}"
|
||
headers = dict(request.headers)
|
||
|
||
async with httpx.AsyncClient(timeout=300.0) as client:
|
||
logging.info(f"Proxying request to MinIO: {minio_url}")
|
||
if request.method in ["PUT", "POST"]:
|
||
# Stream request body to MinIO
|
||
async def request_stream():
|
||
async for chunk in request.stream():
|
||
yield chunk
|
||
|
||
minio_response = await client.request(
|
||
method=request.method,
|
||
url=minio_url,
|
||
headers=headers,
|
||
content=request_stream()
|
||
)
|
||
else:
|
||
# For GET and DELETE, we don’t send a body
|
||
minio_response = await client.request(
|
||
method=request.method,
|
||
url=minio_url,
|
||
headers=headers
|
||
)
|
||
|
||
logging.info(f"MinIO response: {minio_response.status_code}")
|
||
|
||
return Response(
|
||
content=minio_response.content,
|
||
status_code=minio_response.status_code,
|
||
headers=minio_response.headers
|
||
)
|