152 lines
No EOL
5.1 KiB
Python
152 lines
No EOL
5.1 KiB
Python
import os
|
|
import time
|
|
import logging
|
|
import signal
|
|
import sys
|
|
import redis
|
|
from datetime import datetime, timedelta
|
|
from minio import Minio
|
|
from minio.error import S3Error
|
|
|
|
# Default configuration.
# Every default is a string because the two override sources (the config file
# and environment variables) can only supply strings; typed values are derived
# after all overrides have been applied.
CONFIG = {
    "MINIO_ENDPOINT": "play.min.io",  # Change to your MinIO instance
    "MINIO_ACCESS_KEY": "your-access-key",
    "MINIO_SECRET_KEY": "your-secret-key",
    "TOTAL_SIZE_LIMIT": "1G",  # Configurable, supports 1G, 500M, etc.
    "WHITELIST": "admin,superuser",  # Users that are not restricted, comma-separated string
    "AGGREGATE_INTERVAL": "600",  # 10 minutes in seconds as string
    "LOG_FILE": "minio_quota.log",
    "REDIS_HOST": "localhost",
    "REDIS_PORT": "6379",
    "REDIS_DB": "0",
}

# Load KEY=VALUE overrides from /etc/minio_quota.conf if it exists.
CONFIG_FILE = "/etc/minio_quota.conf"
try:
    with open(CONFIG_FILE, "r") as f:
        for raw_line in f:
            line = raw_line.strip()
            # Skip blanks, comment lines and anything that is not KEY=VALUE,
            # so a commented-out "# KEY=value" is not ingested as a setting.
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            CONFIG[key.strip()] = value.strip()
except FileNotFoundError:
    # The config file is optional; fall back to defaults / environment.
    pass

# Environment variables take precedence over both defaults and the file.
CONFIG.update({key: os.getenv(key, value) for key, value in CONFIG.items()})

# Parse WHITELIST into a list of usernames. Surrounding whitespace is removed
# and empty entries are dropped, so "admin, superuser" and a trailing comma
# both behave as expected and an empty setting yields an empty list.
CONFIG["WHITELIST"] = [
    name.strip() for name in CONFIG["WHITELIST"].split(",") if name.strip()
]

# Parse AGGREGATE_INTERVAL (seconds) as an integer.
CONFIG["AGGREGATE_INTERVAL"] = int(CONFIG["AGGREGATE_INTERVAL"])
|
|
|
|
# TLS is enabled only when the configured endpoint carries an explicit
# "https://" prefix; a bare host or an "http://" endpoint is treated as plain.
USE_HTTPS = CONFIG["MINIO_ENDPOINT"].startswith("https://")

# MinIO client: the scheme text is removed from the endpoint before it is
# handed to the client, and the TLS choice is passed separately via `secure`.
_minio_host = CONFIG["MINIO_ENDPOINT"].replace("https://", "").replace("http://", "")
minio_client = Minio(
    _minio_host,
    access_key=CONFIG["MINIO_ACCESS_KEY"],
    secret_key=CONFIG["MINIO_SECRET_KEY"],
    secure=USE_HTTPS,
)

# Redis client: port and database index arrive as strings and are converted
# here; decode_responses=True asks redis-py to return str instead of bytes.
_redis_port = int(CONFIG["REDIS_PORT"])
_redis_db = int(CONFIG["REDIS_DB"])
redis_client = redis.Redis(
    host=CONFIG["REDIS_HOST"],
    port=_redis_port,
    db=_redis_db,
    decode_responses=True,
)
|
|
|
|
|
|
# Set up logging: a stream handler is always installed, and a file handler is
# added only when LOG_FILE is set to something other than blank text.
LOG_FILE = os.getenv("LOG_FILE", CONFIG.get("LOG_FILE"))

# NOTE: StreamHandler() with no argument writes to sys.stderr.
handlers = [logging.StreamHandler()]
if (LOG_FILE or "").strip():
    handlers += [logging.FileHandler(LOG_FILE)]

logging.basicConfig(
    handlers=handlers,
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
|
|
|
|
def parse_size(size_str):
    """Convert a human-readable size string to a number of bytes.

    Accepted forms (case-insensitive, surrounding whitespace ignored):

    * a plain integer byte count, optionally followed by ``B``
      (``"1048576"``, ``"512B"``);
    * a number (may be fractional) followed by a binary unit ``K``, ``M``,
      ``G``, ``T`` or ``P``, optionally written with a ``B`` / ``iB`` tail
      (``"1G"``, ``"500M"``, ``"1.5K"``, ``"1GB"``, ``"10MiB"``).

    All units are powers of 1024. Fractional results are truncated.

    Args:
        size_str: The size description; non-string values are converted
            with ``str()`` first.

    Returns:
        The size in bytes as an ``int``.

    Raises:
        ValueError: If the text is empty or cannot be interpreted as a size.
    """
    units = {
        "K": 1024,
        "M": 1024**2,
        "G": 1024**3,
        "T": 1024**4,
        "P": 1024**5,
    }
    text = str(size_str).strip().upper()

    # Peel off an optional byte marker so "GB"/"GIB" are treated like "G".
    body = text
    needs_unit = False
    if body.endswith("IB"):
        body = body[:-2]
        needs_unit = True  # "iB" is only meaningful after a unit letter
    elif body.endswith("B"):
        body = body[:-1]

    try:
        if body and body[-1] in units:
            return int(float(body[:-1]) * units[body[-1]])
        if needs_unit:
            raise ValueError("missing unit before 'iB'")
        return int(body)  # Assume raw bytes if no unit suffix
    except (ValueError, OverflowError):
        # Re-raise with the offending input so config errors are easy to spot.
        raise ValueError(f"Invalid size string: {size_str!r}") from None
|
|
|
|
# Quota ceiling in bytes, computed once at import time from the
# TOTAL_SIZE_LIMIT setting; each user's combined bucket size is compared
# against this value.
TOTAL_SIZE_LIMIT_BYTES = parse_size(CONFIG["TOTAL_SIZE_LIMIT"])
|
|
|
|
def get_buckets_by_user():
    """Group bucket names by owning user, leaving out whitelisted users.

    The owner of a bucket is the part of its name before the first "-"
    (the whole name when there is no "-").

    Returns:
        A dict mapping username to the list of that user's bucket names.
    """
    logging.info("Fetching user buckets...")
    grouped = {}

    for bucket_name in (entry.name for entry in minio_client.list_buckets()):
        owner = bucket_name.partition("-")[0]
        # Whitelisted users are exempt from quota tracking entirely.
        if owner in CONFIG["WHITELIST"]:
            continue
        if owner not in grouped:
            grouped[owner] = []
        grouped[owner].append(bucket_name)

    return grouped
|
|
|
|
def get_bucket_size(bucket_name):
    """Return the combined size in bytes of all objects in a bucket.

    Objects are listed recursively and their sizes summed.

    Args:
        bucket_name: Name of the bucket to measure.

    Returns:
        The total size in bytes. If the listing fails with ``S3Error`` the
        error is logged and the total accumulated so far (possibly 0) is
        returned, so the result may under-report the real usage.
    """
    total_size = 0
    try:
        for obj in minio_client.list_objects(bucket_name, recursive=True):
            # Treat a missing size as 0 rather than failing on `int + None`.
            # NOTE(review): minio-py types the size as optional — confirm
            # which listing entries can actually come back without one.
            total_size += obj.size or 0
    except S3Error as e:
        logging.error(f"Error fetching objects for {bucket_name}: {e}")
    return total_size
|
|
|
|
def aggregate_disk_usage():
    """Measure each user's total bucket usage and sync quota flags in Redis.

    For every non-whitelisted user the sizes of all their buckets are summed.
    Users above ``TOTAL_SIZE_LIMIT_BYTES`` get the Redis key
    ``quota_exceeded:<user>`` set to ``"1"`` and a warning is logged; for
    users at or below the limit the key is removed if it was present.

    Returns:
        A dict mapping username to total usage in bytes.
    """
    user_usage = {}

    for user, buckets in get_buckets_by_user().items():
        total_size = sum(get_bucket_size(bucket) for bucket in buckets)
        user_usage[user] = total_size
        flag_key = f"quota_exceeded:{user}"

        if total_size > TOTAL_SIZE_LIMIT_BYTES:
            excess_size = total_size - TOTAL_SIZE_LIMIT_BYTES
            logging.warning(f"User {user} exceeded quota by {excess_size} bytes (Total size: {total_size} bytes)")
            redis_client.set(flag_key, "1")
        # delete() reports how many keys it removed, so one call both clears
        # the flag and tells us whether it existed; this avoids the separate
        # exists() round trip and its check-then-act gap.
        elif redis_client.delete(flag_key):
            logging.info(f"User {user} is now under quota. Removed quota exceeded flag.")

    return user_usage
|
|
|
|
def signal_handler(sig, frame):
    """Log the received signal number and terminate with exit status 0.

    Args:
        sig: Number of the signal that was delivered.
        frame: Interrupted stack frame (unused).
    """
    logging.info(f"Received signal {sig}. Exiting gracefully...")
    # Same effect as sys.exit(0): unwinds via SystemExit with a zero status.
    raise SystemExit(0)
|
|
|
|
# Install signal_handler for both interrupt (SIGINT) and termination (SIGTERM)
# so either one shuts the process down through the same path.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)
|
|
|
|
def main():
    """Run the aggregation loop forever.

    Each pass aggregates usage for all users, then sleeps for
    ``CONFIG["AGGREGATE_INTERVAL"]`` seconds. A failure in one pass is logged
    with its traceback and the loop carries on, so a temporary backend outage
    does not stop the monitor. ``SystemExit`` (raised by the signal handler)
    and ``KeyboardInterrupt`` are not caught and still end the process.
    """
    while True:
        logging.info("Starting disk usage aggregation...")
        try:
            aggregate_disk_usage()
        except Exception:
            # Top-level boundary: record the failure and try again next pass.
            logging.exception("Disk usage aggregation failed; will retry after the next interval")

        interval = CONFIG["AGGREGATE_INTERVAL"]
        # Report the configured interval rather than a fixed "10 minutes".
        logging.info(f"Sleeping for {interval} seconds...")
        time.sleep(interval)


if __name__ == "__main__":
    main()