""" Прямое тестирование парсеров с локальным storage Этот модуль тестирует парсеры напрямую, без API """ import pytest import sys from pathlib import Path # Добавляем путь к проекту project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root / "python_parser")) from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser from core.services import ReportService from adapters.local_storage import LocalStorageAdapter class TestParsersDirect: """Прямое тестирование парсеров с локальным storage""" @pytest.fixture(autouse=True) def setup_storage(self, clean_storage): """Настройка локального storage для каждого теста""" self.storage = clean_storage self.report_service = ReportService(self.storage) def test_svodka_pm_parser_registration(self): """Тест регистрации парсера svodka_pm""" parser = SvodkaPMParser() getters = parser.get_available_getters() assert "single_og" in getters assert "total_ogs" in getters # Проверяем параметры геттеров single_og_getter = getters["single_og"] assert "id" in single_og_getter["required_params"] assert "codes" in single_og_getter["required_params"] assert "columns" in single_og_getter["required_params"] assert "search" in single_og_getter["optional_params"] total_ogs_getter = getters["total_ogs"] assert "codes" in total_ogs_getter["required_params"] assert "columns" in total_ogs_getter["required_params"] assert "search" in total_ogs_getter["optional_params"] print("✅ svodka_pm парсер зарегистрирован корректно") def test_svodka_ca_parser_registration(self): """Тест регистрации парсера svodka_ca""" parser = SvodkaCAParser() getters = parser.get_available_getters() assert "get_ca_data" in getters # Проверяем параметры геттера getter = getters["get_ca_data"] assert "modes" in getter["required_params"] assert "tables" in getter["required_params"] print("✅ svodka_ca парсер зарегистрирован корректно") def test_monitoring_fuel_parser_registration(self): """Тест регистрации парсера monitoring_fuel""" parser = MonitoringFuelParser() getters = parser.get_available_getters() assert "total_by_columns" in getters assert "month_by_code" in getters assert "series_by_id_and_columns" in getters # Проверяем параметры геттеров total_getter = getters["total_by_columns"] assert "columns" in total_getter["required_params"] month_getter = getters["month_by_code"] assert "month" in month_getter["required_params"] series_getter = getters["series_by_id_and_columns"] assert "columns" in series_getter["required_params"] print("✅ monitoring_fuel парсер зарегистрирован корректно") def test_storage_operations(self): """Тест операций с локальным storage""" import pandas as pd # Создаем тестовый DataFrame test_df = pd.DataFrame({ 'col1': [1, 2, 3], 'col2': ['a', 'b', 'c'] }) # Сохраняем success = self.storage.save_dataframe("test_object", test_df) assert success is True # Проверяем существование exists = self.storage.object_exists("test_object") assert exists is True # Загружаем loaded_df = self.storage.load_dataframe("test_object") assert loaded_df is not None assert loaded_df.shape == (3, 2) assert list(loaded_df.columns) == ['col1', 'col2'] # Получаем метаданные metadata = self.storage.get_object_metadata("test_object") assert metadata is not None assert metadata["shape"] == [3, 2] # Получаем список объектов objects = self.storage.list_objects() assert "test_object" in objects # Удаляем delete_success = self.storage.delete_object("test_object") assert delete_success is True # Проверяем, что объект удален exists_after = self.storage.object_exists("test_object") assert exists_after is False print("✅ Локальный storage работает корректно") def test_report_service_with_local_storage(self): """Тест ReportService с локальным storage""" # Проверяем, что ReportService может работать с локальным storage assert self.report_service.storage is not None assert hasattr(self.report_service.storage, 'save_dataframe') assert hasattr(self.report_service.storage, 'load_dataframe') print("✅ ReportService корректно работает с локальным storage")