134 lines
5.5 KiB
Python
134 lines
5.5 KiB
Python
"""
|
||
Прямое тестирование парсеров с локальным storage
|
||
Этот модуль тестирует парсеры напрямую, без API
|
||
"""
|
||
import pytest
|
||
import sys
|
||
from pathlib import Path
|
||
|
||
# Добавляем путь к проекту
|
||
project_root = Path(__file__).parent.parent
|
||
sys.path.insert(0, str(project_root / "python_parser"))
|
||
|
||
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser
|
||
from core.services import ReportService
|
||
from adapters.local_storage import LocalStorageAdapter
|
||
|
||
|
||
class TestParsersDirect:
|
||
"""Прямое тестирование парсеров с локальным storage"""
|
||
|
||
@pytest.fixture(autouse=True)
|
||
def setup_storage(self, clean_storage):
|
||
"""Настройка локального storage для каждого теста"""
|
||
self.storage = clean_storage
|
||
self.report_service = ReportService(self.storage)
|
||
|
||
def test_svodka_pm_parser_registration(self):
|
||
"""Тест регистрации парсера svodka_pm"""
|
||
parser = SvodkaPMParser()
|
||
getters = parser.get_available_getters()
|
||
|
||
assert "single_og" in getters
|
||
assert "total_ogs" in getters
|
||
|
||
# Проверяем параметры геттеров
|
||
single_og_getter = getters["single_og"]
|
||
assert "id" in single_og_getter["required_params"]
|
||
assert "codes" in single_og_getter["required_params"]
|
||
assert "columns" in single_og_getter["required_params"]
|
||
assert "search" in single_og_getter["optional_params"]
|
||
|
||
total_ogs_getter = getters["total_ogs"]
|
||
assert "codes" in total_ogs_getter["required_params"]
|
||
assert "columns" in total_ogs_getter["required_params"]
|
||
assert "search" in total_ogs_getter["optional_params"]
|
||
|
||
print("✅ svodka_pm парсер зарегистрирован корректно")
|
||
|
||
def test_svodka_ca_parser_registration(self):
|
||
"""Тест регистрации парсера svodka_ca"""
|
||
parser = SvodkaCAParser()
|
||
getters = parser.get_available_getters()
|
||
|
||
assert "get_ca_data" in getters
|
||
|
||
# Проверяем параметры геттера
|
||
getter = getters["get_ca_data"]
|
||
assert "modes" in getter["required_params"]
|
||
assert "tables" in getter["required_params"]
|
||
|
||
print("✅ svodka_ca парсер зарегистрирован корректно")
|
||
|
||
def test_monitoring_fuel_parser_registration(self):
|
||
"""Тест регистрации парсера monitoring_fuel"""
|
||
parser = MonitoringFuelParser()
|
||
getters = parser.get_available_getters()
|
||
|
||
assert "total_by_columns" in getters
|
||
assert "month_by_code" in getters
|
||
assert "series_by_id_and_columns" in getters
|
||
|
||
# Проверяем параметры геттеров
|
||
total_getter = getters["total_by_columns"]
|
||
assert "columns" in total_getter["required_params"]
|
||
|
||
month_getter = getters["month_by_code"]
|
||
assert "month" in month_getter["required_params"]
|
||
|
||
series_getter = getters["series_by_id_and_columns"]
|
||
assert "columns" in series_getter["required_params"]
|
||
|
||
print("✅ monitoring_fuel парсер зарегистрирован корректно")
|
||
|
||
def test_storage_operations(self):
|
||
"""Тест операций с локальным storage"""
|
||
import pandas as pd
|
||
|
||
# Создаем тестовый DataFrame
|
||
test_df = pd.DataFrame({
|
||
'col1': [1, 2, 3],
|
||
'col2': ['a', 'b', 'c']
|
||
})
|
||
|
||
# Сохраняем
|
||
success = self.storage.save_dataframe("test_object", test_df)
|
||
assert success is True
|
||
|
||
# Проверяем существование
|
||
exists = self.storage.object_exists("test_object")
|
||
assert exists is True
|
||
|
||
# Загружаем
|
||
loaded_df = self.storage.load_dataframe("test_object")
|
||
assert loaded_df is not None
|
||
assert loaded_df.shape == (3, 2)
|
||
assert list(loaded_df.columns) == ['col1', 'col2']
|
||
|
||
# Получаем метаданные
|
||
metadata = self.storage.get_object_metadata("test_object")
|
||
assert metadata is not None
|
||
assert metadata["shape"] == [3, 2]
|
||
|
||
# Получаем список объектов
|
||
objects = self.storage.list_objects()
|
||
assert "test_object" in objects
|
||
|
||
# Удаляем
|
||
delete_success = self.storage.delete_object("test_object")
|
||
assert delete_success is True
|
||
|
||
# Проверяем, что объект удален
|
||
exists_after = self.storage.object_exists("test_object")
|
||
assert exists_after is False
|
||
|
||
print("✅ Локальный storage работает корректно")
|
||
|
||
def test_report_service_with_local_storage(self):
|
||
"""Тест ReportService с локальным storage"""
|
||
# Проверяем, что ReportService может работать с локальным storage
|
||
assert self.report_service.storage is not None
|
||
assert hasattr(self.report_service.storage, 'save_dataframe')
|
||
assert hasattr(self.report_service.storage, 'load_dataframe')
|
||
|
||
print("✅ ReportService корректно работает с локальным storage") |