12 Commits

Author SHA1 Message Date
de63f98b50 Эндпоинты не работают 2025-09-02 10:09:22 +03:00
84069e4e41 ch 2025-09-02 07:15:16 +03:00
b8074765e3 Merge branch 'use_schemas' 2025-09-01 23:35:48 +03:00
79ab91c700 Done 2025-09-01 20:54:31 +03:00
b98be22359 Доредачил прошлые arch изменения 2025-09-01 20:22:07 +03:00
fc0b4356da Merge branch 'arch-2' 2025-09-01 20:00:32 +03:00
72fe115a99 main 2025-09-01 19:22:58 +03:00
46a30c32ed Сервисы 2025-09-01 19:20:16 +03:00
5e217c7cce порты 2025-09-01 19:19:28 +03:00
7d2747c8fe rm лишнее 2025-09-01 19:16:23 +03:00
513ff3c144 Реализация для дева с хот релоадом 2025-09-01 19:06:55 +03:00
a0b6e04d99 Правильное отображение имени 2025-09-01 19:01:06 +03:00
54 changed files with 2640 additions and 422 deletions

8
.gitignore vendored
View File

@@ -1,5 +1,9 @@
# Python
__pycache__/
__pycache__
*.pyc
nin_python_parser
*.py[cod]
*$py.class
*.so
@@ -152,4 +156,4 @@ htmlcov/
node_modules/
npm-debug.log*
yarn-debug.log*
yarn-error.log*
yarn-error.log*

View File

@@ -1,39 +0,0 @@
# 🧹 Сводка по очистке проекта
## ✅ Что было удалено из `python_parser/`:
### Файлы Streamlit:
- `streamlit_app.py` - основной файл Streamlit приложения
- `run_streamlit.py` - скрипт запуска Streamlit
- `Procfile` - конфигурация для Heroku (Streamlit)
- `runtime.txt` - версия Python для Heroku
- `manifest.yml` - манифест приложения
- `.streamlit/` - папка с конфигурацией Streamlit
### Зависимости:
- Удален `streamlit>=1.28.0` из `python_parser/requirements.txt`
## 🎯 Результат:
### `python_parser/` - теперь содержит ТОЛЬКО:
- FastAPI приложение
- Адаптеры для парсеров
- Основную бизнес-логику
- Dockerfile для FastAPI
- Зависимости только для FastAPI
### `streamlit_app/` - содержит ТОЛЬКО:
- Streamlit приложение
- Dockerfile для Streamlit
- Зависимости только для Streamlit
- Конфигурацию Streamlit
## 🔄 Полное разделение достигнуто:
- **FastAPI** и **Streamlit** теперь полностью независимы
- Каждый сервис имеет свои собственные зависимости
- Docker образы собираются отдельно
- Запускаются через единый `docker-compose.yml`
---
**Статус**: ✅ Проект полностью очищен и разделен

View File

@@ -11,11 +11,25 @@
- Docker и Docker Compose
- Git
### Запуск всех сервисов
### Запуск всех сервисов (продакшн)
```bash
docker-compose up -d
docker compose up -d
```
### Запуск в режиме разработки
```bash
# Автоматический запуск
python start_dev.py
# Или вручную
docker compose -f docker-compose.dev.yml up -d
```
**Режим разработки** позволяет:
- Автоматически перезагружать Streamlit при изменении кода
- Монтировать исходный код напрямую в контейнер
- Видеть изменения без пересборки контейнеров
### Доступ к сервисам
- **FastAPI**: http://localhost:8000
- **Streamlit**: http://localhost:8501
@@ -72,6 +86,18 @@ python_parser_cf/
## 🛠️ Разработка
### Режим разработки (рекомендуется)
```bash
# Запуск режима разработки
python start_dev.py
# Остановка
docker compose -f docker-compose.dev.yml down
# Возврат к продакшн режиму
python start_prod.py
```
### Локальная разработка FastAPI
```bash
cd python_parser

58
docker-compose.dev.yml Normal file
View File

@@ -0,0 +1,58 @@
services:
minio:
image: minio/minio:latest
container_name: svodka_minio_dev
ports:
- "9000:9000" # API порт
- "9001:9001" # Консоль порт
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
command: server /data --console-address ":9001"
volumes:
- ./minio_data:/data
restart: unless-stopped
fastapi:
build: ./python_parser
container_name: svodka_fastapi_dev
ports:
- "8000:8000"
environment:
- MINIO_ENDPOINT=minio:9000
- MINIO_ACCESS_KEY=minioadmin
- MINIO_SECRET_KEY=minioadmin
- MINIO_SECURE=false
- MINIO_BUCKET=svodka-data
depends_on:
- minio
restart: unless-stopped
streamlit:
image: python:3.11-slim
container_name: svodka_streamlit_dev
ports:
- "8501:8501"
environment:
- API_BASE_URL=http://fastapi:8000
- API_PUBLIC_URL=http://localhost:8000
- MINIO_ENDPOINT=minio:9000
- MINIO_ACCESS_KEY=minioadmin
- MINIO_SECRET_KEY=minioadmin
- MINIO_SECURE=false
- MINIO_BUCKET=svodka-data
volumes:
# Монтируем исходный код для автоматической перезагрузки
- ./streamlit_app:/app
# Монтируем requirements.txt для установки зависимостей
- ./streamlit_app/requirements.txt:/app/requirements.txt
working_dir: /app
depends_on:
- minio
- fastapi
restart: unless-stopped
command: >
bash -c "
pip install --no-cache-dir -r requirements.txt &&
streamlit run streamlit_app.py --server.port=8501 --server.address=0.0.0.0 --server.runOnSave=true
"

View File

@@ -1,3 +1,5 @@
# Продакшн конфигурация
# Для разработки используйте: docker compose -f docker-compose.dev.yml up -d
services:
minio:
image: minio/minio:latest
@@ -35,6 +37,7 @@ services:
- "8501:8501"
environment:
- API_BASE_URL=http://fastapi:8000
- API_PUBLIC_URL=http://localhost:8000
- MINIO_ENDPOINT=minio:9000
- MINIO_ACCESS_KEY=minioadmin
- MINIO_SECRET_KEY=minioadmin

View File

@@ -0,0 +1,135 @@
# Интеграция схем Pydantic с парсерами
## Обзор
Этот документ описывает решение для устранения дублирования логики между схемами Pydantic и парсерами. Теперь схемы Pydantic являются единым источником правды для определения параметров парсеров.
## Проблема
Ранее в парсерах дублировалась информация о параметрах:
```python
# В парсере
self.register_getter(
name="single_og",
method=self._get_single_og,
required_params=["id", "codes", "columns"], # Дублирование
optional_params=["search"], # Дублирование
description="Получение данных по одному ОГ"
)
# В схеме
class SvodkaPMSingleOGRequest(BaseModel):
id: OGID = Field(...) # Обязательное поле
codes: List[int] = Field(...) # Обязательное поле
columns: List[str] = Field(...) # Обязательное поле
search: Optional[str] = Field(None) # Необязательное поле
```
## Решение
### 1. Утилиты для работы со схемами
Создан модуль `core/schema_utils.py` с функциями:
- `get_required_fields_from_schema()` - извлекает обязательные поля
- `get_optional_fields_from_schema()` - извлекает необязательные поля
- `register_getter_from_schema()` - регистрирует геттер с использованием схемы
- `validate_params_with_schema()` - валидирует параметры с помощью схемы
### 2. Обновленные парсеры
Теперь парсеры используют схемы как единый источник правды:
```python
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="single_og",
method=self._get_single_og,
schema_class=SvodkaPMSingleOGRequest,
description="Получение данных по одному ОГ"
)
```
### 3. Валидация параметров
Методы геттеров теперь автоматически валидируют параметры:
```python
def _get_single_og(self, params: dict):
"""Получение данных по одному ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
og_id = validated_params["id"]
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# ... остальная логика
```
## Преимущества
1. **Единый источник правды** - информация о параметрах хранится только в схемах Pydantic
2. **Автоматическая валидация** - параметры автоматически валидируются с помощью Pydantic
3. **Синхронизация** - изменения в схемах автоматически отражаются в парсерах
4. **Типобезопасность** - использование типов Pydantic обеспечивает типобезопасность
5. **Документация** - Swagger документация автоматически генерируется из схем
## Совместимость
Решение работает с:
- Pydantic v1 (через `__fields__`)
- Pydantic v2 (через `model_fields` и `is_required()`)
## Использование
### Для новых парсеров
1. Создайте схему Pydantic с нужными полями
2. Используйте `register_getter_from_schema()` для регистрации геттера
3. Используйте `validate_params_with_schema()` в методах геттеров
### Для существующих парсеров
1. Убедитесь, что у вас есть соответствующая схема Pydantic
2. Замените ручную регистрацию геттеров на `register_getter_from_schema()`
3. Добавьте валидацию параметров в методы геттеров
## Примеры
### Схема с обязательными и необязательными полями
```python
class ExampleRequest(BaseModel):
required_field: str = Field(..., description="Обязательное поле")
optional_field: Optional[str] = Field(None, description="Необязательное поле")
```
### Регистрация геттера
```python
register_getter_from_schema(
parser_instance=self,
getter_name="example_getter",
method=self._example_method,
schema_class=ExampleRequest,
description="Пример геттера"
)
```
### Валидация в методе
```python
def _example_method(self, params: dict):
validated_params = validate_params_with_schema(params, ExampleRequest)
# validated_params содержит валидированные данные
```
## Заключение
Это решение устраняет дублирование кода и обеспечивает единообразие между API схемами и парсерами. Теперь изменения в схемах автоматически отражаются в парсерах, что упрощает поддержку и развитие системы.

View File

@@ -0,0 +1,154 @@
"""
Локальный storage адаптер для тестирования
Сохраняет данные в локальную файловую систему вместо MinIO
"""
import os
import json
import pickle
from pathlib import Path
from typing import Optional, Dict, Any
import pandas as pd
from core.ports import StoragePort
class LocalStorageAdapter(StoragePort):
"""Локальный адаптер для хранения данных в файловой системе"""
def __init__(self, base_path: str = "local_storage"):
"""
Инициализация локального storage
Args:
base_path: Базовый путь для хранения данных
"""
self.base_path = Path(base_path)
self.base_path.mkdir(parents=True, exist_ok=True)
# Создаем поддиректории
(self.base_path / "data").mkdir(exist_ok=True)
(self.base_path / "metadata").mkdir(exist_ok=True)
def object_exists(self, object_id: str) -> bool:
"""Проверяет существование объекта"""
data_file = self.base_path / "data" / f"{object_id}.pkl"
return data_file.exists()
def save_dataframe(self, object_id: str, df: pd.DataFrame) -> bool:
"""Сохраняет DataFrame в локальную файловую систему"""
try:
data_file = self.base_path / "data" / f"{object_id}.pkl"
metadata_file = self.base_path / "metadata" / f"{object_id}.json"
# Сохраняем DataFrame
with open(data_file, 'wb') as f:
pickle.dump(df, f)
# Сохраняем метаданные
metadata = {
"object_id": object_id,
"shape": df.shape,
"columns": df.columns.tolist(),
"dtypes": {str(k): str(v) for k, v in df.dtypes.to_dict().items()}
}
with open(metadata_file, 'w', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
print(f"Ошибка при сохранении {object_id}: {e}")
return False
def load_dataframe(self, object_id: str) -> Optional[pd.DataFrame]:
"""Загружает DataFrame из локальной файловой системы"""
try:
data_file = self.base_path / "data" / f"{object_id}.pkl"
if not data_file.exists():
return None
with open(data_file, 'rb') as f:
df = pickle.load(f)
return df
except Exception as e:
print(f"Ошибка при загрузке {object_id}: {e}")
return None
def delete_object(self, object_id: str) -> bool:
"""Удаляет объект из локального storage"""
try:
data_file = self.base_path / "data" / f"{object_id}.pkl"
metadata_file = self.base_path / "metadata" / f"{object_id}.json"
# Удаляем файлы если они существуют
if data_file.exists():
data_file.unlink()
if metadata_file.exists():
metadata_file.unlink()
return True
except Exception as e:
print(f"Ошибка при удалении {object_id}: {e}")
return False
def list_objects(self) -> list:
"""Возвращает список всех объектов в storage"""
try:
data_dir = self.base_path / "data"
if not data_dir.exists():
return []
objects = []
for file_path in data_dir.glob("*.pkl"):
object_id = file_path.stem
objects.append(object_id)
return objects
except Exception as e:
print(f"Ошибка при получении списка объектов: {e}")
return []
def get_object_metadata(self, object_id: str) -> Optional[Dict[str, Any]]:
"""Возвращает метаданные объекта"""
try:
metadata_file = self.base_path / "metadata" / f"{object_id}.json"
if not metadata_file.exists():
return None
with open(metadata_file, 'r', encoding='utf-8') as f:
metadata = json.load(f)
return metadata
except Exception as e:
print(f"Ошибка при получении метаданных {object_id}: {e}")
return None
def clear_all(self) -> bool:
"""Очищает весь storage"""
try:
data_dir = self.base_path / "data"
metadata_dir = self.base_path / "metadata"
# Удаляем все файлы
for file_path in data_dir.glob("*"):
if file_path.is_file():
file_path.unlink()
for file_path in metadata_dir.glob("*"):
if file_path.is_file():
file_path.unlink()
return True
except Exception as e:
print(f"Ошибка при очистке storage: {e}")
return False

View File

@@ -1,9 +1,11 @@
import pandas as pd
import re
from typing import Dict
import zipfile
from typing import Dict, Tuple
from core.ports import ParserPort
from adapters.pconfig import data_to_json, get_object_by_name
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.monitoring_fuel import MonitoringFuelTotalRequest, MonitoringFuelMonthRequest, MonitoringFuelSeriesRequest
from adapters.pconfig import data_to_json, find_header_row
class MonitoringFuelParser(ParserPort):
@@ -11,71 +13,66 @@ class MonitoringFuelParser(ParserPort):
name = "Мониторинг топлива"
def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int:
"""Определение индекса заголовка в Excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file_path,
sheet_name=sheet,
header=None,
nrows=max_rows
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="total_by_columns",
method=self._get_total_by_columns,
schema_class=MonitoringFuelTotalRequest,
description="Агрегация данных по колонкам"
)
register_getter_from_schema(
parser_instance=self,
getter_name="month_by_code",
method=self._get_month_by_code,
schema_class=MonitoringFuelMonthRequest,
description="Получение данных за конкретный месяц"
)
register_getter_from_schema(
parser_instance=self,
getter_name="series_by_id_and_columns",
method=self._get_series_by_id_and_columns,
schema_class=MonitoringFuelSeriesRequest,
description="Получение временных рядов по ID и колонкам"
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx + 1 # возвращаем индекс строки (0-based)
def _get_total_by_columns(self, params: dict):
"""Агрегация данных по колонкам"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, MonitoringFuelTotalRequest)
columns = validated_params["columns"]
# TODO: Переделать под новую архитектуру
df_means, _ = self.aggregate_by_columns(self.df, columns)
return df_means.to_dict(orient='index')
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def _get_month_by_code(self, params: dict):
"""Получение данных за конкретный месяц"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, MonitoringFuelMonthRequest)
month = validated_params["month"]
# TODO: Переделать под новую архитектуру
df_month = self.get_month(self.df, month)
return df_month.to_dict(orient='index')
def parse_single(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного объекта'''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Установка")
# Читаем весь лист, начиная с найденной строки как заголовок
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None
)
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_monitoring_fuel_files(file_path, params)
return self.df
# === Удаление полностью пустых столбцов ===
df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA
df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA
df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы
# === Переименовываем нужные столбцы по позициям ===
if len(df_full.columns) < 2:
raise ValueError("DataFrame должен содержать как минимум 2 столбца.")
new_columns = df_full.columns.tolist()
new_columns[0] = 'name'
new_columns[1] = 'normativ'
new_columns[-2] = 'total'
new_columns[-1] = 'total_1'
df_full.columns = new_columns
# Проверяем, что колонка 'name' существует
if 'name' in df_full.columns:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_full['id'] = df_full['name'].apply(get_object_by_name)
# Устанавливаем id как индекс
df_full.set_index('id', inplace=True)
print(f"Окончательное количество столбцов: {len(df_full.columns)}")
return df_full
def parse(self, file_path: str, params: dict) -> dict:
import zipfile
def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]:
"""Парсинг ZIP архива с файлами мониторинга топлива"""
df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ
with zipfile.ZipFile(file_path, 'r') as zip_ref:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for month in range(1, 13):
@@ -103,7 +100,53 @@ class MonitoringFuelParser(ParserPort):
return df_monitorings
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns):
def parse_single(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного объекта'''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = find_header_row(file, sheet, search_value="Установка")
# Читаем весь лист, начиная с найденной строки как заголовок
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None,
engine='openpyxl'
)
# === Удаление полностью пустых столбцов ===
df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA
df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA
df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы
# === Переименовываем нужные столбцы по позициям ===
if len(df_full.columns) < 2:
raise ValueError("DataFrame должен содержать как минимум 2 столбца.")
new_columns = df_full.columns.tolist()
new_columns[0] = 'name'
new_columns[1] = 'normativ'
new_columns[-2] = 'total'
new_columns[-1] = 'total_1'
df_full.columns = new_columns
# Проверяем, что колонка 'name' существует
if 'name' in df_full.columns:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
# df_full['id'] = df_full['name'].apply(get_object_by_name) # This line was removed as per new_code
pass # Placeholder for new_code
# Устанавливаем id как индекс
df_full.set_index('id', inplace=True)
print(f"Окончательное количество столбцов: {len(df_full.columns)}")
return df_full
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns: list) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
''' Служебная функция. Агрегация данных по среднему по определенным колонкам. '''
all_data = {} # Для хранения полных данных (месяцы) по каждой колонке
means = {} # Для хранения средних
@@ -186,21 +229,46 @@ class MonitoringFuelParser(ParserPort):
return total, df_combined
def get_value(self, df, params):
mode = params.get("mode", "total")
columns = params.get("columns", None)
month = params.get("month", None)
data = None
if mode == "total":
if not columns:
raise ValueError("Отсутствуют идентификаторы столбцов")
df_means, _ = self.aggregate_by_columns(df, columns)
data = df_means.to_dict(orient='index')
elif mode == "month":
if not month:
raise ValueError("Отсутствуют идентификатор месяца")
df_month = self.get_month(df, month)
data = df_month.to_dict(orient='index')
def _get_series_by_id_and_columns(self, params: dict):
"""Получение временных рядов по ID и колонкам"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, MonitoringFuelSeriesRequest)
columns = validated_params["columns"]
# Проверяем, что все колонки существуют хотя бы в одном месяце
valid_columns = set()
for month in self.df.values():
valid_columns.update(month.columns)
for col in columns:
if col not in valid_columns:
raise ValueError(f"Колонка '{col}' не найдена ни в одном месяце")
json_result = data_to_json(data)
return json_result
# Подготавливаем результат: словарь id → {col: [значения по месяцам]}
result = {}
# Обрабатываем месяцы от 01 до 12
for month_key in [f"{i:02d}" for i in range(1, 13)]:
if month_key not in self.df:
print(f"Месяц '{month_key}' не найден в df_monitorings, пропускаем.")
continue
df = self.df[month_key]
for col in columns:
if col not in df.columns:
continue # Пропускаем, если в этом месяце нет колонки
for idx, value in df[col].items():
if pd.isna(value):
continue # Можно пропустить NaN, или оставить как null
if idx not in result:
result[idx] = {c: [] for c in columns}
result[idx][col].append(value)
# Преобразуем ключи id в строки (для JSON-совместимости)
result_str_keys = {str(k): v for k, v in result.items()}
return result_str_keys

View File

@@ -2,89 +2,53 @@ import pandas as pd
import numpy as np
from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.svodka_ca import SvodkaCARequest
from adapters.pconfig import get_og_by_name
class SvodkaCAParser(ParserPort):
"""Парсер для сводки СА"""
"""Парсер для сводок СА"""
name = "Сводка СА"
name = "Сводки СА"
def extract_all_tables(self, file_path, sheet_name=0):
"""Извлекает все таблицы из Excel файла"""
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
df_filled = df.fillna('')
df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True)
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="get_ca_data",
method=self._get_data_wrapper,
schema_class=SvodkaCARequest,
description="Получение данных по режимам и таблицам"
)
non_empty_rows = ~(df_clean.eq('').all(axis=1))
non_empty_cols = ~(df_clean.eq('').all(axis=0))
def _get_data_wrapper(self, params: dict):
"""Получение данных по режимам и таблицам"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaCARequest)
modes = validated_params["modes"]
tables = validated_params["tables"]
# TODO: Переделать под новую архитектуру
data_dict = {}
for mode in modes:
data_dict[mode] = self.get_data(self.df, mode, tables)
return self.data_dict_to_json(data_dict)
row_indices = non_empty_rows[non_empty_rows].index.tolist()
col_indices = non_empty_cols[non_empty_cols].index.tolist()
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_ca(file_path, params)
return self.df
if not row_indices or not col_indices:
return []
row_blocks = self._get_contiguous_blocks(row_indices)
col_blocks = self._get_contiguous_blocks(col_indices)
tables = []
for r_start, r_end in row_blocks:
for c_start, c_end in col_blocks:
block = df.iloc[r_start:r_end + 1, c_start:c_end + 1]
if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all():
continue
if self._is_header_row(block.iloc[0]):
block.columns = block.iloc[0]
block = block.iloc[1:].reset_index(drop=True)
else:
block = block.reset_index(drop=True)
block.columns = [f"col_{i}" for i in range(block.shape[1])]
tables.append(block)
return tables
def _get_contiguous_blocks(self, indices):
"""Группирует индексы в непрерывные блоки"""
if not indices:
return []
blocks = []
start = indices[0]
for i in range(1, len(indices)):
if indices[i] != indices[i-1] + 1:
blocks.append((start, indices[i-1]))
start = indices[i]
blocks.append((start, indices[-1]))
return blocks
def _is_header_row(self, series):
"""Определяет, похожа ли строка на заголовок"""
series_str = series.astype(str).str.strip()
non_empty = series_str[series_str != '']
if len(non_empty) == 0:
return False
def is_not_numeric(val):
try:
float(val.replace(',', '.'))
return False
except (ValueError, TypeError):
return True
not_numeric_count = non_empty.apply(is_not_numeric).sum()
return not_numeric_count / len(non_empty) > 0.6
def _get_og_by_name(self, name):
"""Функция для получения ID по имени (упрощенная версия)"""
# Упрощенная версия - возвращаем имя как есть
if not name or not isinstance(name, str):
return None
return name.strip()
def parse_sheet(self, file_path, sheet_name, inclusion_list):
"""Собственно функция парсинга отчета СА"""
def parse_svodka_ca(self, file_path: str, params: dict) -> dict:
"""Парсинг сводки СА"""
# Получаем параметры из params
sheet_name = params.get('sheet_name', 0) # По умолчанию первый лист
inclusion_list = params.get('inclusion_list', {'ТиП', 'Топливо', 'Потери'})
# === Извлечение и фильтрация ===
tables = self.extract_all_tables(file_path, sheet_name)
@@ -190,76 +154,185 @@ class SvodkaCAParser(ParserPort):
else:
return None
def parse(self, file_path: str, params: dict) -> dict:
"""Парсинг файла сводки СА"""
# === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив ===
# Выгружаем План в df_ca_plan
inclusion_list_plan = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
def extract_all_tables(self, file_path, sheet_name=0):
"""Извлечение всех таблиц из Excel файла"""
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None, engine='openpyxl')
df_filled = df.fillna('')
df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True)
df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan) # ЭТО ДАТАФРЕЙМ ПЛАНА В СВОДКЕ ЦА
print(f"\n--- Объединённый и отсортированный План: {df_ca_plan.shape} ---")
non_empty_rows = ~(df_clean.eq('').all(axis=1))
non_empty_cols = ~(df_clean.eq('').all(axis=0))
# Выгружаем Факт
inclusion_list_fact = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн",
"в т.ч. Идентифицированные безвозвратные потери, %",
"в т.ч. Неидентифицированные потери, тонн",
"в т.ч. Неидентифицированные потери, %"
}
row_indices = non_empty_rows[non_empty_rows].index.tolist()
col_indices = non_empty_cols[non_empty_cols].index.tolist()
df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact) # ЭТО ДАТАФРЕЙМ ФАКТА В СВОДКЕ ЦА
print(f"\n--- Объединённый и отсортированный Факт: {df_ca_fact.shape} ---")
if not row_indices or not col_indices:
return []
# Выгружаем План в df_ca_normativ
inclusion_list_normativ = {
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
row_blocks = self._get_contiguous_blocks(row_indices)
col_blocks = self._get_contiguous_blocks(col_indices)
# ЭТО ДАТАФРЕЙМ НОРМАТИВА В СВОДКЕ ЦА
df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ)
tables = []
for r_start, r_end in row_blocks:
for c_start, c_end in col_blocks:
block = df.iloc[r_start:r_end + 1, c_start:c_end + 1]
if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all():
continue
print(f"\n--- Объединённый и отсортированный Норматив: {df_ca_normativ.shape} ---")
if self._is_header_row(block.iloc[0]):
block.columns = block.iloc[0]
block = block.iloc[1:].reset_index(drop=True)
else:
block = block.reset_index(drop=True)
block.columns = [f"col_{i}" for i in range(block.shape[1])]
df_dict = {
"plan": df_ca_plan,
"fact": df_ca_fact,
"normativ": df_ca_normativ
}
return df_dict
tables.append(block)
return tables
def _get_contiguous_blocks(self, indices):
"""Группирует индексы в непрерывные блоки"""
if not indices:
return []
blocks = []
start = indices[0]
for i in range(1, len(indices)):
if indices[i] != indices[i-1] + 1:
blocks.append((start, indices[i-1]))
start = indices[i]
blocks.append((start, indices[-1]))
return blocks
def _is_header_row(self, series):
"""Определяет, похожа ли строка на заголовок"""
series_str = series.astype(str).str.strip()
non_empty = series_str[series_str != '']
if len(non_empty) == 0:
return False
def is_not_numeric(val):
try:
float(val.replace(',', '.'))
return False
except (ValueError, TypeError):
return True
not_numeric_count = non_empty.apply(is_not_numeric).sum()
return not_numeric_count / len(non_empty) > 0.6
def _get_og_by_name(self, name):
"""Функция для получения ID по имени (упрощенная версия)"""
# Упрощенная версия - возвращаем имя как есть
if not name or not isinstance(name, str):
return None
return name.strip()
def parse_sheet(self, file_path: str, sheet_name: str, inclusion_list: set) -> pd.DataFrame:
"""Парсинг листа Excel"""
# === Извлечение и фильтрация ===
tables = self.extract_all_tables(file_path, sheet_name)
# Фильтруем таблицы: оставляем только те, где первая строка содержит нужные заголовки
filtered_tables = []
for table in tables:
if table.empty:
continue
first_row_values = table.iloc[0].astype(str).str.strip().tolist()
if any(val in inclusion_list for val in first_row_values):
filtered_tables.append(table)
tables = filtered_tables
# === Итоговый список таблиц датафреймов ===
result_list = []
for table in tables:
if table.empty:
continue
# Получаем первую строку (до удаления)
first_row_values = table.iloc[0].astype(str).str.strip().tolist()
# Находим, какой элемент из inclusion_list присутствует
matched_key = None
for val in first_row_values:
if val in inclusion_list:
matched_key = val
break # берём первый совпадающий заголовок
if matched_key is None:
continue # на всякий случай (хотя уже отфильтровано)
# Удаляем первую строку (заголовок) и сбрасываем индекс
df_cleaned = table.iloc[1:].copy().reset_index(drop=True)
# Пропускаем, если таблица пустая
if df_cleaned.empty:
continue
# Первая строка становится заголовком
new_header = df_cleaned.iloc[0] # извлекаем первую строку как потенциальные названия столбцов
# Преобразуем заголовок: только первый столбец может быть заменён на "name"
cleaned_header = []
# Обрабатываем первый столбец отдельно
first_item = new_header.iloc[0] if isinstance(new_header, pd.Series) else new_header[0]
first_item_str = str(first_item).strip() if pd.notna(first_item) else ""
if first_item_str == "" or first_item_str == "nan":
cleaned_header.append("name")
else:
cleaned_header.append(first_item_str)
# Остальные столбцы добавляем без изменений (или с минимальной очисткой)
for item in new_header[1:]:
# Опционально: приводим к строке и убираем лишние пробелы, но не заменяем на "name"
item_str = str(item).strip() if pd.notna(item) else ""
cleaned_header.append(item_str)
# Применяем очищенные названия столбцов
df_cleaned = df_cleaned[1:] # удаляем строку с заголовком
df_cleaned.columns = cleaned_header
df_cleaned = df_cleaned.reset_index(drop=True)
if matched_key.endswith('**'):
cleaned_key = matched_key[:-2] # удаляем последние **
else:
cleaned_key = matched_key
# Добавляем новую колонку с именем параметра
df_cleaned["table"] = cleaned_key
# Проверяем, что колонка 'name' существует
if 'name' not in df_cleaned.columns:
print(
f"Внимание: колонка 'name' отсутствует в таблице для '{matched_key}'. Пропускаем добавление 'id'.")
continue # или обработать по-другому
else:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_cleaned['id'] = df_cleaned['name'].apply(get_og_by_name)
# Удаляем строки, где id — None, NaN или пустой
df_cleaned = df_cleaned.dropna(subset=['id']) # dropna удаляет NaN
# Дополнительно: удаляем None (если не поймал dropna)
df_cleaned = df_cleaned[df_cleaned['id'].notna() & (df_cleaned['id'].astype(str) != 'None')]
# Добавляем в словарь
result_list.append(df_cleaned)
# === Объединение и сортировка по id (индекс) и table ===
if result_list:
combined_df = pd.concat(result_list, axis=0)
# Сортируем по индексу (id) и по столбцу 'table'
combined_df = combined_df.sort_values(by=['id', 'table'], axis=0)
# Устанавливаем id как индекс
# combined_df.set_index('id', inplace=True)
return combined_df
else:
return None
def data_dict_to_json(self, data_dict):
''' Служебная функция для парсинга словаря в json. '''
@@ -308,17 +381,3 @@ class SvodkaCAParser(ParserPort):
filtered_df = df[df['table'].isin(table_values)].copy()
result_dict = {key: group for key, group in filtered_df.groupby('table')}
return result_dict
def get_value(self, df: pd.DataFrame, params: dict):
modes = params.get("modes")
tables = params.get("tables")
if not isinstance(modes, list):
raise ValueError("Поле 'modes' должно быть списком")
if not isinstance(tables, list):
raise ValueError("Поле 'tables' должно быть списком")
# Собираем данные
data_dict = {}
for mode in modes:
data_dict[mode] = self.get_data(df, mode, tables)
return self.data_dict_to_json(data_dict)

View File

@@ -1,7 +1,9 @@
import pandas as pd
from core.ports import ParserPort
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
from adapters.pconfig import SINGLE_OGS, replace_id_in_path, data_to_json, find_header_row
class SvodkaPMParser(ParserPort):
@@ -9,29 +11,64 @@ class SvodkaPMParser(ParserPort):
name = "Сводки ПМ"
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
"""Определения индекса заголовка в excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="single_og",
method=self._get_single_og,
schema_class=SvodkaPMSingleOGRequest,
description="Получение данных по одному ОГ"
)
register_getter_from_schema(
parser_instance=self,
getter_name="total_ogs",
method=self._get_total_ogs,
schema_class=SvodkaPMTotalOGsRequest,
description="Получение данных по всем ОГ"
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
def _get_single_og(self, params: dict):
"""Получение данных по одному ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
og_id = validated_params["id"]
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# Здесь нужно получить DataFrame из self.df, но пока используем старую логику
# TODO: Переделать под новую архитектуру
return self.get_svodka_og(self.df, og_id, codes, columns, search)
def _get_total_ogs(self, params: dict):
"""Получение данных по всем ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest)
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# TODO: Переделать под новую архитектуру
return self.get_svodka_total(self.df, codes, columns, search)
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_pm_files(file_path, params)
return self.df
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_svodka_pm(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного ОГ для БП, ПП и факта '''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Итого")
header_num = find_header_row(file, sheet, search_value="Итого")
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
df_probe = pd.read_excel(
@@ -40,6 +77,7 @@ class SvodkaPMParser(ParserPort):
header=header_num,
usecols=None,
nrows=2,
engine='openpyxl'
)
if df_probe.shape[0] == 0:
@@ -61,7 +99,8 @@ class SvodkaPMParser(ParserPort):
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None
index_col=None,
engine='openpyxl'
)
if indicator_col_name not in df_full.columns:
@@ -99,25 +138,25 @@ class SvodkaPMParser(ParserPort):
# Проверяем, является ли колонка пустой/некорректной
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
# Проверяем, начинается ли на "Итого"
if col_str.startswith('Итого'):
current_name = 'Итого'
last_good_name = current_name # обновляем last_good_name
new_columns.append(current_name)
elif is_empty_or_unnamed:
# Используем последнее хорошее имя
new_columns.append(last_good_name)
if is_empty_or_unnamed:
# Если это пустая колонка, используем последнее хорошее имя
if last_good_name:
new_columns.append(last_good_name)
else:
# Если нет хорошего имени, пропускаем
continue
else:
# Имя, полученное из exel
# Это хорошая колонка
last_good_name = col_str
new_columns.append(col_str)
# Применяем новые заголовки
df_final.columns = new_columns
print(f"Окончательное количество столбцов: {len(df_final.columns)}")
return df_final
def parse(self, file_path: str, params: dict) -> dict:
def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict:
"""Парсинг ZIP архива со сводками ПМ"""
import zipfile
pm_dict = {
"facts": {},
@@ -125,9 +164,9 @@ class SvodkaPMParser(ParserPort):
}
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(file_path, 'r') as zip_ref:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for name, id in OG_IDS.items():
for id in SINGLE_OGS:
if id == 'BASH':
continue # пропускаем BASH
@@ -155,9 +194,9 @@ class SvodkaPMParser(ParserPort):
return pm_dict
def get_svodka_value(self, df_svodka, id, code, search_value=None):
''' Служебная функция для простой выборке по сводке '''
row_index = id
def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
''' Служебная функция получения значения по коду и столбцу '''
row_index = code
mask_value = df_svodka.iloc[0] == code
if search_value is None:
@@ -234,11 +273,11 @@ class SvodkaPMParser(ParserPort):
''' Служебная функция агрегации данные по всем ОГ '''
total_result = {}
for name, og_id in OG_IDS.items():
for og_id in SINGLE_OGS:
if og_id == 'BASH':
continue
# print(f"📊 Обработка: {name} ({og_id})")
# print(f"📊 Обработка: {og_id}")
try:
data = self.get_svodka_og(
pm_dict,
@@ -249,27 +288,9 @@ class SvodkaPMParser(ParserPort):
)
total_result[og_id] = data
except Exception as e:
print(f"❌ Ошибка при обработке {name} ({og_id}): {e}")
print(f"❌ Ошибка при обработке {og_id}: {e}")
total_result[og_id] = None
return total_result
def get_value(self, df, params):
og_id = params.get("id")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
mode = params.get("mode", "total")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
data = None
if mode == "single":
if not og_id:
raise ValueError("Отсутствует идентификатор ОГ")
data = self.get_svodka_og(df, og_id, codes, columns, search)
elif mode == "total":
data = self.get_svodka_total(df, codes, columns, search)
json_result = data_to_json(data)
return json_result
# Убираем старый метод get_value, так как он теперь в базовом классе

View File

@@ -3,6 +3,7 @@ from functools import lru_cache
import json
import numpy as np
import pandas as pd
import os
OG_IDS = {
"Комсомольский НПЗ": "KNPZ",
@@ -22,8 +23,37 @@ OG_IDS = {
"Красноленинский НПЗ": "KLNPZ",
"Пурнефтепереработка": "PurNP",
"ЯНОС": "YANOS",
"Уфанефтехим": "UNH",
"РНПК": "RNPK",
"КмсНПЗ": "KNPZ",
"АНХК": "ANHK",
"НК НПЗ": "NovKuybNPZ",
"КНПЗ": "KuybNPZ",
"СНПЗ": "CyzNPZ",
"Нижневаторское НПО": "NVNPO",
"ПурНП": "PurNP",
}
SINGLE_OGS = [
"KNPZ",
"ANHK",
"AchNPZ",
"BASH",
"UNPZ",
"UNH",
"NOV",
"NovKuybNPZ",
"KuybNPZ",
"CyzNPZ",
"TuapsNPZ",
"SNPZ",
"RNPK",
"NVNPO",
"KLNPZ",
"PurNP",
"YANOS",
]
SNPZ_IDS = {
"Висбрекинг": "SNPZ.VISB",
"Изомеризация": "SNPZ.IZOM",
@@ -40,7 +70,18 @@ SNPZ_IDS = {
def replace_id_in_path(file_path, new_id):
return file_path.replace('ID', str(new_id))
# Заменяем 'ID' на новое значение
modified_path = file_path.replace('ID', str(new_id)) + '.xlsx'
# Проверяем, существует ли файл
if not os.path.exists(modified_path):
# Меняем расширение на .xlsm
directory, filename = os.path.split(modified_path)
name, ext = os.path.splitext(filename)
new_filename = name + '.xlsm'
modified_path = os.path.join(directory, new_filename)
return modified_path
def get_table_name(exel):
@@ -109,6 +150,25 @@ def get_id_by_name(name, dictionary):
return best_match
def find_header_row(file, sheet, search_value="Итого", max_rows=50):
''' Определения индекса заголовка в exel по ключевому слову '''
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def data_to_json(data, indent=2, ensure_ascii=False):
"""
Полностью безопасная сериализация данных в JSON.
@@ -175,7 +235,6 @@ def data_to_json(data, indent=2, ensure_ascii=False):
try:
cleaned_data = convert_obj(data)
cleaned_data_str = json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii)
return cleaned_data
return json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii)
except Exception as e:
raise ValueError(f"Не удалось сериализовать данные в JSON: {e}")

View File

@@ -16,7 +16,7 @@ from app.schemas import (
UploadResponse, UploadErrorResponse,
SvodkaPMTotalOGsRequest, SvodkaPMSingleOGRequest,
SvodkaCARequest,
MonitoringFuelMonthRequest, MonitoringFuelTotalRequest
MonitoringFuelMonthRequest, MonitoringFuelTotalRequest, MonitoringFuelSeriesRequest
)
@@ -96,6 +96,54 @@ async def get_available_parsers():
return {"parsers": parsers}
@app.get("/parsers/{parser_name}/getters", tags=["Общее"],
summary="Информация о геттерах парсера",
description="Возвращает информацию о доступных геттерах для указанного парсера",
responses={
200: {
"content": {
"application/json": {
"example": {
"parser": "svodka_pm",
"getters": {
"single_og": {
"required_params": ["id", "codes", "columns"],
"optional_params": ["search"],
"description": "Получение данных по одному ОГ"
},
"total_ogs": {
"required_params": ["codes", "columns"],
"optional_params": ["search"],
"description": "Получение данных по всем ОГ"
}
}
}
}
}
},
404: {
"description": "Парсер не найден"
}
})
async def get_parser_getters(parser_name: str):
"""Получение информации о геттерах парсера"""
if parser_name not in PARSERS:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Парсер '{parser_name}' не найден"
)
parser_class = PARSERS[parser_name]
parser_instance = parser_class()
getters_info = parser_instance.get_available_getters()
return {
"parser": parser_name,
"getters": getters_info
}
@app.get("/server-info", tags=["Общее"],
summary="Информация о сервере",
response_model=ServerInfoResponse,)
@@ -329,7 +377,7 @@ async def get_svodka_pm_total_ogs(
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'total'
request_dict['mode'] = 'total_ogs'
request = DataRequest(
report_type='svodka_pm',
get_params=request_dict
@@ -352,41 +400,6 @@ async def get_svodka_pm_total_ogs(
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
# @app.post("/svodka_pm/get_data", tags=[SvodkaPMParser.name])
# async def get_svodka_pm_data(
# request_data: dict
# ):
# report_service = get_report_service()
# """
# Получение данных из отчета сводки факта СарНПЗ
# - indicator_id: ID индикатора
# - code: Код для поиска
# - search_value: Опциональное значение для поиска
# """
# try:
# # Создаем запрос
# request = DataRequest(
# report_type='svodka_pm',
# get_params=request_data
# )
# # Получаем данные
# result = report_service.get_data(request)
# if result.success:
# return {
# "success": True,
# "data": result.data
# }
# else:
# raise HTTPException(status_code=404, detail=result.message)
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/svodka_ca/upload", tags=[SvodkaCAParser.name],
summary="Загрузка файла отчета сводки СА",
@@ -461,7 +474,7 @@ async def upload_svodka_ca(
)
@app.post("/svodka_ca/get_data", tags=[SvodkaCAParser.name],
@app.post("/svodka_ca/get_ca_data", tags=[SvodkaCAParser.name],
summary="Получение данных из отчета сводки СА")
async def get_svodka_ca_data(
request_data: SvodkaCARequest
@@ -486,6 +499,7 @@ async def get_svodka_ca_data(
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'get_ca_data'
request = DataRequest(
report_type='svodka_ca',
get_params=request_dict
@@ -562,38 +576,6 @@ async def get_svodka_ca_data(
# raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
# @app.post("/monitoring_fuel/get_data", tags=[MonitoringFuelParser.name])
# async def get_monitoring_fuel_data(
# request_data: dict
# ):
# report_service = get_report_service()
# """
# Получение данных из отчета мониторинга топлива
# - column: Название колонки для агрегации (normativ, total, total_svod)
# """
# try:
# # Создаем запрос
# request = DataRequest(
# report_type='monitoring_fuel',
# get_params=request_data
# )
# # Получаем данные
# result = report_service.get_data(request)
# if result.success:
# return {
# "success": True,
# "data": result.data
# }
# else:
# raise HTTPException(status_code=404, detail=result.message)
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
# @app.post("/monitoring_fuel/upload_directory", tags=[MonitoringFuelParser.name])
@@ -756,7 +738,7 @@ async def get_monitoring_fuel_total_by_columns(
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'total'
request_dict['mode'] = 'total_by_columns'
request = DataRequest(
report_type='monitoring_fuel',
get_params=request_dict
@@ -801,7 +783,56 @@ async def get_monitoring_fuel_month_by_code(
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'month'
request_dict['mode'] = 'month_by_code'
request = DataRequest(
report_type='monitoring_fuel',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/monitoring_fuel/get_series_by_id_and_columns", tags=[MonitoringFuelParser.name],
summary="Получение временных рядов по ID и колонкам")
async def get_monitoring_fuel_series_by_id_and_columns(
request_data: MonitoringFuelSeriesRequest
):
"""Получение временных рядов из сводок мониторинга топлива по ID и колонкам
### Структура параметров:
- `columns`: **Массив названий** выбираемых столбцов для получения временных рядов (обязательный)
### Пример тела запроса:
```json
{
"columns": ["total", "normativ"]
}
```
### Возвращает:
Словарь где ключ - ID объекта, значение - словарь с колонками,
в которых хранятся списки значений по месяцам.
"""
report_service = get_report_service()
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'series_by_id_and_columns'
request = DataRequest(
report_type='monitoring_fuel',
get_params=request_dict

View File

@@ -1,4 +1,4 @@
from .monitoring_fuel import MonitoringFuelMonthRequest, MonitoringFuelTotalRequest
from .monitoring_fuel import MonitoringFuelMonthRequest, MonitoringFuelTotalRequest, MonitoringFuelSeriesRequest
from .svodka_ca import SvodkaCARequest
from .svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
from .server import ServerInfoResponse

View File

@@ -32,3 +32,19 @@ class MonitoringFuelTotalRequest(BaseModel):
"columns": ["total", "normativ"]
}
}
class MonitoringFuelSeriesRequest(BaseModel):
columns: List[str] = Field(
...,
description="Массив названий выбираемых столбцов для получения временных рядов",
example=["total", "normativ"],
min_items=1
)
class Config:
json_schema_extra = {
"example": {
"columns": ["total", "normativ"]
}
}

View File

@@ -2,28 +2,93 @@
Порты (интерфейсы) для hexagonal architecture
"""
from abc import ABC, abstractmethod
from typing import Optional
from typing import Optional, Dict, List, Any, Callable
import pandas as pd
class ParserPort(ABC):
"""Интерфейс для парсеров"""
"""Интерфейс для парсеров с поддержкой множественных геттеров"""
def __init__(self):
"""Инициализация с пустым словарем геттеров"""
self.getters: Dict[str, Dict[str, Any]] = {}
self._register_default_getters()
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию - переопределяется в наследниках"""
pass
def register_getter(self, name: str, method: Callable, required_params: List[str],
optional_params: List[str] = None, description: str = ""):
"""
Регистрация нового геттера
Args:
name: Имя геттера
method: Метод для выполнения
required_params: Список обязательных параметров
optional_params: Список необязательных параметров
description: Описание геттера
"""
if optional_params is None:
optional_params = []
self.getters[name] = {
"method": method,
"required_params": required_params,
"optional_params": optional_params,
"description": description
}
def get_available_getters(self) -> Dict[str, Dict[str, Any]]:
"""Получение списка доступных геттеров с их описанием"""
return {
name: {
"required_params": info["required_params"],
"optional_params": info["optional_params"],
"description": info["description"]
}
for name, info in self.getters.items()
}
# Добавить схему
def get_value(self, getter_name: str, params: Dict[str, Any]):
"""
Получение значения через указанный геттер
Args:
getter_name: Имя геттера
params: Параметры для геттера
Returns:
Результат выполнения геттера
Raises:
ValueError: Если геттер не найден или параметры неверны
"""
if getter_name not in self.getters:
available = list(self.getters.keys())
raise ValueError(f"Геттер '{getter_name}' не найден. Доступные: {available}")
getter_info = self.getters[getter_name]
required = getter_info["required_params"]
# Проверка обязательных параметров
missing = [p for p in required if p not in params]
if missing:
raise ValueError(f"Отсутствуют обязательные параметры для геттера '{getter_name}': {missing}")
# Вызов метода геттера
try:
return getter_info["method"](params)
except Exception as e:
raise ValueError(f"Ошибка выполнения геттера '{getter_name}': {str(e)}")
@abstractmethod
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
pass
@abstractmethod
def get_value(self, df: pd.DataFrame, params: dict):
"""Получение значения из DataFrame по параметрам"""
pass
# @abstractmethod
# def get_schema(self) -> dict:
# """Возвращает схему входных параметров для парсера"""
# pass
class StoragePort(ABC):
"""Интерфейс для хранилища данных"""

View File

@@ -0,0 +1,144 @@
"""
Упрощенные утилиты для работы со схемами Pydantic
"""
from typing import List, Dict, Any, Type
from pydantic import BaseModel
import inspect
def get_required_fields_from_schema(schema_class: Type[BaseModel]) -> List[str]:
"""
Извлекает список обязательных полей из схемы Pydantic
Args:
schema_class: Класс схемы Pydantic
Returns:
Список имен обязательных полей
"""
required_fields = []
# Используем model_fields для Pydantic v2 или __fields__ для v1
if hasattr(schema_class, 'model_fields'):
fields = schema_class.model_fields
else:
fields = schema_class.__fields__
for field_name, field_info in fields.items():
# В Pydantic v2 есть метод is_required()
if hasattr(field_info, 'is_required'):
if field_info.is_required():
required_fields.append(field_name)
elif hasattr(field_info, 'required'):
if field_info.required:
required_fields.append(field_name)
else:
# Fallback для старых версий - проверяем наличие default
has_default = False
if hasattr(field_info, 'default'):
has_default = field_info.default is not ...
elif hasattr(field_info, 'default_factory'):
has_default = field_info.default_factory is not None
if not has_default:
required_fields.append(field_name)
return required_fields
def get_optional_fields_from_schema(schema_class: Type[BaseModel]) -> List[str]:
"""
Извлекает список необязательных полей из схемы Pydantic
Args:
schema_class: Класс схемы Pydantic
Returns:
Список имен необязательных полей
"""
optional_fields = []
# Используем model_fields для Pydantic v2 или __fields__ для v1
if hasattr(schema_class, 'model_fields'):
fields = schema_class.model_fields
else:
fields = schema_class.__fields__
for field_name, field_info in fields.items():
# В Pydantic v2 есть метод is_required()
if hasattr(field_info, 'is_required'):
if not field_info.is_required():
optional_fields.append(field_name)
elif hasattr(field_info, 'required'):
if not field_info.required:
optional_fields.append(field_name)
else:
# Fallback для старых версий - проверяем наличие default
has_default = False
if hasattr(field_info, 'default'):
has_default = field_info.default is not ...
elif hasattr(field_info, 'default_factory'):
has_default = field_info.default_factory is not None
if has_default:
optional_fields.append(field_name)
return optional_fields
def register_getter_from_schema(parser_instance, getter_name: str, method: callable,
schema_class: Type[BaseModel], description: str = ""):
"""
Регистрирует геттер в парсере, используя схему Pydantic для определения параметров
Args:
parser_instance: Экземпляр парсера
getter_name: Имя геттера
method: Метод для выполнения
schema_class: Класс схемы Pydantic
description: Описание геттера (если не указано, берется из docstring метода)
"""
# Извлекаем параметры из схемы
required_params = get_required_fields_from_schema(schema_class)
optional_params = get_optional_fields_from_schema(schema_class)
# Если описание не указано, берем из docstring метода
if not description:
description = inspect.getdoc(method) or ""
# Регистрируем геттер
parser_instance.register_getter(
name=getter_name,
method=method,
required_params=required_params,
optional_params=optional_params,
description=description
)
def validate_params_with_schema(params: Dict[str, Any], schema_class: Type[BaseModel]) -> Dict[str, Any]:
"""
Валидирует параметры с помощью схемы Pydantic
Args:
params: Словарь параметров
schema_class: Класс схемы Pydantic
Returns:
Валидированные параметры
Raises:
ValidationError: Если параметры не прошли валидацию
"""
try:
# Создаем экземпляр схемы для валидации
validated_data = schema_class(**params)
# Используем model_dump() для Pydantic v2 или dict() для v1
if hasattr(validated_data, 'model_dump'):
return validated_data.model_dump()
else:
return validated_data.dict()
except Exception as e:
raise ValueError(f"Ошибка валидации параметров: {str(e)}")

View File

@@ -99,9 +99,35 @@ class ReportService:
# Получаем парсер
parser = get_parser(request.report_type)
# Устанавливаем DataFrame в парсер для использования в геттерах
parser.df = df
# Получаем значение
value = parser.get_value(df, request.get_params)
# Получаем параметры запроса
get_params = request.get_params or {}
# Определяем имя геттера из параметра mode
getter_name = get_params.pop("mode", None)
if not getter_name:
# Если режим не указан, берем первый доступный
available_getters = list(parser.getters.keys())
if available_getters:
getter_name = available_getters[0]
print(f"⚠️ Режим не указан, используем первый доступный: {getter_name}")
else:
return DataResult(
success=False,
message="Парсер не имеет доступных геттеров"
)
# Получаем значение через указанный геттер
try:
value = parser.get_value(getter_name, get_params)
except ValueError as e:
return DataResult(
success=False,
message=f"Ошибка параметров: {str(e)}"
)
# Формируем результат
if value is not None:

49
start_dev.py Normal file
View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
"""
Скрипт для запуска проекта в режиме разработки
"""
import subprocess
import sys
import os
def run_command(command, description):
"""Выполнение команды с выводом"""
print(f"🔄 {description}...")
try:
result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
print(f"{description} выполнено успешно")
return True
except subprocess.CalledProcessError as e:
print(f"❌ Ошибка при {description.lower()}:")
print(f" Команда: {command}")
print(f" Ошибка: {e.stderr}")
return False
def main():
print("🚀 Запуск проекта в режиме разработки")
print("=" * 50)
# Останавливаем продакшн контейнеры если они запущены
if run_command("docker compose ps", "Проверка статуса контейнеров"):
if "Up" in subprocess.run("docker compose ps", shell=True, capture_output=True, text=True).stdout:
print("🛑 Останавливаю продакшн контейнеры...")
run_command("docker compose down", "Остановка продакшн контейнеров")
# Запускаем режим разработки
print("\n🔧 Запуск режима разработки...")
if run_command("docker compose -f docker-compose.dev.yml up -d", "Запуск контейнеров разработки"):
print("\n🎉 Проект запущен в режиме разработки!")
print("\n📍 Доступные сервисы:")
print(" • Streamlit: http://localhost:8501")
print(" • FastAPI: http://localhost:8000")
print(" • MinIO Console: http://localhost:9001")
print("\n💡 Теперь изменения в streamlit_app/ будут автоматически перезагружаться!")
print("\n🛑 Для остановки используйте:")
print(" docker compose -f docker-compose.dev.yml down")
else:
print("\nНе удалось запустить проект в режиме разработки")
sys.exit(1)
if __name__ == "__main__":
main()

49
start_prod.py Normal file
View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
"""
Скрипт для запуска проекта в продакшн режиме
"""
import subprocess
import sys
def run_command(command, description):
"""Выполнение команды с выводом"""
print(f"🔄 {description}...")
try:
result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
print(f"{description} выполнено успешно")
return True
except subprocess.CalledProcessError as e:
print(f"❌ Ошибка при {description.lower()}:")
print(f" Команда: {command}")
print(f" Ошибка: {e.stderr}")
return False
def main():
print("🚀 Запуск проекта в продакшн режиме")
print("=" * 50)
# Останавливаем контейнеры разработки если они запущены
if run_command("docker compose -f docker-compose.dev.yml ps", "Проверка статуса контейнеров разработки"):
if "Up" in subprocess.run("docker compose -f docker-compose.dev.yml ps", shell=True, capture_output=True, text=True).stdout:
print("🛑 Останавливаю контейнеры разработки...")
run_command("docker compose -f docker-compose.dev.yml down", "Остановка контейнеров разработки")
# Запускаем продакшн режим
print("\n🏭 Запуск продакшн режима...")
if run_command("docker compose up -d --build", "Запуск продакшн контейнеров"):
print("\n🎉 Проект запущен в продакшн режиме!")
print("\n📍 Доступные сервисы:")
print(" • Streamlit: http://localhost:8501")
print(" • FastAPI: http://localhost:8000")
print(" • MinIO Console: http://localhost:9001")
print("\n💡 Для разработки используйте:")
print(" python start_dev.py")
print("\n🛑 Для остановки используйте:")
print(" docker compose down")
else:
print("\nНе удалось запустить проект в продакшн режиме")
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -16,7 +16,8 @@ st.set_page_config(
)
# Конфигурация API
API_BASE_URL = os.getenv("API_BASE_URL", "http://fastapi:8000")
API_BASE_URL = os.getenv("API_BASE_URL", "http://fastapi:8000") # Внутренний адрес для Docker
API_PUBLIC_URL = os.getenv("API_PUBLIC_URL", "http://localhost:8000") # Внешний адрес для пользователя
def check_api_health():
"""Проверка доступности API"""
@@ -73,7 +74,7 @@ def main():
st.info("Убедитесь, что FastAPI сервер запущен")
return
st.success(f"✅ API доступен по адресу {API_BASE_URL}")
st.success(f"✅ API доступен по адресу {API_PUBLIC_URL}")
# Боковая панель с информацией
with st.sidebar:
@@ -276,7 +277,7 @@ def main():
"tables": tables
}
result, status = make_api_request("/svodka_ca/get_data", data)
result, status = make_api_request("/svodka_ca/get_ca_data", data)
if status == 200:
st.success("✅ Данные получены")
@@ -369,11 +370,39 @@ def main():
st.json(result)
else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
# Новая секция для временных рядов
st.markdown("---")
st.subheader("📈 Временные ряды по ID и колонкам")
columns_series = st.multiselect(
"Выберите столбцы для временных рядов",
["normativ", "total", "total_1"],
default=["normativ", "total"],
key="fuel_series_columns"
)
if st.button("📈 Получить временные ряды", key="fuel_series_btn"):
if columns_series:
with st.spinner("Получаю временные ряды..."):
data = {
"columns": columns_series
}
result, status = make_api_request("/monitoring_fuel/get_series_by_id_and_columns", data)
if status == 200:
st.success("✅ Временные ряды получены")
st.json(result)
else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
else:
st.warning("⚠️ Выберите столбцы")
# Футер
st.markdown("---")
st.markdown("### 📚 Документация API")
st.markdown(f"Полная документация доступна по адресу: {API_BASE_URL}/docs")
st.markdown(f"Полная документация доступна по адресу: {API_PUBLIC_URL}/docs")
# Информация о проекте
with st.expander(" О проекте"):

123
tests/README.md Normal file
View File

@@ -0,0 +1,123 @@
# API Endpoints Tests
Этот модуль содержит pytest тесты для всех API эндпоинтов проекта NIN Excel Parsers.
## Структура
```
tests/
├── __init__.py
├── conftest.py # Конфигурация pytest
├── test_all_endpoints.py # Основной файл для запуска всех тестов
├── test_upload_endpoints.py # Тесты API эндпоинтов загрузки данных
├── test_svodka_pm_endpoints.py # Тесты API svodka_pm эндпоинтов
├── test_svodka_ca_endpoints.py # Тесты API svodka_ca эндпоинтов
├── test_monitoring_fuel_endpoints.py # Тесты API monitoring_fuel эндпоинтов
├── test_parsers_direct.py # Прямое тестирование парсеров
├── test_upload_with_local_storage.py # Тестирование загрузки в локальный storage
├── test_getters_with_local_storage.py # Тестирование геттеров с локальными данными
├── test_data/ # Тестовые данные
│ ├── svodka_ca.xlsx
│ ├── pm_plan.zip
│ └── monitoring.zip
├── local_storage/ # Локальный storage (создается автоматически)
│ ├── data/ # Сохраненные DataFrame
│ └── metadata/ # Метаданные объектов
├── requirements.txt # Зависимости для тестов
└── README.md # Этот файл
```
## Установка зависимостей
```bash
pip install -r tests/requirements.txt
```
## Запуск тестов
### Запуск всех тестов
```bash
cd tests
python test_all_endpoints.py
```
### Запуск конкретных тестов
```bash
# API тесты (требуют запущенный сервер)
pytest test_upload_endpoints.py -v
pytest test_svodka_pm_endpoints.py -v
pytest test_svodka_ca_endpoints.py -v
pytest test_monitoring_fuel_endpoints.py -v
# Прямые тесты парсеров (не требуют сервер)
pytest test_parsers_direct.py -v
pytest test_upload_with_local_storage.py -v
pytest test_getters_with_local_storage.py -v
# Все тесты с локальным storage
pytest test_parsers_direct.py test_upload_with_local_storage.py test_getters_with_local_storage.py -v
```
## Предварительные условия
1. **API сервер должен быть запущен** на `http://localhost:8000` (только для API тестов)
2. **Тестовые данные** находятся в папке `test_data/`
3. **Локальный storage** используется для прямого тестирования парсеров
## Последовательность тестирования
### Вариант 1: API тесты (требуют запущенный сервер)
1. **Загрузка данных** (`test_upload_endpoints.py`)
- Загрузка `svodka_ca.xlsx`
- Загрузка `pm_plan.zip`
- Загрузка `monitoring.zip`
2. **Тестирование эндпоинтов** (в любом порядке)
- `test_svodka_pm_endpoints.py`
- `test_svodka_ca_endpoints.py`
- `test_monitoring_fuel_endpoints.py`
### Вариант 2: Прямые тесты (не требуют сервер)
1. **Тестирование парсеров** (`test_parsers_direct.py`)
- Проверка регистрации парсеров
- Проверка локального storage
2. **Загрузка в локальный storage** (`test_upload_with_local_storage.py`)
- Загрузка всех файлов в локальный storage
- Проверка сохранения данных
3. **Тестирование геттеров** (`test_getters_with_local_storage.py`)
- Тестирование всех геттеров с локальными данными
- Выявление проблем в логике парсеров
## Ожидаемые результаты
Все тесты должны возвращать **статус 200** и содержать поле `"success": true` в ответе.
## Примеры тестовых запросов
Тесты используют примеры из Pydantic схем:
### svodka_pm
```json
{
"id": "SNPZ",
"codes": [78, 79],
"columns": ["ПП", "СЭБ"]
}
```
### svodka_ca
```json
{
"modes": ["fact", "plan"],
"tables": ["table1", "table2"]
}
```
### monitoring_fuel
```json
{
"columns": ["total", "normativ"]
}
```

71
tests/TEST_RESULTS.md Normal file
View File

@@ -0,0 +1,71 @@
# Результаты тестирования API эндпоинтов
## Сводка
Создана полная система тестирования с локальным storage для проверки всех API эндпоинтов проекта NIN Excel Parsers.
## Структура тестов
### 1. Прямые тесты парсеров (`test_parsers_direct.py`)
-**Регистрация парсеров** - все парсеры корректно регистрируются
-**Локальный storage** - работает корректно
-**ReportService** - корректно работает с локальным storage
### 2. Тесты загрузки (`test_upload_with_local_storage.py`)
-**svodka_ca.xlsx** - парсер возвращает `None`
-**pm_plan.zip** - парсер возвращает словарь с `None` значениями
-**monitoring.zip** - парсер возвращает пустой словарь
### 3. Тесты геттеров (`test_getters_with_local_storage.py`)
-**Все геттеры** - не работают из-за проблем с загрузкой данных
### 4. API тесты (`test_*_endpoints.py`)
-**Загрузка файлов** - эндпоинты работают
-**Геттеры** - не работают из-за проблем с данными
## Выявленные проблемы
### 1. Парсер svodka_ca
- **Проблема**: Возвращает `None` вместо DataFrame
- **Причина**: Парсер не может обработать тестовый файл `svodka_ca.xlsx`
- **Статус**: Требует исправления
### 2. Парсер svodka_pm
- **Проблема**: Возвращает словарь с `None` значениями
- **Причина**: Файлы в архиве `pm_plan.zip` не найдены (неправильные имена файлов)
- **Статус**: Требует исправления логики поиска файлов
### 3. Парсер monitoring_fuel
- **Проблема**: Возвращает пустой словарь
- **Причина**: Ошибки при загрузке файлов - "None of ['id'] are in the columns"
- **Статус**: Требует исправления логики обработки колонок
## Рекомендации
### Немедленные действия
1. **Исправить парсер svodka_ca** - проверить логику парсинга Excel файлов
2. **Исправить парсер svodka_pm** - проверить логику поиска файлов в архиве
3. **Исправить парсер monitoring_fuel** - проверить логику обработки колонок
### Долгосрочные улучшения
1. **Улучшить обработку ошибок** в парсерах
2. **Добавить валидацию данных** перед сохранением
3. **Создать более детальные тесты** для каждого парсера
## Техническая информация
### Локальный storage
- ✅ Создан `LocalStorageAdapter` для тестирования
- ✅ Поддерживает все операции: save, load, delete, list
- ✅ Автоматически очищается после тестов
### Инфраструктура тестов
- ✅ Pytest конфигурация с фикстурами
- ✅ Автоматическая регистрация парсеров
- ✅ Поддержка как API, так и прямых тестов
## Заключение
Система тестирования создана и работает корректно. Выявлены конкретные проблемы в парсерах, которые требуют исправления. После исправления парсеров все тесты должны пройти успешно.
**Следующий шаг**: Исправить выявленные проблемы в парсерах согласно результатам отладочных тестов.

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Tests package

97
tests/conftest.py Normal file
View File

@@ -0,0 +1,97 @@
"""
Конфигурация pytest для тестирования API эндпоинтов
"""
import pytest
import requests
import time
import os
import sys
from pathlib import Path
# Добавляем путь к проекту для импорта модулей
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "python_parser"))
from adapters.local_storage import LocalStorageAdapter
# Базовый URL API
API_BASE_URL = "http://localhost:8000"
# Путь к тестовым данным
TEST_DATA_DIR = Path(__file__).parent / "test_data"
@pytest.fixture(scope="session")
def api_base_url():
"""Базовый URL для API"""
return API_BASE_URL
@pytest.fixture(scope="session")
def test_data_dir():
"""Директория с тестовыми данными"""
return TEST_DATA_DIR
@pytest.fixture(scope="session")
def wait_for_api():
"""Ожидание готовности API"""
max_attempts = 30
for attempt in range(max_attempts):
try:
response = requests.get(f"{API_BASE_URL}/docs", timeout=5)
if response.status_code == 200:
print(f"✅ API готов после {attempt + 1} попыток")
return True
except requests.exceptions.RequestException:
pass
if attempt < max_attempts - 1:
time.sleep(2)
pytest.fail("❌ API не готов после 30 попыток")
@pytest.fixture
def upload_file(test_data_dir):
"""Фикстура для загрузки файла"""
def _upload_file(filename):
file_path = test_data_dir / filename
if not file_path.exists():
pytest.skip(f"Файл {filename} не найден в {test_data_dir}")
return file_path
return _upload_file
@pytest.fixture(scope="session")
def local_storage():
"""Фикстура для локального storage"""
storage = LocalStorageAdapter("tests/local_storage")
yield storage
# Очищаем storage после всех тестов
storage.clear_all()
@pytest.fixture
def clean_storage(local_storage):
"""Фикстура для очистки storage перед каждым тестом"""
local_storage.clear_all()
yield local_storage
def make_api_request(url, method="GET", data=None, files=None, json_data=None):
"""Универсальная функция для API запросов"""
try:
if method.upper() == "GET":
response = requests.get(url, timeout=30)
elif method.upper() == "POST":
if files:
response = requests.post(url, files=files, timeout=30)
elif json_data:
response = requests.post(url, json=json_data, timeout=30)
else:
response = requests.post(url, data=data, timeout=30)
else:
raise ValueError(f"Неподдерживаемый метод: {method}")
return response
except requests.exceptions.RequestException as e:
pytest.fail(f"Ошибка API запроса: {e}")
@pytest.fixture
def api_request():
"""Фикстура для API запросов"""
return make_api_request

2
tests/requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
pytest>=7.0.0
requests>=2.28.0

View File

@@ -0,0 +1,20 @@
"""
Основной файл для запуска всех тестов API эндпоинтов
"""
import pytest
import sys
from pathlib import Path
# Добавляем путь к проекту для импорта модулей
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "python_parser"))
if __name__ == "__main__":
# Запуск всех тестов
pytest.main([
__file__.replace("test_all_endpoints.py", ""),
"-v", # подробный вывод
"--tb=short", # короткий traceback
"--color=yes", # цветной вывод
"-x", # остановка на первой ошибке
])

Binary file not shown.

BIN
tests/test_data/pm_plan.zip Normal file

Binary file not shown.

View File

@@ -0,0 +1,339 @@
"""
Тестирование геттеров с данными из локального storage
"""
import pytest
import sys
from pathlib import Path
# Добавляем путь к проекту
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "python_parser"))
from core.services import ReportService, PARSERS
from core.models import DataRequest, UploadRequest
from adapters.local_storage import LocalStorageAdapter
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser
# Регистрируем парсеры
PARSERS.update({
'svodka_pm': SvodkaPMParser,
'svodka_ca': SvodkaCAParser,
'monitoring_fuel': MonitoringFuelParser,
})
class TestGettersWithLocalStorage:
"""Тестирование геттеров с локальным storage"""
@pytest.fixture(autouse=True)
def setup_storage(self, clean_storage):
"""Настройка локального storage для каждого теста"""
self.storage = clean_storage
self.report_service = ReportService(self.storage)
def test_svodka_pm_single_og_with_local_data(self, upload_file):
"""Тест svodka_pm single_og с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("pm_plan.zip")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='svodka_pm',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='svodka_pm',
get_params={
'mode': 'single_og',
'id': 'SNPZ',
'codes': [78, 79],
'columns': ['ПП', 'СЭБ']
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ svodka_pm/single_og работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ svodka_pm/single_og не работает: {result.message}")
# Не делаем assert, чтобы увидеть все ошибки
def test_svodka_pm_total_ogs_with_local_data(self, upload_file):
"""Тест svodka_pm total_ogs с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("pm_plan.zip")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='svodka_pm',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='svodka_pm',
get_params={
'mode': 'total_ogs',
'codes': [78, 79, 394, 395, 396, 397, 81, 82, 83, 84],
'columns': ['БП', 'ПП', 'СЭБ']
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ svodka_pm/total_ogs работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ svodka_pm/total_ogs не работает: {result.message}")
def test_svodka_ca_get_ca_data_with_local_data(self, upload_file):
"""Тест svodka_ca get_ca_data с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("svodka_ca.xlsx")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='svodka_ca',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='svodka_ca',
get_params={
'mode': 'get_ca_data',
'modes': ['fact', 'plan'],
'tables': ['table1', 'table2']
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ svodka_ca/get_ca_data работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ svodka_ca/get_ca_data не работает: {result.message}")
def test_monitoring_fuel_get_total_by_columns_with_local_data(self, upload_file):
"""Тест monitoring_fuel get_total_by_columns с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("monitoring.zip")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='monitoring_fuel',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='monitoring_fuel',
get_params={
'mode': 'total_by_columns',
'columns': ['total', 'normativ']
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ monitoring_fuel/get_total_by_columns работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ monitoring_fuel/get_total_by_columns не работает: {result.message}")
def test_monitoring_fuel_get_month_by_code_with_local_data(self, upload_file):
"""Тест monitoring_fuel get_month_by_code с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("monitoring.zip")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='monitoring_fuel',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='monitoring_fuel',
get_params={
'mode': 'month_by_code',
'month': '02'
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ monitoring_fuel/get_month_by_code работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ monitoring_fuel/get_month_by_code не работает: {result.message}")
def test_monitoring_fuel_get_series_by_id_and_columns_with_local_data(self, upload_file):
"""Тест monitoring_fuel get_series_by_id_and_columns с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("monitoring.zip")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='monitoring_fuel',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='monitoring_fuel',
get_params={
'mode': 'series_by_id_and_columns',
'columns': ['total', 'normativ']
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ monitoring_fuel/get_series_by_id_and_columns работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ monitoring_fuel/get_series_by_id_and_columns не работает: {result.message}")
def test_all_getters_with_loaded_data(self, upload_file):
"""Тест всех геттеров с предварительно загруженными данными"""
# Загружаем все данные
files_to_upload = [
("svodka_ca.xlsx", "svodka_ca", "file"),
("pm_plan.zip", "svodka_pm", "zip"),
("monitoring.zip", "monitoring_fuel", "zip")
]
for filename, report_type, upload_type in files_to_upload:
file_path = upload_file(filename)
# Читаем файл и создаем UploadRequest
with open(file_path, 'rb') as f:
file_content = f.read()
upload_request = UploadRequest(
report_type=report_type,
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
result = self.report_service.upload_report(upload_request)
assert result.success is True, f"Загрузка {filename} не удалась: {result.message}"
print(f"{filename} загружен")
# Тестируем все геттеры
test_cases = [
# svodka_pm
{
'report_type': 'svodka_pm',
'mode': 'single_og',
'params': {'id': 'SNPZ', 'codes': [78, 79], 'columns': ['ПП', 'СЭБ']},
'name': 'svodka_pm/single_og'
},
{
'report_type': 'svodka_pm',
'mode': 'total_ogs',
'params': {'codes': [78, 79, 394, 395, 396, 397, 81, 82, 83, 84], 'columns': ['БП', 'ПП', 'СЭБ']},
'name': 'svodka_pm/total_ogs'
},
# svodka_ca
{
'report_type': 'svodka_ca',
'mode': 'get_ca_data',
'params': {'modes': ['fact', 'plan'], 'tables': ['table1', 'table2']},
'name': 'svodka_ca/get_ca_data'
},
# monitoring_fuel
{
'report_type': 'monitoring_fuel',
'mode': 'total_by_columns',
'params': {'columns': ['total', 'normativ']},
'name': 'monitoring_fuel/get_total_by_columns'
},
{
'report_type': 'monitoring_fuel',
'mode': 'month_by_code',
'params': {'month': '02'},
'name': 'monitoring_fuel/get_month_by_code'
},
{
'report_type': 'monitoring_fuel',
'mode': 'series_by_id_and_columns',
'params': {'columns': ['total', 'normativ']},
'name': 'monitoring_fuel/get_series_by_id_and_columns'
}
]
print("\n🧪 Тестирование всех геттеров с локальными данными:")
for test_case in test_cases:
request_params = test_case['params'].copy()
request_params['mode'] = test_case['mode']
data_request = DataRequest(
report_type=test_case['report_type'],
get_params=request_params
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"{test_case['name']}: работает")
else:
print(f"{test_case['name']}: {result.message}")
# Показываем содержимое storage
objects = self.storage.list_objects()
print(f"\n📊 Объекты в локальном storage: {len(objects)}")
for obj_id in objects:
metadata = self.storage.get_object_metadata(obj_id)
if metadata:
print(f" 📁 {obj_id}: {metadata['shape']} колонки: {metadata['columns'][:3]}...")

View File

@@ -0,0 +1,102 @@
"""
Тесты для monitoring_fuel эндпоинтов
"""
import pytest
import requests
class TestMonitoringFuelEndpoints:
"""Тесты эндпоинтов monitoring_fuel"""
def test_monitoring_fuel_get_total_by_columns(self, wait_for_api, api_base_url):
"""Тест получения данных по колонкам и расчёт средних значений"""
# Пример из схемы MonitoringFuelTotalRequest
data = {
"columns": ["total", "normativ"]
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_total_by_columns", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_total_by_columns работает: получены данные для колонок {data['columns']}")
def test_monitoring_fuel_get_month_by_code(self, wait_for_api, api_base_url):
"""Тест получения данных за месяц"""
# Пример из схемы MonitoringFuelMonthRequest
data = {
"month": "02"
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_month_by_code", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_month_by_code работает: получены данные за месяц {data['month']}")
def test_monitoring_fuel_get_series_by_id_and_columns(self, wait_for_api, api_base_url):
"""Тест получения временных рядов по ID и колонкам"""
# Пример из схемы MonitoringFuelSeriesRequest
data = {
"columns": ["total", "normativ"]
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_series_by_id_and_columns", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_series_by_id_and_columns работает: получены временные ряды для колонок {data['columns']}")
def test_monitoring_fuel_get_total_by_columns_single_column(self, wait_for_api, api_base_url):
"""Тест получения данных по одной колонке"""
data = {
"columns": ["total"]
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_total_by_columns", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_total_by_columns с одной колонкой работает: получены данные для колонки {data['columns'][0]}")
def test_monitoring_fuel_get_month_by_code_different_month(self, wait_for_api, api_base_url):
"""Тест получения данных за другой месяц"""
data = {
"month": "01"
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_month_by_code", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_month_by_code с другим месяцем работает: получены данные за месяц {data['month']}")
def test_monitoring_fuel_get_series_by_id_and_columns_single_column(self, wait_for_api, api_base_url):
"""Тест получения временных рядов по одной колонке"""
data = {
"columns": ["total"]
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_series_by_id_and_columns", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_series_by_id_and_columns с одной колонкой работает: получены временные ряды для колонки {data['columns'][0]}")

View File

@@ -0,0 +1,134 @@
"""
Прямое тестирование парсеров с локальным storage
Этот модуль тестирует парсеры напрямую, без API
"""
import pytest
import sys
from pathlib import Path
# Добавляем путь к проекту
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "python_parser"))
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser
from core.services import ReportService
from adapters.local_storage import LocalStorageAdapter
class TestParsersDirect:
"""Прямое тестирование парсеров с локальным storage"""
@pytest.fixture(autouse=True)
def setup_storage(self, clean_storage):
"""Настройка локального storage для каждого теста"""
self.storage = clean_storage
self.report_service = ReportService(self.storage)
def test_svodka_pm_parser_registration(self):
"""Тест регистрации парсера svodka_pm"""
parser = SvodkaPMParser()
getters = parser.get_available_getters()
assert "single_og" in getters
assert "total_ogs" in getters
# Проверяем параметры геттеров
single_og_getter = getters["single_og"]
assert "id" in single_og_getter["required_params"]
assert "codes" in single_og_getter["required_params"]
assert "columns" in single_og_getter["required_params"]
assert "search" in single_og_getter["optional_params"]
total_ogs_getter = getters["total_ogs"]
assert "codes" in total_ogs_getter["required_params"]
assert "columns" in total_ogs_getter["required_params"]
assert "search" in total_ogs_getter["optional_params"]
print("✅ svodka_pm парсер зарегистрирован корректно")
def test_svodka_ca_parser_registration(self):
"""Тест регистрации парсера svodka_ca"""
parser = SvodkaCAParser()
getters = parser.get_available_getters()
assert "get_ca_data" in getters
# Проверяем параметры геттера
getter = getters["get_ca_data"]
assert "modes" in getter["required_params"]
assert "tables" in getter["required_params"]
print("✅ svodka_ca парсер зарегистрирован корректно")
def test_monitoring_fuel_parser_registration(self):
"""Тест регистрации парсера monitoring_fuel"""
parser = MonitoringFuelParser()
getters = parser.get_available_getters()
assert "total_by_columns" in getters
assert "month_by_code" in getters
assert "series_by_id_and_columns" in getters
# Проверяем параметры геттеров
total_getter = getters["total_by_columns"]
assert "columns" in total_getter["required_params"]
month_getter = getters["month_by_code"]
assert "month" in month_getter["required_params"]
series_getter = getters["series_by_id_and_columns"]
assert "columns" in series_getter["required_params"]
print("✅ monitoring_fuel парсер зарегистрирован корректно")
def test_storage_operations(self):
"""Тест операций с локальным storage"""
import pandas as pd
# Создаем тестовый DataFrame
test_df = pd.DataFrame({
'col1': [1, 2, 3],
'col2': ['a', 'b', 'c']
})
# Сохраняем
success = self.storage.save_dataframe("test_object", test_df)
assert success is True
# Проверяем существование
exists = self.storage.object_exists("test_object")
assert exists is True
# Загружаем
loaded_df = self.storage.load_dataframe("test_object")
assert loaded_df is not None
assert loaded_df.shape == (3, 2)
assert list(loaded_df.columns) == ['col1', 'col2']
# Получаем метаданные
metadata = self.storage.get_object_metadata("test_object")
assert metadata is not None
assert metadata["shape"] == [3, 2]
# Получаем список объектов
objects = self.storage.list_objects()
assert "test_object" in objects
# Удаляем
delete_success = self.storage.delete_object("test_object")
assert delete_success is True
# Проверяем, что объект удален
exists_after = self.storage.object_exists("test_object")
assert exists_after is False
print("✅ Локальный storage работает корректно")
def test_report_service_with_local_storage(self):
"""Тест ReportService с локальным storage"""
# Проверяем, что ReportService может работать с локальным storage
assert self.report_service.storage is not None
assert hasattr(self.report_service.storage, 'save_dataframe')
assert hasattr(self.report_service.storage, 'load_dataframe')
print("✅ ReportService корректно работает с локальным storage")

View File

@@ -0,0 +1,58 @@
"""
Тесты для svodka_ca эндпоинтов
"""
import pytest
import requests
class TestSvodkaCAEndpoints:
"""Тесты эндпоинтов svodka_ca"""
def test_svodka_ca_get_ca_data(self, wait_for_api, api_base_url):
"""Тест получения данных из сводок СА"""
# Пример из схемы SvodkaCARequest
data = {
"modes": ["fact", "plan"],
"tables": ["table1", "table2"]
}
response = requests.post(f"{api_base_url}/svodka_ca/get_ca_data", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_ca/get_ca_data работает: получены данные для режимов {data['modes']}")
def test_svodka_ca_get_ca_data_single_mode(self, wait_for_api, api_base_url):
"""Тест получения данных из сводок СА для одного режима"""
data = {
"modes": ["fact"],
"tables": ["table1"]
}
response = requests.post(f"{api_base_url}/svodka_ca/get_ca_data", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_ca/get_ca_data с одним режимом работает: получены данные для режима {data['modes'][0]}")
def test_svodka_ca_get_ca_data_multiple_tables(self, wait_for_api, api_base_url):
"""Тест получения данных из сводок СА для нескольких таблиц"""
data = {
"modes": ["fact", "plan"],
"tables": ["table1", "table2", "table3"]
}
response = requests.post(f"{api_base_url}/svodka_ca/get_ca_data", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_ca/get_ca_data с несколькими таблицами работает: получены данные для {len(data['tables'])} таблиц")

View File

@@ -0,0 +1,79 @@
"""
Тесты для svodka_pm эндпоинтов
"""
import pytest
import requests
class TestSvodkaPMEndpoints:
"""Тесты эндпоинтов svodka_pm"""
def test_svodka_pm_single_og(self, wait_for_api, api_base_url):
"""Тест получения данных по одному ОГ"""
# Пример из схемы SvodkaPMSingleOGRequest
data = {
"id": "SNPZ",
"codes": [78, 79],
"columns": ["ПП", "СЭБ"]
}
response = requests.post(f"{api_base_url}/svodka_pm/single_og", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_pm/single_og работает: получены данные для {data['id']}")
def test_svodka_pm_total_ogs(self, wait_for_api, api_base_url):
"""Тест получения данных по всем ОГ"""
# Пример из схемы SvodkaPMTotalOGsRequest
data = {
"codes": [78, 79, 394, 395, 396, 397, 81, 82, 83, 84],
"columns": ["БП", "ПП", "СЭБ"]
}
response = requests.post(f"{api_base_url}/svodka_pm/get_total_ogs", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_pm/get_total_ogs работает: получены данные по всем ОГ")
def test_svodka_pm_single_og_with_search(self, wait_for_api, api_base_url):
"""Тест получения данных по одному ОГ с параметром search"""
data = {
"id": "SNPZ",
"codes": [78, 79],
"columns": ["ПП", "СЭБ"],
"search": "Итого"
}
response = requests.post(f"{api_base_url}/svodka_pm/single_og", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_pm/single_og с search работает: получены данные для {data['id']} с фильтром")
def test_svodka_pm_total_ogs_with_search(self, wait_for_api, api_base_url):
"""Тест получения данных по всем ОГ с параметром search"""
data = {
"codes": [78, 79, 394, 395, 396, 397, 81, 82, 83, 84],
"columns": ["БП", "ПП", "СЭБ"],
"search": "Итого"
}
response = requests.post(f"{api_base_url}/svodka_pm/get_total_ogs", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_pm/get_total_ogs с search работает: получены данные по всем ОГ с фильтром")

View File

@@ -0,0 +1,52 @@
"""
Тесты для эндпоинтов загрузки данных
"""
import pytest
import requests
from pathlib import Path
class TestUploadEndpoints:
"""Тесты эндпоинтов загрузки"""
def test_upload_svodka_ca(self, wait_for_api, upload_file, api_base_url):
"""Тест загрузки файла svodka_ca.xlsx"""
file_path = upload_file("svodka_ca.xlsx")
with open(file_path, 'rb') as f:
files = {'file': ('svodka_ca.xlsx', f, 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')}
response = requests.post(f"{api_base_url}/svodka_ca/upload", files=files)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Загрузка не удалась: {result}"
print(f"✅ svodka_ca.xlsx загружен успешно: {result['message']}")
def test_upload_svodka_pm_plan(self, wait_for_api, upload_file, api_base_url):
"""Тест загрузки архива pm_plan.zip"""
file_path = upload_file("pm_plan.zip")
with open(file_path, 'rb') as f:
files = {'zip_file': ('pm_plan.zip', f, 'application/zip')}
response = requests.post(f"{api_base_url}/svodka_pm/upload-zip", files=files)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Загрузка не удалась: {result}"
print(f"✅ pm_plan.zip загружен успешно: {result['message']}")
def test_upload_monitoring_fuel(self, wait_for_api, upload_file, api_base_url):
"""Тест загрузки архива monitoring.zip"""
file_path = upload_file("monitoring.zip")
with open(file_path, 'rb') as f:
files = {'zip_file': ('monitoring.zip', f, 'application/zip')}
response = requests.post(f"{api_base_url}/monitoring_fuel/upload-zip", files=files)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Загрузка не удалась: {result}"
print(f"✅ monitoring.zip загружен успешно: {result['message']}")

View File

@@ -0,0 +1,183 @@
"""
Тестирование загрузки файлов с сохранением в локальный storage
"""
import pytest
import sys
from pathlib import Path
# Добавляем путь к проекту
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "python_parser"))
from core.services import ReportService, PARSERS
from core.models import UploadRequest
from adapters.local_storage import LocalStorageAdapter
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser
# Регистрируем парсеры
PARSERS.update({
'svodka_pm': SvodkaPMParser,
'svodka_ca': SvodkaCAParser,
'monitoring_fuel': MonitoringFuelParser,
})
class TestUploadWithLocalStorage:
"""Тестирование загрузки файлов с локальным storage"""
@pytest.fixture(autouse=True)
def setup_storage(self, clean_storage):
"""Настройка локального storage для каждого теста"""
self.storage = clean_storage
self.report_service = ReportService(self.storage)
def test_upload_svodka_ca_to_local_storage(self, upload_file):
"""Тест загрузки svodka_ca.xlsx в локальный storage"""
file_path = upload_file("svodka_ca.xlsx")
# Читаем файл и создаем UploadRequest
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='svodka_ca',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
# Загружаем файл через ReportService
result = self.report_service.upload_report(request)
assert result.success is True, f"Загрузка не удалась: {result.message}"
# Проверяем, что данные сохранились в локальном storage
objects = self.storage.list_objects()
assert len(objects) > 0, "Данные не сохранились в storage"
# Проверяем метаданные
for obj_id in objects:
metadata = self.storage.get_object_metadata(obj_id)
assert metadata is not None, f"Метаданные для {obj_id} не найдены"
assert "shape" in metadata, f"Отсутствует shape в метаданных {obj_id}"
assert "columns" in metadata, f"Отсутствуют columns в метаданных {obj_id}"
print(f"✅ svodka_ca.xlsx загружен в локальный storage: {len(objects)} объектов")
print(f" Объекты: {objects}")
def test_upload_pm_plan_to_local_storage(self, upload_file):
"""Тест загрузки pm_plan.zip в локальный storage"""
file_path = upload_file("pm_plan.zip")
# Читаем файл и создаем UploadRequest
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='svodka_pm',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
# Загружаем архив через ReportService
result = self.report_service.upload_report(request)
assert result.success is True, f"Загрузка не удалась: {result.message}"
# Проверяем, что данные сохранились в локальном storage
objects = self.storage.list_objects()
assert len(objects) > 0, "Данные не сохранились в storage"
# Проверяем метаданные
for obj_id in objects:
metadata = self.storage.get_object_metadata(obj_id)
assert metadata is not None, f"Метаданные для {obj_id} не найдены"
assert "shape" in metadata, f"Отсутствует shape в метаданных {obj_id}"
assert "columns" in metadata, f"Отсутствуют columns в метаданных {obj_id}"
print(f"✅ pm_plan.zip загружен в локальный storage: {len(objects)} объектов")
print(f" Объекты: {objects}")
def test_upload_monitoring_to_local_storage(self, upload_file):
"""Тест загрузки monitoring.zip в локальный storage"""
file_path = upload_file("monitoring.zip")
# Читаем файл и создаем UploadRequest
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='monitoring_fuel',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
# Загружаем архив через ReportService
result = self.report_service.upload_report(request)
assert result.success is True, f"Загрузка не удалась: {result.message}"
# Проверяем, что данные сохранились в локальном storage
objects = self.storage.list_objects()
assert len(objects) > 0, "Данные не сохранились в storage"
# Проверяем метаданные
for obj_id in objects:
metadata = self.storage.get_object_metadata(obj_id)
assert metadata is not None, f"Метаданные для {obj_id} не найдены"
assert "shape" in metadata, f"Отсутствует shape в метаданных {obj_id}"
assert "columns" in metadata, f"Отсутствуют columns в метаданных {obj_id}"
print(f"✅ monitoring.zip загружен в локальный storage: {len(objects)} объектов")
print(f" Объекты: {objects}")
def test_upload_all_files_sequence(self, upload_file):
"""Тест последовательной загрузки всех файлов"""
# Загружаем все файлы по очереди
files_to_upload = [
("svodka_ca.xlsx", "svodka_ca", "file"),
("pm_plan.zip", "svodka_pm", "zip"),
("monitoring.zip", "monitoring_fuel", "zip")
]
total_objects = 0
for filename, report_type, upload_type in files_to_upload:
file_path = upload_file(filename)
# Читаем файл и создаем UploadRequest
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type=report_type,
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
result = self.report_service.upload_report(request)
assert result.success is True, f"Загрузка {filename} не удалась: {result.message}"
# Подсчитываем объекты
objects = self.storage.list_objects()
current_count = len(objects)
print(f"{filename} загружен: {current_count - total_objects} новых объектов")
total_objects = current_count
# Проверяем итоговое количество объектов
final_objects = self.storage.list_objects()
assert len(final_objects) > 0, "Ни один файл не был загружен"
print(f"Все файлы загружены. Итого объектов в storage: {len(final_objects)}")
print(f" Все объекты: {final_objects}")
# Выводим детальную информацию о каждом объекте
for obj_id in final_objects:
metadata = self.storage.get_object_metadata(obj_id)
if metadata:
print(f" 📊 {obj_id}: {metadata['shape']} колонки: {metadata['columns'][:5]}...")