#!/usr/bin/env python3 """ Тестовый скрипт для проверки подключения к MinIO """ import os import sys import io from minio import Minio def test_minio_connection(): """Тестирование подключения к MinIO""" print("🔍 Тестирование подключения к MinIO...") # Параметры подключения endpoint = os.getenv("MINIO_ENDPOINT", "localhost:9000") access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin") secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin") bucket_name = os.getenv("MINIO_BUCKET", "svodka-data") print(f"📍 Endpoint: {endpoint}") print(f"🔑 Access Key: {access_key}") print(f"🔐 Secret Key: {secret_key}") print(f"🪣 Bucket: {bucket_name}") try: # Создаем клиент print("\n🚀 Создаю MinIO клиент...") client = Minio( endpoint, access_key=access_key, secret_key=secret_key, secure=False, cert_check=False ) # Проверяем подключение print("✅ MinIO клиент создан") # Проверяем bucket print(f"\n🔍 Проверяю bucket '{bucket_name}'...") if client.bucket_exists(bucket_name): print(f"✅ Bucket '{bucket_name}' существует") else: print(f"⚠️ Bucket '{bucket_name}' не существует, создаю...") client.make_bucket(bucket_name) print(f"✅ Bucket '{bucket_name}' создан") # Пробуем загрузить тестовый файл print("\n📤 Тестирую загрузку файла...") test_data = b"Hello MinIO!" test_stream = io.BytesIO(test_data) client.put_object( bucket_name, "test.txt", test_stream, length=len(test_data), content_type='text/plain' ) print("✅ Тестовый файл загружен") # Пробуем скачать файл print("\n📥 Тестирую скачивание файла...") response = client.get_object(bucket_name, "test.txt") downloaded_data = response.read() print(f"✅ Файл скачан: {downloaded_data}") # Удаляем тестовый файл client.remove_object(bucket_name, "test.txt") print("✅ Тестовый файл удален") print("\n🎉 Все тесты MinIO прошли успешно!") return True except Exception as e: print(f"\n❌ Ошибка подключения к MinIO: {e}") print(f"Тип ошибки: {type(e).__name__}") return False def test_environment(): """Проверка переменных окружения""" print("🔧 Проверка переменных окружения:") env_vars = [ "MINIO_ENDPOINT", "MINIO_ACCESS_KEY", "MINIO_SECRET_KEY", "MINIO_BUCKET" ] for var in env_vars: value = os.getenv(var, "НЕ УСТАНОВЛЕНО") print(f" {var}: {value}") if __name__ == "__main__": print("=" * 60) print("🧪 ТЕСТ ПОДКЛЮЧЕНИЯ К MINIO") print("=" * 60) test_environment() print() success = test_minio_connection() if success: print("\n✅ MinIO работает корректно!") sys.exit(0) else: print("\n❌ Проблемы с MinIO!") sys.exit(1)