minio_quota_checker/minio_quota_checker.py

151 lines
No EOL
5 KiB
Python

import os
import time
import logging
import signal
import sys
import redis
from datetime import datetime, timedelta
from minio import Minio
from minio.error import S3Error
# Default Configuration
CONFIG = {
"MINIO_ENDPOINT": "play.min.io", # Change to your MinIO instance
"MINIO_ACCESS_KEY": "your-access-key",
"MINIO_SECRET_KEY": "your-secret-key",
"TOTAL_SIZE_LIMIT": "1G", # Configurable, supports 1G, 500M, etc.
"WHITELIST": "admin,superuser", # Users that are not restricted, comma-separated string
"AGGREGATE_INTERVAL": "600", # 10 minutes in seconds as string
"LOG_FILE": "minio_quota.log",
"REDIS_HOST": "localhost",
"REDIS_PORT": "6379",
"REDIS_DB": "0"
}
# Load Configuration from /etc/minio_quota.conf if it exists
CONFIG_FILE = "/etc/minio_quota.conf"
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, "r") as f:
file_config = f.readlines()
for line in file_config:
if "=" in line:
key, value = line.strip().split("=", 1)
CONFIG[key.strip()] = value.strip()
# Override with Environment Variables if set
CONFIG.update({key: os.getenv(key, value) for key, value in CONFIG.items()})
# Parse WHITELIST from string
CONFIG["WHITELIST"] = CONFIG["WHITELIST"].split(",")
# Parse AGGREGATE_INTERVAL as integer
CONFIG["AGGREGATE_INTERVAL"] = int(CONFIG["AGGREGATE_INTERVAL"])
# Determine if the MinIO server is using HTTPS
USE_HTTPS = CONFIG["MINIO_ENDPOINT"].startswith("https://")
# Initialize MinIO client
minio_client = Minio(
CONFIG["MINIO_ENDPOINT"].replace("https://", "").replace("http://", ""),
access_key=CONFIG["MINIO_ACCESS_KEY"],
secret_key=CONFIG["MINIO_SECRET_KEY"],
secure=USE_HTTPS,
)
redis_client = redis.Redis(
host=CONFIG["REDIS_HOST"],
port=int(CONFIG["REDIS_PORT"]),
db=int(CONFIG["REDIS_DB"]),
decode_responses=True
)
# Set up logging
handlers = [logging.StreamHandler()] # Always log to stdout
if LOG_FILE and LOG_FILE.strip(): # Log to file only if LOG_FILE is set and not empty
handlers.append(logging.FileHandler(LOG_FILE))
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=handlers
)
def parse_size(size_str):
"""Convert human-readable size string (e.g., 1G, 500M) to bytes."""
size_str = size_str.upper().strip()
if size_str.endswith("G"):
return int(float(size_str[:-1]) * 1024**3)
elif size_str.endswith("M"):
return int(float(size_str[:-1]) * 1024**2)
elif size_str.endswith("K"):
return int(float(size_str[:-1]) * 1024)
else:
return int(size_str) # Assume raw bytes if no suffix
TOTAL_SIZE_LIMIT_BYTES = parse_size(CONFIG["TOTAL_SIZE_LIMIT"])
def get_buckets_by_user():
"""Fetch all buckets and group them by username."""
logging.info("Fetching user buckets...")
buckets = minio_client.list_buckets()
user_buckets = {}
for bucket in buckets:
name = bucket.name
username = name.split("-")[0] # Extract username from bucket name
if username not in CONFIG["WHITELIST"]:
user_buckets.setdefault(username, []).append(name)
return user_buckets
def get_bucket_size(bucket_name):
"""Calculate the total size of objects in a bucket."""
total_size = 0
try:
objects = minio_client.list_objects(bucket_name, recursive=True)
for obj in objects:
total_size += obj.size
except S3Error as e:
logging.error(f"Error fetching objects for {bucket_name}: {e}")
return total_size
def aggregate_disk_usage():
"""Aggregate disk usage per user and log warnings if they exceed the limit."""
user_buckets = get_buckets_by_user()
user_usage = {}
for user, buckets in user_buckets.items():
total_size = sum(get_bucket_size(bucket) for bucket in buckets)
user_usage[user] = total_size
if total_size > TOTAL_SIZE_LIMIT_BYTES:
excess_size = total_size - TOTAL_SIZE_LIMIT_BYTES
logging.warning(f"User {user} exceeded quota by {excess_size} bytes (Total size: {total_size} bytes)")
redis_client.set(f"quota_exceeded:{user}", "1")
else:
# Remove quota exceeded flag if user is under quota
if redis_client.exists(f"quota_exceeded:{user}"):
redis_client.delete(f"quota_exceeded:{user}")
logging.info(f"User {user} is now under quota. Removed quota exceeded flag.")
return user_usage
def signal_handler(sig, frame):
"""Handle SIGINT and SIGTERM to gracefully exit."""
logging.info(f"Received signal {sig}. Exiting gracefully...")
sys.exit(0)
# Register the signal handler for SIGINT and SIGTERM
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
def main():
"""Main loop running every 10 minutes."""
while True:
logging.info("Starting disk usage aggregation...")
aggregate_disk_usage()
logging.info("Sleeping for 10 minutes...")
time.sleep(CONFIG["AGGREGATE_INTERVAL"])
if __name__ == "__main__":
main()