minio_quota_checker/minio_quota_checker.py

152 lines
No EOL
5.1 KiB
Python

import os
import time
import logging
import signal
import sys
import redis
from datetime import datetime, timedelta
from minio import Minio
from minio.error import S3Error
# Built-in defaults. Every value is kept as a string so that the config file
# and environment variables (both plain-text sources) can replace it as-is;
# typed values are derived from these strings further down.
CONFIG = dict(
    MINIO_ENDPOINT="play.min.io",  # point this at your own MinIO instance
    MINIO_ACCESS_KEY="your-access-key",
    MINIO_SECRET_KEY="your-secret-key",
    TOTAL_SIZE_LIMIT="1G",  # size with an optional suffix, e.g. 1G or 500M
    WHITELIST="admin,superuser",  # comma-separated users exempt from the limit
    AGGREGATE_INTERVAL="600",  # seconds between runs (10 minutes), as a string
    LOG_FILE="minio_quota.log",
    REDIS_HOST="localhost",
    REDIS_PORT="6379",
    REDIS_DB="0",
)
# Load overrides from /etc/minio_quota.conf if it exists.
# Format: one KEY=VALUE pair per line. Blank lines and lines whose first
# non-blank character is "#" are skipped, so the file can carry comments
# without polluting CONFIG with junk keys.
CONFIG_FILE = "/etc/minio_quota.conf"
if os.path.exists(CONFIG_FILE):
    with open(CONFIG_FILE, "r") as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            # Split on the first "=" only, so values may themselves contain "=".
            key, value = line.split("=", 1)
            CONFIG[key.strip()] = value.strip()

# Environment variables take precedence over both the defaults and the file.
CONFIG.update({key: os.getenv(key, value) for key, value in CONFIG.items()})

# Turn WHITELIST into a list of user names. Whitespace around each name is
# dropped (so "admin, superuser" really exempts "superuser") and empty entries
# are discarded (so an empty setting exempts nobody rather than the user "").
CONFIG["WHITELIST"] = [
    name.strip() for name in CONFIG["WHITELIST"].split(",") if name.strip()
]

# AGGREGATE_INTERVAL is a number of seconds; a non-numeric value raises
# ValueError here, at start-up.
CONFIG["AGGREGATE_INTERVAL"] = int(CONFIG["AGGREGATE_INTERVAL"])
# TLS is enabled only when the configured endpoint starts with "https://";
# a bare host name or an "http://" prefix results in secure=False.
USE_HTTPS = CONFIG["MINIO_ENDPOINT"].startswith("https://")

# MinIO client. The scheme text is removed from the endpoint before it is
# passed on; the TLS choice travels separately through `secure`.
_minio_host = CONFIG["MINIO_ENDPOINT"].replace("https://", "").replace("http://", "")
minio_client = Minio(
    _minio_host,
    access_key=CONFIG["MINIO_ACCESS_KEY"],
    secret_key=CONFIG["MINIO_SECRET_KEY"],
    secure=USE_HTTPS,
)

# Redis client. Port and DB index arrive as strings and are converted here.
_redis_settings = {
    "host": CONFIG["REDIS_HOST"],
    "port": int(CONFIG["REDIS_PORT"]),
    "db": int(CONFIG["REDIS_DB"]),
    "decode_responses": True,
}
redis_client = redis.Redis(**_redis_settings)
# Logging: records always go to a console stream handler (StreamHandler()
# called without an argument writes to sys.stderr). A file handler is added
# only when LOG_FILE resolves to a non-blank value; the LOG_FILE environment
# variable wins over the CONFIG entry.
LOG_FILE = os.getenv("LOG_FILE", CONFIG.get("LOG_FILE"))
handlers = [logging.StreamHandler()]
if (LOG_FILE or "").strip():
    handlers += [logging.FileHandler(LOG_FILE)]
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=handlers,
    level=logging.INFO,
)
# Binary multipliers for the unit letters understood by parse_size().
_SIZE_UNITS = (("K", 1024), ("M", 1024**2), ("G", 1024**3), ("T", 1024**4))

# Every accepted suffix, longest spelling first, so that "GIB" and "GB" are
# tried before the bare "G" and the bare byte suffix "B" is tried last.
_SIZE_SUFFIXES = (
    tuple((letter + "IB", factor) for letter, factor in _SIZE_UNITS)
    + tuple((letter + "B", factor) for letter, factor in _SIZE_UNITS)
    + _SIZE_UNITS
    + (("B", 1),)
)


def parse_size(size_str):
    """Convert a human-readable size (e.g. 1G, 500M, 1.5GB, 64MiB) to bytes.

    Matching is case-insensitive and surrounding whitespace is ignored.
    Units are binary: K = 1024, M = 1024**2, G = 1024**3, T = 1024**4. Each
    unit letter may be followed by "B" or "iB"; a bare "B" means bytes. A
    value with no suffix is taken as an integer number of bytes.

    Args:
        size_str: The size to parse. Non-string values are converted with
            ``str()`` first, so a plain integer is accepted as well.

    Returns:
        The size in bytes as an int. Fractional results are truncated.

    Raises:
        ValueError: If the numeric part is missing or is not a valid number.
    """
    text = str(size_str).upper().strip()

    number, factor = text, None
    for suffix, unit_factor in _SIZE_SUFFIXES:
        if text.endswith(suffix):
            number, factor = text[: -len(suffix)].strip(), unit_factor
            break

    try:
        if factor is None:
            # No suffix: raw byte count, which must be an integer literal.
            return int(number)
        # Go through float so that fractional sizes such as "1.5G" work.
        return int(float(number) * factor)
    except ValueError:
        raise ValueError(
            f"Invalid size {size_str!r}: expected a number optionally "
            "followed by K, M, G or T (e.g. '500M', '1G')"
        ) from None
# Quota in bytes, parsed once at module load from CONFIG["TOTAL_SIZE_LIMIT"];
# each user's summed bucket size is compared against this value.
TOTAL_SIZE_LIMIT_BYTES = parse_size(CONFIG["TOTAL_SIZE_LIMIT"])
def get_buckets_by_user():
    """Return a mapping of user name -> list of that user's bucket names.

    The user name is the part of the bucket name before the first "-" (the
    whole name when there is no "-"). Buckets whose user name appears in
    CONFIG["WHITELIST"] are left out of the result.
    """
    logging.info("Fetching user buckets...")
    grouped = {}
    for bucket_name in (entry.name for entry in minio_client.list_buckets()):
        owner = bucket_name.partition("-")[0]
        if owner in CONFIG["WHITELIST"]:
            continue  # whitelisted users are not subject to the quota
        if owner not in grouped:
            grouped[owner] = []
        grouped[owner].append(bucket_name)
    return grouped
def get_bucket_size(bucket_name):
    """Return the summed size of all objects in ``bucket_name``.

    Objects are listed recursively. If the listing raises S3Error, the error
    is logged and whatever had been summed up to that point (0 if nothing)
    is returned instead of propagating the exception.
    """
    size_so_far = 0
    try:
        for entry in minio_client.list_objects(bucket_name, recursive=True):
            size_so_far = size_so_far + entry.size
    except S3Error as e:
        logging.error(f"Error fetching objects for {bucket_name}: {e}")
    return size_so_far
def aggregate_disk_usage():
    """Sum bucket sizes per user and keep the Redis quota flags in sync.

    For every non-whitelisted user the sizes of all of their buckets are
    added up and compared with TOTAL_SIZE_LIMIT_BYTES:

    * over the limit: a warning is logged and the Redis key
      ``quota_exceeded:<user>`` is set to "1";
    * at or under the limit: the key is deleted, and an info message is
      logged only when a flag was actually removed.

    Returns:
        dict mapping user name -> total size in bytes.
    """
    user_usage = {}
    for user, buckets in get_buckets_by_user().items():
        total_size = sum(get_bucket_size(bucket) for bucket in buckets)
        user_usage[user] = total_size
        flag_key = f"quota_exceeded:{user}"

        if total_size > TOTAL_SIZE_LIMIT_BYTES:
            excess_size = total_size - TOTAL_SIZE_LIMIT_BYTES
            logging.warning(f"User {user} exceeded quota by {excess_size} bytes (Total size: {total_size} bytes)")
            redis_client.set(flag_key, "1")
        elif redis_client.delete(flag_key):
            # delete() returns the number of keys removed, so a single call
            # both clears the flag and tells us whether it existed. This
            # avoids the extra round trip and the check-then-act gap of a
            # separate exists() call.
            logging.info(f"User {user} is now under quota. Removed quota exceeded flag.")
    return user_usage
def signal_handler(sig, frame):
    """Log the received signal number and end the process with exit status 0.

    ``frame`` is part of the handler signature required by the ``signal``
    module and is not used here.
    """
    logging.info(f"Received signal {sig}. Exiting gracefully...")
    raise SystemExit(0)
# Install signal_handler for both Ctrl-C (SIGINT) and termination requests
# (SIGTERM) so either one ends the process through the same logged path.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
def main():
    """Run the aggregation forever, pausing between runs.

    The pause is CONFIG["AGGREGATE_INTERVAL"] seconds (600 by default).

    A failing run is logged with its traceback and retried after the next
    pause instead of terminating the process. SystemExit, which the signal
    handler raises, is not an ``Exception`` subclass, so SIGINT and SIGTERM
    still stop the loop.
    """
    while True:
        logging.info("Starting disk usage aggregation...")
        try:
            aggregate_disk_usage()
        except Exception:
            # Top-level boundary of a long-running loop: one bad cycle
            # (e.g. a backend being temporarily unreachable) must not take
            # the whole checker down.
            logging.exception("Disk usage aggregation failed; will retry after the next interval")
        interval = CONFIG["AGGREGATE_INTERVAL"]
        # Report the configured interval rather than a hard-coded duration.
        logging.info(f"Sleeping for {interval} seconds...")
        time.sleep(interval)


if __name__ == "__main__":
    main()