Files
python_parser/test_minio_connection.py
2025-09-01 13:58:42 +03:00

110 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Тестовый скрипт для проверки подключения к MinIO
"""
import os
import sys
import io
from minio import Minio
def test_minio_connection():
"""Тестирование подключения к MinIO"""
print("🔍 Тестирование подключения к MinIO...")
# Параметры подключения
endpoint = os.getenv("MINIO_ENDPOINT", "localhost:9000")
access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin")
bucket_name = os.getenv("MINIO_BUCKET", "svodka-data")
print(f"📍 Endpoint: {endpoint}")
print(f"🔑 Access Key: {access_key}")
print(f"🔐 Secret Key: {secret_key}")
print(f"🪣 Bucket: {bucket_name}")
try:
# Создаем клиент
print("\n🚀 Создаю MinIO клиент...")
client = Minio(
endpoint,
access_key=access_key,
secret_key=secret_key,
secure=False,
cert_check=False
)
# Проверяем подключение
print("✅ MinIO клиент создан")
# Проверяем bucket
print(f"\n🔍 Проверяю bucket '{bucket_name}'...")
if client.bucket_exists(bucket_name):
print(f"✅ Bucket '{bucket_name}' существует")
else:
print(f"⚠️ Bucket '{bucket_name}' не существует, создаю...")
client.make_bucket(bucket_name)
print(f"✅ Bucket '{bucket_name}' создан")
# Пробуем загрузить тестовый файл
print("\n📤 Тестирую загрузку файла...")
test_data = b"Hello MinIO!"
test_stream = io.BytesIO(test_data)
client.put_object(
bucket_name,
"test.txt",
test_stream,
length=len(test_data),
content_type='text/plain'
)
print("✅ Тестовый файл загружен")
# Пробуем скачать файл
print("\n📥 Тестирую скачивание файла...")
response = client.get_object(bucket_name, "test.txt")
downloaded_data = response.read()
print(f"✅ Файл скачан: {downloaded_data}")
# Удаляем тестовый файл
client.remove_object(bucket_name, "test.txt")
print("✅ Тестовый файл удален")
print("\n🎉 Все тесты MinIO прошли успешно!")
return True
except Exception as e:
print(f"\n❌ Ошибка подключения к MinIO: {e}")
print(f"Тип ошибки: {type(e).__name__}")
return False
def test_environment():
"""Проверка переменных окружения"""
print("🔧 Проверка переменных окружения:")
env_vars = [
"MINIO_ENDPOINT",
"MINIO_ACCESS_KEY",
"MINIO_SECRET_KEY",
"MINIO_BUCKET"
]
for var in env_vars:
value = os.getenv(var, "НЕ УСТАНОВЛЕНО")
print(f" {var}: {value}")
if __name__ == "__main__":
print("=" * 60)
print("🧪 ТЕСТ ПОДКЛЮЧЕНИЯ К MINIO")
print("=" * 60)
test_environment()
print()
success = test_minio_connection()
if success:
print("\n✅ MinIO работает корректно!")
sys.exit(0)
else:
print("\n❌ Проблемы с MinIO!")
sys.exit(1)