import os import time import logging import signal import sys import redis from datetime import datetime, timedelta from minio import Minio from minio.error import S3Error # Default Configuration CONFIG = { "MINIO_ENDPOINT": "play.min.io", # Change to your MinIO instance "MINIO_ACCESS_KEY": "your-access-key", "MINIO_SECRET_KEY": "your-secret-key", "TOTAL_SIZE_LIMIT": "1G", # Configurable, supports 1G, 500M, etc. "WHITELIST": "admin,superuser", # Users that are not restricted, comma-separated string "AGGREGATE_INTERVAL": "600", # 10 minutes in seconds as string "LOG_FILE": "minio_quota.log", "REDIS_HOST": "localhost", "REDIS_PORT": "6379", "REDIS_DB": "0" } # Load Configuration from /etc/minio_quota.conf if it exists CONFIG_FILE = "/etc/minio_quota.conf" if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, "r") as f: file_config = f.readlines() for line in file_config: if "=" in line: key, value = line.strip().split("=", 1) CONFIG[key.strip()] = value.strip() # Override with Environment Variables if set CONFIG.update({key: os.getenv(key, value) for key, value in CONFIG.items()}) # Parse WHITELIST from string CONFIG["WHITELIST"] = CONFIG["WHITELIST"].split(",") # Parse AGGREGATE_INTERVAL as integer CONFIG["AGGREGATE_INTERVAL"] = int(CONFIG["AGGREGATE_INTERVAL"]) # Determine if the MinIO server is using HTTPS USE_HTTPS = CONFIG["MINIO_ENDPOINT"].startswith("https://") # Initialize MinIO client minio_client = Minio( CONFIG["MINIO_ENDPOINT"].replace("https://", "").replace("http://", ""), access_key=CONFIG["MINIO_ACCESS_KEY"], secret_key=CONFIG["MINIO_SECRET_KEY"], secure=USE_HTTPS, ) redis_client = redis.Redis( host=CONFIG["REDIS_HOST"], port=int(CONFIG["REDIS_PORT"]), db=int(CONFIG["REDIS_DB"]), decode_responses=True ) # Set up logging LOG_FILE = os.getenv("LOG_FILE", CONFIG.get("LOG_FILE")) handlers = [logging.StreamHandler()] # Always log to stdout if LOG_FILE and LOG_FILE.strip(): # Log to file only if LOG_FILE is set and not empty handlers.append(logging.FileHandler(LOG_FILE)) logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=handlers ) def parse_size(size_str): """Convert human-readable size string (e.g., 1G, 500M) to bytes.""" size_str = size_str.upper().strip() if size_str.endswith("G"): return int(float(size_str[:-1]) * 1024**3) elif size_str.endswith("M"): return int(float(size_str[:-1]) * 1024**2) elif size_str.endswith("K"): return int(float(size_str[:-1]) * 1024) else: return int(size_str) # Assume raw bytes if no suffix TOTAL_SIZE_LIMIT_BYTES = parse_size(CONFIG["TOTAL_SIZE_LIMIT"]) def get_buckets_by_user(): """Fetch all buckets and group them by username.""" logging.info("Fetching user buckets...") buckets = minio_client.list_buckets() user_buckets = {} for bucket in buckets: name = bucket.name username = name.split("-")[0] # Extract username from bucket name if username not in CONFIG["WHITELIST"]: user_buckets.setdefault(username, []).append(name) return user_buckets def get_bucket_size(bucket_name): """Calculate the total size of objects in a bucket.""" total_size = 0 try: objects = minio_client.list_objects(bucket_name, recursive=True) for obj in objects: total_size += obj.size except S3Error as e: logging.error(f"Error fetching objects for {bucket_name}: {e}") return total_size def aggregate_disk_usage(): """Aggregate disk usage per user and log warnings if they exceed the limit.""" user_buckets = get_buckets_by_user() user_usage = {} for user, buckets in user_buckets.items(): total_size = sum(get_bucket_size(bucket) for bucket in buckets) user_usage[user] = total_size if total_size > TOTAL_SIZE_LIMIT_BYTES: excess_size = total_size - TOTAL_SIZE_LIMIT_BYTES logging.warning(f"User {user} exceeded quota by {excess_size} bytes (Total size: {total_size} bytes)") redis_client.set(f"quota_exceeded:{user}", "1") else: # Remove quota exceeded flag if user is under quota if redis_client.exists(f"quota_exceeded:{user}"): redis_client.delete(f"quota_exceeded:{user}") logging.info(f"User {user} is now under quota. Removed quota exceeded flag.") return user_usage def signal_handler(sig, frame): """Handle SIGINT and SIGTERM to gracefully exit.""" logging.info(f"Received signal {sig}. Exiting gracefully...") sys.exit(0) # Register the signal handler for SIGINT and SIGTERM signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) def main(): """Main loop running every 10 minutes.""" while True: logging.info("Starting disk usage aggregation...") aggregate_disk_usage() logging.info("Sleeping for 10 minutes...") time.sleep(CONFIG["AGGREGATE_INTERVAL"]) if __name__ == "__main__": main()