Files
python_parser/python_parser/adapters/storage.py
2025-09-04 18:24:53 +03:00

140 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Адаптер для хранилища S3/MinIO
"""
import os
import pickle
import io
import logging
from typing import Optional
from minio import Minio # boto3
import pandas as pd
from core.ports import StoragePort
# Настройка логгера для модуля
logger = logging.getLogger(__name__)
class MinIOStorageAdapter(StoragePort):
"""Адаптер для MinIO хранилища"""
def __init__(self):
self._client = None
self._bucket_name = os.getenv("MINIO_BUCKET", "svodka-data")
self._endpoint = os.getenv("MINIO_ENDPOINT", "localhost:9000")
self._access_key = os.getenv("MINIO_ACCESS_KEY", "minioadmin")
self._secret_key = os.getenv("MINIO_SECRET_KEY", "minioadmin")
self._secure = os.getenv("MINIO_SECURE", "false").lower() == "true"
@property
def client(self):
"""Ленивая инициализация MinIO клиента"""
if self._client is None:
try:
self._client = Minio(
self._endpoint,
access_key=self._access_key,
secret_key=self._secret_key,
secure=self._secure,
cert_check=False
)
# Проверяем bucket только при первом использовании
self._ensure_bucket_exists()
except Exception as e:
logger.warning(f"⚠️ Не удалось подключиться к MinIO: {e}")
logger.warning("MinIO будет недоступен, но приложение продолжит работать")
return None
return self._client
def _ensure_bucket_exists(self):
"""Проверка существования bucket и создание при необходимости"""
if self.client is None:
return False
try:
if not self.client.bucket_exists(self._bucket_name):
self.client.make_bucket(self._bucket_name)
logger.info(f"✅ Bucket '{self._bucket_name}' создан")
return True
except Exception as e:
logger.error(f"❌ Ошибка при работе с bucket: {e}")
return False
def save_dataframe(self, df: pd.DataFrame, object_id: str) -> bool:
"""Сохранение DataFrame в MinIO"""
if self.client is None:
logger.warning("⚠️ MinIO недоступен, данные не сохранены")
return False
try:
# Сериализуем DataFrame
data = pickle.dumps(df)
# Создаем поток данных
data_stream = io.BytesIO(data)
# Загружаем в MinIO
self.client.put_object(
self._bucket_name,
object_id,
data_stream,
length=len(data),
content_type='application/octet-stream'
)
logger.info(f"✅ DataFrame успешно сохранен в MinIO: {self._bucket_name}/{object_id}")
return True
except Exception as e:
logger.error(f"❌ Ошибка при сохранении в MinIO: {e}")
return False
def load_dataframe(self, object_id: str) -> Optional[pd.DataFrame]:
"""Загрузка DataFrame из MinIO"""
if self.client is None:
logger.warning("⚠️ MinIO недоступен, данные не загружены")
return None
try:
# Получаем объект из MinIO
response = self.client.get_object(self._bucket_name, object_id)
# Читаем данные
data = response.read()
# Десериализуем DataFrame
df = pickle.loads(data)
return df
except Exception as e:
logger.error(f"❌ Ошибка при загрузке данных из MinIO: {e}")
return None
finally:
if 'response' in locals():
response.close()
response.release_conn()
def delete_object(self, object_id: str) -> bool:
"""Удаление объекта из MinIO"""
if self.client is None:
logger.warning("⚠️ MinIO недоступен, объект не удален")
return False
try:
self.client.remove_object(self._bucket_name, object_id)
logger.info(f"✅ Объект успешно удален из MinIO: {self._bucket_name}/{object_id}")
return True
except Exception as e:
logger.error(f"❌ Ошибка при удалении объекта из MinIO: {e}")
return False
def object_exists(self, object_id: str) -> bool:
"""Проверка существования объекта в MinIO"""
if self.client is None:
return False
try:
self.client.stat_object(self._bucket_name, object_id)
return True
except Exception:
return False