110 lines
3.7 KiB
Python
110 lines
3.7 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Тестовый скрипт для проверки подключения к MinIO
|
||
"""
|
||
|
||
import os
|
||
import sys
|
||
import io
|
||
from minio import Minio
|
||
|
||
def test_minio_connection():
|
||
"""Тестирование подключения к MinIO"""
|
||
print("🔍 Тестирование подключения к MinIO...")
|
||
|
||
# Параметры подключения
|
||
endpoint = os.getenv("MINIO_ENDPOINT", "localhost:9000")
|
||
access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
|
||
secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin")
|
||
bucket_name = os.getenv("MINIO_BUCKET", "svodka-data")
|
||
|
||
print(f"📍 Endpoint: {endpoint}")
|
||
print(f"🔑 Access Key: {access_key}")
|
||
print(f"🔐 Secret Key: {secret_key}")
|
||
print(f"🪣 Bucket: {bucket_name}")
|
||
|
||
try:
|
||
# Создаем клиент
|
||
print("\n🚀 Создаю MinIO клиент...")
|
||
client = Minio(
|
||
endpoint,
|
||
access_key=access_key,
|
||
secret_key=secret_key,
|
||
secure=False,
|
||
cert_check=False
|
||
)
|
||
|
||
# Проверяем подключение
|
||
print("✅ MinIO клиент создан")
|
||
|
||
# Проверяем bucket
|
||
print(f"\n🔍 Проверяю bucket '{bucket_name}'...")
|
||
if client.bucket_exists(bucket_name):
|
||
print(f"✅ Bucket '{bucket_name}' существует")
|
||
else:
|
||
print(f"⚠️ Bucket '{bucket_name}' не существует, создаю...")
|
||
client.make_bucket(bucket_name)
|
||
print(f"✅ Bucket '{bucket_name}' создан")
|
||
|
||
# Пробуем загрузить тестовый файл
|
||
print("\n📤 Тестирую загрузку файла...")
|
||
test_data = b"Hello MinIO!"
|
||
test_stream = io.BytesIO(test_data)
|
||
|
||
client.put_object(
|
||
bucket_name,
|
||
"test.txt",
|
||
test_stream,
|
||
length=len(test_data),
|
||
content_type='text/plain'
|
||
)
|
||
print("✅ Тестовый файл загружен")
|
||
|
||
# Пробуем скачать файл
|
||
print("\n📥 Тестирую скачивание файла...")
|
||
response = client.get_object(bucket_name, "test.txt")
|
||
downloaded_data = response.read()
|
||
print(f"✅ Файл скачан: {downloaded_data}")
|
||
|
||
# Удаляем тестовый файл
|
||
client.remove_object(bucket_name, "test.txt")
|
||
print("✅ Тестовый файл удален")
|
||
|
||
print("\n🎉 Все тесты MinIO прошли успешно!")
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"\n❌ Ошибка подключения к MinIO: {e}")
|
||
print(f"Тип ошибки: {type(e).__name__}")
|
||
return False
|
||
|
||
def test_environment():
|
||
"""Проверка переменных окружения"""
|
||
print("🔧 Проверка переменных окружения:")
|
||
env_vars = [
|
||
"MINIO_ENDPOINT",
|
||
"MINIO_ACCESS_KEY",
|
||
"MINIO_SECRET_KEY",
|
||
"MINIO_BUCKET"
|
||
]
|
||
|
||
for var in env_vars:
|
||
value = os.getenv(var, "НЕ УСТАНОВЛЕНО")
|
||
print(f" {var}: {value}")
|
||
|
||
if __name__ == "__main__":
|
||
print("=" * 60)
|
||
print("🧪 ТЕСТ ПОДКЛЮЧЕНИЯ К MINIO")
|
||
print("=" * 60)
|
||
|
||
test_environment()
|
||
print()
|
||
|
||
success = test_minio_connection()
|
||
|
||
if success:
|
||
print("\n✅ MinIO работает корректно!")
|
||
sys.exit(0)
|
||
else:
|
||
print("\n❌ Проблемы с MinIO!")
|
||
sys.exit(1) |