2 Commits

Author SHA1 Message Date
de63f98b50 Эндпоинты не работают 2025-09-02 10:09:22 +03:00
84069e4e41 ch 2025-09-02 07:15:16 +03:00
51 changed files with 1888 additions and 1236 deletions

18
.gitignore vendored
View File

@@ -1,22 +1,8 @@
# Python # Python
__pycache__ __pycache__
__pycache__/ *.pyc
python_parser/__pycache__/
python_parser/core/__pycache__/
python_parser/adapters/__pycache__/
python_parser/tests/__pycache__/
python_parser/tests/test_core/__pycache__/
python_parser/tests/test_adapters/__pycache__/
python_parser/tests/test_app/__pycache__/
python_parser/app/__pycache__/
python_parser/app/schemas/__pycache__/
python_parser/app/schemas/test_schemas/__pycache__/
python_parser/app/schemas/test_schemas/test_core/__pycache__/
python_parser/app/schemas/test_schemas/test_adapters/__pycache__/
python_parser/app/schemas/test_schemas/test_app/__pycache__/
nin_python_parser nin_python_parser
*.pyc
*.py[cod] *.py[cod]
*$py.class *$py.class
@@ -171,5 +157,3 @@ node_modules/
npm-debug.log* npm-debug.log*
yarn-debug.log* yarn-debug.log*
yarn-error.log* yarn-error.log*
__pycache__/

View File

@@ -14,7 +14,7 @@ services:
restart: unless-stopped restart: unless-stopped
fastapi: fastapi:
image: python:3.11-slim build: ./python_parser
container_name: svodka_fastapi_dev container_name: svodka_fastapi_dev
ports: ports:
- "8000:8000" - "8000:8000"
@@ -24,20 +24,9 @@ services:
- MINIO_SECRET_KEY=minioadmin - MINIO_SECRET_KEY=minioadmin
- MINIO_SECURE=false - MINIO_SECURE=false
- MINIO_BUCKET=svodka-data - MINIO_BUCKET=svodka-data
volumes:
# Монтируем исходный код для автоматической перезагрузки
- ./python_parser:/app
# Монтируем requirements.txt для установки зависимостей
- ./python_parser/requirements.txt:/app/requirements.txt
working_dir: /app
depends_on: depends_on:
- minio - minio
restart: unless-stopped restart: unless-stopped
command: >
bash -c "
pip install --no-cache-dir -r requirements.txt &&
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
"
streamlit: streamlit:
image: python:3.11-slim image: python:3.11-slim

View File

@@ -0,0 +1,154 @@
"""
Локальный storage адаптер для тестирования
Сохраняет данные в локальную файловую систему вместо MinIO
"""
import os
import json
import pickle
from pathlib import Path
from typing import Optional, Dict, Any
import pandas as pd
from core.ports import StoragePort
class LocalStorageAdapter(StoragePort):
"""Локальный адаптер для хранения данных в файловой системе"""
def __init__(self, base_path: str = "local_storage"):
"""
Инициализация локального storage
Args:
base_path: Базовый путь для хранения данных
"""
self.base_path = Path(base_path)
self.base_path.mkdir(parents=True, exist_ok=True)
# Создаем поддиректории
(self.base_path / "data").mkdir(exist_ok=True)
(self.base_path / "metadata").mkdir(exist_ok=True)
def object_exists(self, object_id: str) -> bool:
"""Проверяет существование объекта"""
data_file = self.base_path / "data" / f"{object_id}.pkl"
return data_file.exists()
def save_dataframe(self, object_id: str, df: pd.DataFrame) -> bool:
"""Сохраняет DataFrame в локальную файловую систему"""
try:
data_file = self.base_path / "data" / f"{object_id}.pkl"
metadata_file = self.base_path / "metadata" / f"{object_id}.json"
# Сохраняем DataFrame
with open(data_file, 'wb') as f:
pickle.dump(df, f)
# Сохраняем метаданные
metadata = {
"object_id": object_id,
"shape": df.shape,
"columns": df.columns.tolist(),
"dtypes": {str(k): str(v) for k, v in df.dtypes.to_dict().items()}
}
with open(metadata_file, 'w', encoding='utf-8') as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
print(f"Ошибка при сохранении {object_id}: {e}")
return False
def load_dataframe(self, object_id: str) -> Optional[pd.DataFrame]:
"""Загружает DataFrame из локальной файловой системы"""
try:
data_file = self.base_path / "data" / f"{object_id}.pkl"
if not data_file.exists():
return None
with open(data_file, 'rb') as f:
df = pickle.load(f)
return df
except Exception as e:
print(f"Ошибка при загрузке {object_id}: {e}")
return None
def delete_object(self, object_id: str) -> bool:
"""Удаляет объект из локального storage"""
try:
data_file = self.base_path / "data" / f"{object_id}.pkl"
metadata_file = self.base_path / "metadata" / f"{object_id}.json"
# Удаляем файлы если они существуют
if data_file.exists():
data_file.unlink()
if metadata_file.exists():
metadata_file.unlink()
return True
except Exception as e:
print(f"Ошибка при удалении {object_id}: {e}")
return False
def list_objects(self) -> list:
"""Возвращает список всех объектов в storage"""
try:
data_dir = self.base_path / "data"
if not data_dir.exists():
return []
objects = []
for file_path in data_dir.glob("*.pkl"):
object_id = file_path.stem
objects.append(object_id)
return objects
except Exception as e:
print(f"Ошибка при получении списка объектов: {e}")
return []
def get_object_metadata(self, object_id: str) -> Optional[Dict[str, Any]]:
"""Возвращает метаданные объекта"""
try:
metadata_file = self.base_path / "metadata" / f"{object_id}.json"
if not metadata_file.exists():
return None
with open(metadata_file, 'r', encoding='utf-8') as f:
metadata = json.load(f)
return metadata
except Exception as e:
print(f"Ошибка при получении метаданных {object_id}: {e}")
return None
def clear_all(self) -> bool:
"""Очищает весь storage"""
try:
data_dir = self.base_path / "data"
metadata_dir = self.base_path / "metadata"
# Удаляем все файлы
for file_path in data_dir.glob("*"):
if file_path.is_file():
file_path.unlink()
for file_path in metadata_dir.glob("*"):
if file_path.is_file():
file_path.unlink()
return True
except Exception as e:
print(f"Ошибка при очистке storage: {e}")
return False

View File

@@ -1,88 +0,0 @@
# Парсер Сводки ПМ
## Описание
Парсер для обработки сводок ПМ (план и факт) с поддержкой множественных геттеров. Наследуется от `ParserPort` и реализует архитектуру hexagonal architecture.
## Доступные геттеры
### 1. `get_single_og`
Получение данных по одному ОГ из сводки ПМ.
**Обязательные параметры:**
- `id` (str): ID ОГ (например, "SNPZ", "KNPZ")
- `codes` (list): Список кодов показателей (например, [78, 79, 81, 82])
- `columns` (list): Список столбцов для извлечения (например, ["ПП", "БП", "СЭБ"])
**Необязательные параметры:**
- `search` (str): Значение для поиска в столбцах
**Пример использования:**
```python
parser = SvodkaPMParser()
params = {
"id": "SNPZ",
"codes": [78, 79, 81, 82],
"columns": ["ПП", "БП", "СЭБ"]
}
result = parser.get_value("get_single_og", params)
```
### 2. `get_total_ogs`
Получение данных по всем ОГ из сводки ПМ.
**Обязательные параметры:**
- `codes` (list): Список кодов показателей
- `columns` (list): Список столбцов для извлечения
**Необязательные параметры:**
- `search` (str): Значение для поиска в столбцах
**Пример использования:**
```python
parser = SvodkaPMParser()
params = {
"codes": [78, 79, 81, 82],
"columns": ["ПП", "БП", "СЭБ"]
}
result = parser.get_value("get_total_ogs", params)
```
## Поддерживаемые столбцы
- **ПП, БП**: Данные из файлов плана
- **ТБ, СЭБ, НЭБ**: Данные из файлов факта
## Структура файлов
Парсер ожидает следующую структуру файлов:
- `data/pm_fact/svodka_fact_pm_{OG_ID}.xlsx` или `.xlsm`
- `data/pm_plan/svodka_plan_pm_{OG_ID}.xlsx` или `.xlsm`
Где `{OG_ID}` - это ID ОГ (например, SNPZ, KNPZ и т.д.)
## Формат результата
Результат возвращается в формате JSON со следующей структурой:
```json
{
"ПП": {
"78": 123.45,
"79": 234.56
},
"БП": {
"78": 111.11,
"79": 222.22
},
"СЭБ": {
"78": 333.33,
"79": 444.44
}
}
```
## Обработка ошибок
- Если файл плана/факта не найден, соответствующие столбцы будут пустыми
- Если код показателя не найден, возвращается 0
- Валидация параметров выполняется автоматически

View File

@@ -4,8 +4,8 @@ import zipfile
from typing import Dict, Tuple from typing import Dict, Tuple
from core.ports import ParserPort from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.monitoring_fuel import MonitoringFuelTotalRequest, MonitoringFuelMonthRequest from app.schemas.monitoring_fuel import MonitoringFuelTotalRequest, MonitoringFuelMonthRequest, MonitoringFuelSeriesRequest
from adapters.pconfig import data_to_json from adapters.pconfig import data_to_json, find_header_row
class MonitoringFuelParser(ParserPort): class MonitoringFuelParser(ParserPort):
@@ -32,6 +32,14 @@ class MonitoringFuelParser(ParserPort):
description="Получение данных за конкретный месяц" description="Получение данных за конкретный месяц"
) )
register_getter_from_schema(
parser_instance=self,
getter_name="series_by_id_and_columns",
method=self._get_series_by_id_and_columns,
schema_class=MonitoringFuelSeriesRequest,
description="Получение временных рядов по ID и колонкам"
)
def _get_total_by_columns(self, params: dict): def _get_total_by_columns(self, params: dict):
"""Агрегация данных по колонкам""" """Агрегация данных по колонкам"""
# Валидируем параметры с помощью схемы Pydantic # Валидируем параметры с помощью схемы Pydantic
@@ -39,31 +47,9 @@ class MonitoringFuelParser(ParserPort):
columns = validated_params["columns"] columns = validated_params["columns"]
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки) # TODO: Переделать под новую архитектуру
if hasattr(self, 'data_dict') and self.data_dict is not None: df_means, _ = self.aggregate_by_columns(self.df, columns)
# Данные из парсинга return df_means.to_dict(orient='index')
data_source = self.data_dict
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из загрузки - преобразуем DataFrame обратно в словарь
data_source = self._df_to_data_dict()
else:
return {}
# Агрегируем данные по колонкам
df_means, _ = self.aggregate_by_columns(data_source, columns)
# Преобразуем в JSON-совместимый формат
result = {}
for idx, row in df_means.iterrows():
result[str(idx)] = {}
for col in columns:
value = row.get(col)
if pd.isna(value) or value == float('inf') or value == float('-inf'):
result[str(idx)][col] = None
else:
result[str(idx)][col] = float(value) if isinstance(value, (int, float)) else value
return result
def _get_month_by_code(self, params: dict): def _get_month_by_code(self, params: dict):
"""Получение данных за конкретный месяц""" """Получение данных за конкретный месяц"""
@@ -72,73 +58,14 @@ class MonitoringFuelParser(ParserPort):
month = validated_params["month"] month = validated_params["month"]
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки) # TODO: Переделать под новую архитектуру
if hasattr(self, 'data_dict') and self.data_dict is not None: df_month = self.get_month(self.df, month)
# Данные из парсинга return df_month.to_dict(orient='index')
data_source = self.data_dict
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из загрузки - преобразуем DataFrame обратно в словарь
data_source = self._df_to_data_dict()
else:
return {}
# Получаем данные за конкретный месяц
df_month = self.get_month(data_source, month)
# Преобразуем в JSON-совместимый формат
result = {}
for idx, row in df_month.iterrows():
result[str(idx)] = {}
for col in df_month.columns:
value = row[col]
if pd.isna(value) or value == float('inf') or value == float('-inf'):
result[str(idx)][col] = None
else:
result[str(idx)][col] = float(value) if isinstance(value, (int, float)) else value
return result
def _df_to_data_dict(self):
"""Преобразование DataFrame обратно в словарь данных"""
if not hasattr(self, 'df') or self.df is None or self.df.empty:
return {}
data_dict = {}
# Группируем данные по месяцам
for _, row in self.df.iterrows():
month = row.get('month')
data = row.get('data')
if month and data is not None:
data_dict[month] = data
return data_dict
def parse(self, file_path: str, params: dict) -> pd.DataFrame: def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame""" """Парсинг файла и возврат DataFrame"""
# Парсим данные и сохраняем словарь для использования в геттерах # Сохраняем DataFrame для использования в геттерах
self.data_dict = self.parse_monitoring_fuel_files(file_path, params) self.df = self.parse_monitoring_fuel_files(file_path, params)
# Преобразуем словарь в DataFrame для совместимости с services.py
if self.data_dict:
# Создаем DataFrame с информацией о месяцах и данных
data_rows = []
for month, df_data in self.data_dict.items():
if df_data is not None and not df_data.empty:
data_rows.append({
'month': month,
'rows_count': len(df_data),
'data': df_data
})
if data_rows:
df = pd.DataFrame(data_rows)
self.df = df
return df
# Если данных нет, возвращаем пустой DataFrame
self.df = pd.DataFrame()
return self.df return self.df
def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]: def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]:
@@ -173,30 +100,13 @@ class MonitoringFuelParser(ParserPort):
return df_monitorings return df_monitorings
def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int:
"""Определение индекса заголовка в Excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file_path,
sheet_name=sheet,
header=None,
nrows=max_rows,
engine='openpyxl'
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx + 1 # возвращаем индекс строки (0-based)
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_single(self, file, sheet, header_num=None): def parse_single(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного объекта''' ''' Собственно парсер отчетов одного объекта'''
# Автоопределение header_num, если не передан # Автоопределение header_num, если не передан
if header_num is None: if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Установка") header_num = find_header_row(file, sheet, search_value="Установка")
# Читаем весь лист, начиная с найденной строки как заголовок # Читаем весь лист, начиная с найденной строки как заголовок
df_full = pd.read_excel( df_full = pd.read_excel(
file, file,
@@ -229,11 +139,7 @@ class MonitoringFuelParser(ParserPort):
if 'name' in df_full.columns: if 'name' in df_full.columns:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name' # Применяем функцию get_id_by_name к каждой строке в колонке 'name'
# df_full['id'] = df_full['name'].apply(get_object_by_name) # This line was removed as per new_code # df_full['id'] = df_full['name'].apply(get_object_by_name) # This line was removed as per new_code
# Временно используем name как id pass # Placeholder for new_code
df_full['id'] = df_full['name']
else:
# Если нет колонки name, создаем id из индекса
df_full['id'] = df_full.index
# Устанавливаем id как индекс # Устанавливаем id как индекс
df_full.set_index('id', inplace=True) df_full.set_index('id', inplace=True)
@@ -322,3 +228,47 @@ class MonitoringFuelParser(ParserPort):
total.name = 'mean' total.name = 'mean'
return total, df_combined return total, df_combined
def _get_series_by_id_and_columns(self, params: dict):
"""Получение временных рядов по ID и колонкам"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, MonitoringFuelSeriesRequest)
columns = validated_params["columns"]
# Проверяем, что все колонки существуют хотя бы в одном месяце
valid_columns = set()
for month in self.df.values():
valid_columns.update(month.columns)
for col in columns:
if col not in valid_columns:
raise ValueError(f"Колонка '{col}' не найдена ни в одном месяце")
# Подготавливаем результат: словарь id → {col: [значения по месяцам]}
result = {}
# Обрабатываем месяцы от 01 до 12
for month_key in [f"{i:02d}" for i in range(1, 13)]:
if month_key not in self.df:
print(f"Месяц '{month_key}' не найден в df_monitorings, пропускаем.")
continue
df = self.df[month_key]
for col in columns:
if col not in df.columns:
continue # Пропускаем, если в этом месяце нет колонки
for idx, value in df[col].items():
if pd.isna(value):
continue # Можно пропустить NaN, или оставить как null
if idx not in result:
result[idx] = {c: [] for c in columns}
result[idx][col].append(value)
# Преобразуем ключи id в строки (для JSON-совместимости)
result_str_keys = {str(k): v for k, v in result.items()}
return result_str_keys

View File

@@ -25,197 +25,134 @@ class SvodkaCAParser(ParserPort):
def _get_data_wrapper(self, params: dict): def _get_data_wrapper(self, params: dict):
"""Получение данных по режимам и таблицам""" """Получение данных по режимам и таблицам"""
print(f"🔍 DEBUG: _get_data_wrapper вызван с параметрами: {params}")
# Валидируем параметры с помощью схемы Pydantic # Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaCARequest) validated_params = validate_params_with_schema(params, SvodkaCARequest)
modes = validated_params["modes"] modes = validated_params["modes"]
tables = validated_params["tables"] tables = validated_params["tables"]
print(f"🔍 DEBUG: Запрошенные режимы: {modes}") # TODO: Переделать под новую архитектуру
print(f"🔍 DEBUG: Запрошенные таблицы: {tables}")
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
if hasattr(self, 'data_dict') and self.data_dict is not None:
# Данные из парсинга
data_source = self.data_dict
print(f"🔍 DEBUG: Используем data_dict с режимами: {list(data_source.keys())}")
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из загрузки - преобразуем DataFrame обратно в словарь
data_source = self._df_to_data_dict()
print(f"🔍 DEBUG: Используем df, преобразованный в data_dict с режимами: {list(data_source.keys())}")
else:
print(f"🔍 DEBUG: Нет данных! data_dict={getattr(self, 'data_dict', 'None')}, df={getattr(self, 'df', 'None')}")
return {}
# Фильтруем данные по запрошенным режимам и таблицам
result_data = {}
for mode in modes:
if mode in data_source:
result_data[mode] = {}
available_tables = list(data_source[mode].keys())
print(f"🔍 DEBUG: Режим '{mode}' содержит таблицы: {available_tables}")
for table_name, table_data in data_source[mode].items():
# Ищем таблицы по частичному совпадению
for requested_table in tables:
if requested_table in table_name:
result_data[mode][table_name] = table_data
print(f"🔍 DEBUG: Добавлена таблица '{table_name}' (совпадение с '{requested_table}') с {len(table_data)} записями")
break # Найдено совпадение, переходим к следующей таблице
else:
print(f"🔍 DEBUG: Режим '{mode}' не найден в data_source")
print(f"🔍 DEBUG: Итоговый результат содержит режимы: {list(result_data.keys())}")
return result_data
def _df_to_data_dict(self):
"""Преобразование DataFrame обратно в словарь данных"""
if not hasattr(self, 'df') or self.df is None or self.df.empty:
return {}
data_dict = {} data_dict = {}
for mode in modes:
# Группируем данные по режимам и таблицам data_dict[mode] = self.get_data(self.df, mode, tables)
for _, row in self.df.iterrows(): return self.data_dict_to_json(data_dict)
mode = row.get('mode')
table = row.get('table')
data = row.get('data')
if mode and table and data is not None:
if mode not in data_dict:
data_dict[mode] = {}
data_dict[mode][table] = data
return data_dict
def parse(self, file_path: str, params: dict) -> pd.DataFrame: def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame""" """Парсинг файла и возврат DataFrame"""
print(f"🔍 DEBUG: SvodkaCAParser.parse вызван с файлом: {file_path}") # Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_ca(file_path, params)
# Парсим данные и сохраняем словарь для использования в геттерах
self.data_dict = self.parse_svodka_ca(file_path, params)
# Преобразуем словарь в DataFrame для совместимости с services.py
# Создаем простой DataFrame с информацией о загруженных данных
if self.data_dict:
# Создаем DataFrame с информацией о режимах и таблицах
data_rows = []
for mode, tables in self.data_dict.items():
for table_name, table_data in tables.items():
if table_data:
data_rows.append({
'mode': mode,
'table': table_name,
'rows_count': len(table_data),
'data': table_data
})
if data_rows:
df = pd.DataFrame(data_rows)
self.df = df
print(f"🔍 DEBUG: Создан DataFrame с {len(data_rows)} записями")
return df
# Если данных нет, возвращаем пустой DataFrame
self.df = pd.DataFrame()
print(f"🔍 DEBUG: Возвращаем пустой DataFrame")
return self.df return self.df
def parse_svodka_ca(self, file_path: str, params: dict) -> dict: def parse_svodka_ca(self, file_path: str, params: dict) -> dict:
"""Парсинг сводки СА - работает с тремя листами: План, Факт, Норматив""" """Парсинг сводки СА"""
print(f"🔍 DEBUG: Начинаем парсинг сводки СА из файла: {file_path}") # Получаем параметры из params
sheet_name = params.get('sheet_name', 0) # По умолчанию первый лист
inclusion_list = params.get('inclusion_list', {'ТиП', 'Топливо', 'Потери'})
# === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив === # === Извлечение и фильтрация ===
tables = self.extract_all_tables(file_path, sheet_name)
# Выгружаем План # Фильтруем таблицы: оставляем только те, где первая строка содержит нужные заголовки
inclusion_list_plan = { filtered_tables = []
"ТиП, %", for table in tables:
"Топливо итого, тонн", if table.empty:
"Топливо итого, %", continue
"Топливо на технологию, тонн", first_row_values = table.iloc[0].astype(str).str.strip().tolist()
"Топливо на технологию, %", if any(val in inclusion_list for val in first_row_values):
"Топливо на энергетику, тонн", filtered_tables.append(table)
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan) tables = filtered_tables
print(f"🔍 DEBUG: Объединённый и отсортированный План: {df_ca_plan.shape if df_ca_plan is not None else 'None'}")
# Выгружаем Факт # === Итоговый список таблиц датафреймов ===
inclusion_list_fact = { result_list = []
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн",
"в т.ч. Идентифицированные безвозвратные потери, %",
"в т.ч. Неидентифицированные потери, тонн",
"в т.ч. Неидентифицированные потери, %"
}
df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact) for table in tables:
print(f"🔍 DEBUG: Объединённый и отсортированный Факт: {df_ca_fact.shape if df_ca_fact is not None else 'None'}") if table.empty:
continue
# Выгружаем Норматив # Получаем первую строку (до удаления)
inclusion_list_normativ = { first_row_values = table.iloc[0].astype(str).str.strip().tolist()
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ) # Находим, какой элемент из inclusion_list присутствует
print(f"🔍 DEBUG: Объединённый и отсортированный Норматив: {df_ca_normativ.shape if df_ca_normativ is not None else 'None'}") matched_key = None
for val in first_row_values:
if val in inclusion_list:
matched_key = val
break # берём первый совпадающий заголовок
# Преобразуем DataFrame в словарь по режимам и таблицам if matched_key is None:
data_dict = {} continue # на всякий случай (хотя уже отфильтровано)
# Обрабатываем План # Удаляем первую строку (заголовок) и сбрасываем индекс
if df_ca_plan is not None and not df_ca_plan.empty: df_cleaned = table.iloc[1:].copy().reset_index(drop=True)
data_dict['plan'] = {}
for table_name, group_df in df_ca_plan.groupby('table'):
table_data = group_df.drop('table', axis=1)
data_dict['plan'][table_name] = table_data.to_dict('records')
# Обрабатываем Факт # Пропускаем, если таблица пустая
if df_ca_fact is not None and not df_ca_fact.empty: if df_cleaned.empty:
data_dict['fact'] = {} continue
for table_name, group_df in df_ca_fact.groupby('table'):
table_data = group_df.drop('table', axis=1)
data_dict['fact'][table_name] = table_data.to_dict('records')
# Обрабатываем Норматив # Первая строка становится заголовком
if df_ca_normativ is not None and not df_ca_normativ.empty: new_header = df_cleaned.iloc[0] # извлекаем первую строку как потенциальные названия столбцов
data_dict['normativ'] = {}
for table_name, group_df in df_ca_normativ.groupby('table'):
table_data = group_df.drop('table', axis=1)
data_dict['normativ'][table_name] = table_data.to_dict('records')
print(f"🔍 DEBUG: Итоговый data_dict содержит режимы: {list(data_dict.keys())}") # Преобразуем заголовок: только первый столбец может быть заменён на "name"
for mode, tables in data_dict.items(): cleaned_header = []
print(f"🔍 DEBUG: Режим '{mode}' содержит таблицы: {list(tables.keys())}")
return data_dict # Обрабатываем первый столбец отдельно
first_item = new_header.iloc[0] if isinstance(new_header, pd.Series) else new_header[0]
first_item_str = str(first_item).strip() if pd.notna(first_item) else ""
if first_item_str == "" or first_item_str == "nan":
cleaned_header.append("name")
else:
cleaned_header.append(first_item_str)
# Остальные столбцы добавляем без изменений (или с минимальной очисткой)
for item in new_header[1:]:
# Опционально: приводим к строке и убираем лишние пробелы, но не заменяем на "name"
item_str = str(item).strip() if pd.notna(item) else ""
cleaned_header.append(item_str)
# Применяем очищенные названия столбцов
df_cleaned = df_cleaned[1:] # удаляем строку с заголовком
df_cleaned.columns = cleaned_header
df_cleaned = df_cleaned.reset_index(drop=True)
if matched_key.endswith('**'):
cleaned_key = matched_key[:-2] # удаляем последние **
else:
cleaned_key = matched_key
# Добавляем новую колонку с именем параметра
df_cleaned["table"] = cleaned_key
# Проверяем, что колонка 'name' существует
if 'name' not in df_cleaned.columns:
print(
f"Внимание: колонка 'name' отсутствует в таблице для '{matched_key}'. Пропускаем добавление 'id'.")
continue # или обработать по-другому
else:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_cleaned['id'] = df_cleaned['name'].apply(get_og_by_name)
# Удаляем строки, где id — None, NaN или пустой
df_cleaned = df_cleaned.dropna(subset=['id']) # dropna удаляет NaN
# Дополнительно: удаляем None (если не поймал dropna)
df_cleaned = df_cleaned[df_cleaned['id'].notna() & (df_cleaned['id'].astype(str) != 'None')]
# Добавляем в словарь
result_list.append(df_cleaned)
# === Объединение и сортировка по id (индекс) и table ===
if result_list:
combined_df = pd.concat(result_list, axis=0)
# Сортируем по индексу (id) и по столбцу 'table'
combined_df = combined_df.sort_values(by=['id', 'table'], axis=0)
# Устанавливаем id как индекс
# combined_df.set_index('id', inplace=True)
return combined_df
else:
return None
def extract_all_tables(self, file_path, sheet_name=0): def extract_all_tables(self, file_path, sheet_name=0):
"""Извлечение всех таблиц из Excel файла""" """Извлечение всех таблиц из Excel файла"""

View File

@@ -1,326 +0,0 @@
import pandas as pd
from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
class SvodkaPMParser(ParserPort):
"""Парсер для сводок ПМ (план и факт)"""
name = "Сводки ПМ"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="single_og",
method=self._get_single_og,
schema_class=SvodkaPMSingleOGRequest,
description="Получение данных по одному ОГ"
)
register_getter_from_schema(
parser_instance=self,
getter_name="total_ogs",
method=self._get_total_ogs,
schema_class=SvodkaPMTotalOGsRequest,
description="Получение данных по всем ОГ"
)
def _get_single_og(self, params: dict):
"""Получение данных по одному ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
og_id = validated_params["id"]
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# Здесь нужно получить DataFrame из self.df, но пока используем старую логику
# TODO: Переделать под новую архитектуру
return self.get_svodka_og(self.df, og_id, codes, columns, search)
def _get_total_ogs(self, params: dict):
"""Получение данных по всем ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest)
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# TODO: Переделать под новую архитектуру
return self.get_svodka_total(self.df, codes, columns, search)
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_pm_files(file_path, params)
return self.df
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
"""Определения индекса заголовка в excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows,
engine='openpyxl'
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_svodka_pm(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного ОГ для БП, ПП и факта '''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Итого")
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
df_probe = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
nrows=2,
engine='openpyxl'
)
if df_probe.shape[0] == 0:
raise ValueError("Файл пуст или не содержит данных.")
first_data_row = df_probe.iloc[0]
# Находим столбец с 'INDICATOR_ID'
indicator_cols = first_data_row[first_data_row == 'INDICATOR_ID']
if len(indicator_cols) == 0:
raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.')
indicator_col_name = indicator_cols.index[0]
print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}")
# Читаем весь лист
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None,
engine='openpyxl'
)
if indicator_col_name not in df_full.columns:
raise ValueError(f"Столбец {indicator_col_name} отсутствует при полной загрузке.")
# Перемещаем INDICATOR_ID в начало и делаем индексом
cols = [indicator_col_name] + [col for col in df_full.columns if col != indicator_col_name]
df_full = df_full[cols]
df_full.set_index(indicator_col_name, inplace=True)
# Обрезаем до "Итого" + 1
header_list = [str(h).strip() for h in df_full.columns]
try:
itogo_idx = header_list.index("Итого")
num_cols_needed = itogo_idx + 2
except ValueError:
print('Столбец "Итого" не найден. Оставляем все столбцы.')
num_cols_needed = len(header_list)
num_cols_needed = min(num_cols_needed, len(header_list))
df_final = df_full.iloc[:, :num_cols_needed]
# === Удаление полностью пустых столбцов ===
df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True)
df_clean = df_clean.where(pd.notnull(df_clean), pd.NA)
non_empty_mask = df_clean.notna().any()
df_final = df_final.loc[:, non_empty_mask]
# === Обработка заголовков: Unnamed и "Итого" → "Итого" ===
new_columns = []
last_good_name = None
for col in df_final.columns:
col_str = str(col).strip()
# Проверяем, является ли колонка пустой/некорректной
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
if is_empty_or_unnamed:
# Если это пустая колонка, используем последнее хорошее имя
if last_good_name:
new_columns.append(last_good_name)
else:
# Если нет хорошего имени, используем имя по умолчанию
new_columns.append(f"col_{len(new_columns)}")
else:
# Это хорошая колонка
last_good_name = col_str
new_columns.append(col_str)
# Убеждаемся, что количество столбцов совпадает
if len(new_columns) != len(df_final.columns):
# Если количество не совпадает, обрезаем или дополняем
if len(new_columns) > len(df_final.columns):
new_columns = new_columns[:len(df_final.columns)]
else:
# Дополняем недостающие столбцы
while len(new_columns) < len(df_final.columns):
new_columns.append(f"col_{len(new_columns)}")
# Применяем новые заголовки
df_final.columns = new_columns
return df_final
def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict:
"""Парсинг ZIP архива со сводками ПМ"""
import zipfile
pm_dict = {
"facts": {},
"plans": {}
}
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for name, id in OG_IDS.items():
if id == 'BASH':
continue # пропускаем BASH
current_fact = replace_id_in_path(excel_fact_template, id)
fact_candidates = [f for f in file_list if current_fact in f]
if len(fact_candidates) == 1:
print(f'Загрузка {current_fact}')
with zip_ref.open(fact_candidates[0]) as excel_file:
pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен: {current_fact}")
else:
print(f"⚠️ Файл не найден (Факт): {current_fact}")
pm_dict['facts'][id] = None
current_plan = replace_id_in_path(excel_plan_template, id)
plan_candidates = [f for f in file_list if current_plan in f]
if len(plan_candidates) == 1:
print(f'Загрузка {current_plan}')
with zip_ref.open(plan_candidates[0]) as excel_file:
pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен: {current_plan}")
else:
print(f"⚠️ Файл не найден (План): {current_plan}")
pm_dict['plans'][id] = None
return pm_dict
def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
''' Служебная функция получения значения по коду и столбцу '''
row_index = code
mask_value = df_svodka.iloc[0] == code
if search_value is None:
mask_name = df_svodka.columns != 'Итого'
else:
mask_name = df_svodka.columns == search_value
# Убедимся, что маски совпадают по длине
if len(mask_value) != len(mask_name):
raise ValueError(
f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
)
final_mask = mask_value & mask_name # булевая маска по позициям столбцов
col_positions = final_mask.values # numpy array или Series булевых значений
if not final_mask.any():
print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'")
return 0
else:
if row_index in df_svodka.index:
# Получаем позицию строки
row_loc = df_svodka.index.get_loc(row_index)
# Извлекаем значения по позициям столбцов
values = df_svodka.iloc[row_loc, col_positions]
# Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce')
# Агрегация данных (NaN игнорируются)
if search_value is None:
return numeric_values
else:
return numeric_values.iloc[0]
else:
return None
def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None):
''' Служебная функция получения данных по одному ОГ '''
result = {}
# Безопасно получаем данные, проверяя их наличие
fact_df = pm_dict.get('facts', {}).get(id) if 'facts' in pm_dict else None
plan_df = pm_dict.get('plans', {}).get(id) if 'plans' in pm_dict else None
# Определяем, какие столбцы из какого датафрейма брать
for col in columns:
col_result = {}
if col in ['ПП', 'БП']:
if plan_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}")
col_result = {code: None for code in codes}
else:
for code in codes:
val = self.get_svodka_value(plan_df, code, col, search_value)
col_result[code] = val
elif col in ['ТБ', 'СЭБ', 'НЭБ']:
if fact_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}")
col_result = {code: None for code in codes}
else:
for code in codes:
val = self.get_svodka_value(fact_df, code, col, search_value)
col_result[code] = val
else:
print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")
col_result = {code: None for code in codes}
result[col] = col_result
return result
def get_svodka_total(self, pm_dict, codes, columns, search_value=None):
''' Служебная функция агрегации данные по всем ОГ '''
total_result = {}
for name, og_id in OG_IDS.items():
if og_id == 'BASH':
continue
# print(f"📊 Обработка: {name} ({og_id})")
try:
data = self.get_svodka_og(
pm_dict,
og_id,
codes,
columns,
search_value
)
total_result[og_id] = data
except Exception as e:
print(f"❌ Ошибка при обработке {name} ({og_id}): {e}")
total_result[og_id] = None
return total_result
# Убираем старый метод get_value, так как он теперь в базовом классе

View File

@@ -1,14 +1,9 @@
import pandas as pd import pandas as pd
import os
import json
import zipfile
import tempfile
import shutil
from typing import Dict, Any, List, Optional
from core.ports import ParserPort from core.ports import ParserPort
from adapters.pconfig import SINGLE_OGS, replace_id_in_path, find_header_row, data_to_json from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
from adapters.pconfig import SINGLE_OGS, replace_id_in_path, data_to_json, find_header_row
class SvodkaPMParser(ParserPort): class SvodkaPMParser(ParserPort):
@@ -16,140 +11,74 @@ class SvodkaPMParser(ParserPort):
name = "Сводки ПМ" name = "Сводки ПМ"
def __init__(self):
super().__init__()
self._register_default_getters()
def _register_default_getters(self): def _register_default_getters(self):
"""Регистрация геттеров для Сводки ПМ""" """Регистрация геттеров по умолчанию"""
self.register_getter( # Используем схемы Pydantic как единый источник правды
name="single_og", register_getter_from_schema(
parser_instance=self,
getter_name="single_og",
method=self._get_single_og, method=self._get_single_og,
required_params=["id", "codes", "columns"], schema_class=SvodkaPMSingleOGRequest,
optional_params=["search"], description="Получение данных по одному ОГ"
description="Получение данных по одному ОГ из сводки ПМ"
) )
self.register_getter( register_getter_from_schema(
name="total_ogs", parser_instance=self,
getter_name="total_ogs",
method=self._get_total_ogs, method=self._get_total_ogs,
required_params=["codes", "columns"], schema_class=SvodkaPMTotalOGsRequest,
optional_params=["search"], description="Получение данных по всем ОГ"
description="Получение данных по всем ОГ из сводки ПМ"
) )
def parse(self, file_path: str, params: dict) -> Dict[str, pd.DataFrame]: def _get_single_og(self, params: dict):
"""Парсинг ZIP архива со сводками ПМ и возврат словаря с DataFrame""" """Получение данных по одному ОГ"""
# Проверяем расширение файла # Валидируем параметры с помощью схемы Pydantic
if not file_path.lower().endswith('.zip'): validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
raise ValueError(f"Ожидается ZIP архив: {file_path}")
# Создаем временную директорию для разархивирования og_id = validated_params["id"]
temp_dir = tempfile.mkdtemp() codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
try: # Здесь нужно получить DataFrame из self.df, но пока используем старую логику
# Разархивируем файл # TODO: Переделать под новую архитектуру
with zipfile.ZipFile(file_path, 'r') as zip_ref: return self.get_svodka_og(self.df, og_id, codes, columns, search)
zip_ref.extractall(temp_dir)
print(f"📦 Архив разархивирован в: {temp_dir}")
# Посмотрим, что находится в архиве def _get_total_ogs(self, params: dict):
print(f"🔍 Содержимое архива:") """Получение данных по всем ОГ"""
for root, dirs, files in os.walk(temp_dir): # Валидируем параметры с помощью схемы Pydantic
level = root.replace(temp_dir, '').count(os.sep) validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest)
indent = ' ' * 2 * level
print(f"{indent}{os.path.basename(root)}/")
subindent = ' ' * 2 * (level + 1)
for file in files:
print(f"{subindent}{file}")
# Создаем словари для хранения данных как в оригинале codes = validated_params["codes"]
df_pm_facts = {} # Словарь с данными факта, ключ - ID ОГ columns = validated_params["columns"]
df_pm_plans = {} # Словарь с данными плана, ключ - ID ОГ search = validated_params.get("search")
# Ищем файлы в архиве (адаптируемся к реальной структуре) # TODO: Переделать под новую архитектуру
fact_files = [] return self.get_svodka_total(self.df, codes, columns, search)
plan_files = []
for root, dirs, files in os.walk(temp_dir): def parse(self, file_path: str, params: dict) -> pd.DataFrame:
for file in files: """Парсинг файла и возврат DataFrame"""
if file.lower().endswith(('.xlsx', '.xlsm')): # Сохраняем DataFrame для использования в геттерах
full_path = os.path.join(root, file) self.df = self.parse_svodka_pm_files(file_path, params)
if 'fact' in file.lower() or 'факт' in file.lower(): return self.df
fact_files.append(full_path)
elif 'plan' in file.lower() or 'план' in file.lower():
plan_files.append(full_path)
print(f"📊 Найдено файлов факта: {len(fact_files)}")
print(f"📊 Найдено файлов плана: {len(plan_files)}")
# Обрабатываем найденные файлы
for fact_file in fact_files:
# Извлекаем ID ОГ из имени файла
filename = os.path.basename(fact_file)
# Ищем паттерн типа svodka_fact_pm_SNPZ.xlsm
if 'svodka_fact_pm_' in filename:
og_id = filename.replace('svodka_fact_pm_', '').replace('.xlsx', '').replace('.xlsm', '')
if og_id in SINGLE_OGS:
print(f'📊 Загрузка факта: {fact_file} (ОГ: {og_id})')
df_pm_facts[og_id] = self._parse_svodka_pm(fact_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен для {og_id}")
for plan_file in plan_files:
# Извлекаем ID ОГ из имени файла
filename = os.path.basename(plan_file)
# Ищем паттерн типа svodka_plan_pm_SNPZ.xlsm
if 'svodka_plan_pm_' in filename:
og_id = filename.replace('svodka_plan_pm_', '').replace('.xlsx', '').replace('.xlsm', '')
if og_id in SINGLE_OGS:
print(f'📊 Загрузка плана: {plan_file} (ОГ: {og_id})')
df_pm_plans[og_id] = self._parse_svodka_pm(plan_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен для {og_id}")
# Инициализируем None для ОГ, для которых файлы не найдены
for og_id in SINGLE_OGS:
if og_id == 'BASH':
continue
if og_id not in df_pm_facts:
df_pm_facts[og_id] = None
if og_id not in df_pm_plans:
df_pm_plans[og_id] = None
# Возвращаем словарь с данными (как в оригинале) def parse_svodka_pm(self, file, sheet, header_num=None):
result = { ''' Собственно парсер отчетов одного ОГ для БП, ПП и факта '''
'df_pm_facts': df_pm_facts,
'df_pm_plans': df_pm_plans
}
print(f"🎯 Обработано ОГ: {len([k for k, v in df_pm_facts.items() if v is not None])} факт, {len([k for k, v in df_pm_plans.items() if v is not None])} план")
return result
finally:
# Удаляем временную директорию
shutil.rmtree(temp_dir, ignore_errors=True)
print(f"🗑️ Временная директория удалена: {temp_dir}")
def _parse_svodka_pm(self, file_path: str, sheet_name: str, header_num: Optional[int] = None) -> pd.DataFrame:
"""Парсинг отчетов одного ОГ для БП, ПП и факта"""
try:
# Автоопределение header_num, если не передан # Автоопределение header_num, если не передан
if header_num is None: if header_num is None:
header_num = find_header_row(file_path, sheet_name, search_value="Итого") header_num = find_header_row(file, sheet, search_value="Итого")
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID # Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
df_probe = pd.read_excel( df_probe = pd.read_excel(
file_path, file,
sheet_name=sheet_name, sheet_name=sheet,
header=header_num, header=header_num,
usecols=None, usecols=None,
nrows=2, nrows=2,
engine='openpyxl' # Явно указываем движок engine='openpyxl'
) )
except Exception as e:
raise ValueError(f"Ошибка при чтении файла {file_path}: {str(e)}")
if df_probe.shape[0] == 0: if df_probe.shape[0] == 0:
raise ValueError("Файл пуст или не содержит данных.") raise ValueError("Файл пуст или не содержит данных.")
@@ -162,15 +91,16 @@ class SvodkaPMParser(ParserPort):
raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.') raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.')
indicator_col_name = indicator_cols.index[0] indicator_col_name = indicator_cols.index[0]
print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}")
# Читаем весь лист # Читаем весь лист
df_full = pd.read_excel( df_full = pd.read_excel(
file_path, file,
sheet_name=sheet_name, sheet_name=sheet,
header=header_num, header=header_num,
usecols=None, usecols=None,
index_col=None, index_col=None,
engine='openpyxl' # Явно указываем движок engine='openpyxl'
) )
if indicator_col_name not in df_full.columns: if indicator_col_name not in df_full.columns:
@@ -187,18 +117,19 @@ class SvodkaPMParser(ParserPort):
itogo_idx = header_list.index("Итого") itogo_idx = header_list.index("Итого")
num_cols_needed = itogo_idx + 2 num_cols_needed = itogo_idx + 2
except ValueError: except ValueError:
print('Столбец "Итого" не найден. Оставляем все столбцы.')
num_cols_needed = len(header_list) num_cols_needed = len(header_list)
num_cols_needed = min(num_cols_needed, len(header_list)) num_cols_needed = min(num_cols_needed, len(header_list))
df_final = df_full.iloc[:, :num_cols_needed] df_final = df_full.iloc[:, :num_cols_needed]
# Удаление полностью пустых столбцов # === Удаление полностью пустых столбцов ===
df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True) df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True)
df_clean = df_clean.where(pd.notnull(df_clean), pd.NA) df_clean = df_clean.where(pd.notnull(df_clean), pd.NA)
non_empty_mask = df_clean.notna().any() non_empty_mask = df_clean.notna().any()
df_final = df_final.loc[:, non_empty_mask] df_final = df_final.loc[:, non_empty_mask]
# Обработка заголовков: Unnamed и "Итого" → "Итого" # === Обработка заголовков: Unnamed и "Итого" → "Итого" ===
new_columns = [] new_columns = []
last_good_name = None last_good_name = None
for col in df_final.columns: for col in df_final.columns:
@@ -207,152 +138,109 @@ class SvodkaPMParser(ParserPort):
# Проверяем, является ли колонка пустой/некорректной # Проверяем, является ли колонка пустой/некорректной
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan' is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
# Проверяем, начинается ли на "Итого" if is_empty_or_unnamed:
if col_str.startswith('Итого'): # Если это пустая колонка, используем последнее хорошее имя
current_name = 'Итого' if last_good_name:
last_good_name = current_name
new_columns.append(current_name)
elif is_empty_or_unnamed:
# Используем последнее хорошее имя
new_columns.append(last_good_name) new_columns.append(last_good_name)
else: else:
# Имя, полученное из excel # Если нет хорошего имени, пропускаем
continue
else:
# Это хорошая колонка
last_good_name = col_str last_good_name = col_str
new_columns.append(col_str) new_columns.append(col_str)
# Применяем новые заголовки
df_final.columns = new_columns df_final.columns = new_columns
return df_final return df_final
def _get_svodka_value(self, df_svodka: pd.DataFrame, og_id: str, code: int, search_value: Optional[str] = None): def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict:
"""Служебная функция для простой выборке по сводке""" """Парсинг ZIP архива со сводками ПМ"""
print(f"🔍 DEBUG: Ищем код '{code}' для ОГ '{og_id}' в DataFrame с {len(df_svodka)} строками") import zipfile
print(f"🔍 DEBUG: Первая строка данных: {df_svodka.iloc[0].tolist()}") pm_dict = {
print(f"🔍 DEBUG: Доступные индексы: {list(df_svodka.index)}") "facts": {},
print(f"🔍 DEBUG: Доступные столбцы: {list(df_svodka.columns)}") "plans": {}
}
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for id in SINGLE_OGS:
if id == 'BASH':
continue # пропускаем BASH
# Проверяем, есть ли код в индексе current_fact = replace_id_in_path(excel_fact_template, id)
if code not in df_svodka.index: fact_candidates = [f for f in file_list if current_fact in f]
print(f"⚠️ Код '{code}' не найден в индексе") if len(fact_candidates) == 1:
return 0 print(f'Загрузка {current_fact}')
with zip_ref.open(fact_candidates[0]) as excel_file:
pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен: {current_fact}")
else:
print(f"⚠️ Файл не найден (Факт): {current_fact}")
pm_dict['facts'][id] = None
# Получаем позицию строки с кодом current_plan = replace_id_in_path(excel_plan_template, id)
code_row_loc = df_svodka.index.get_loc(code) plan_candidates = [f for f in file_list if current_plan in f]
print(f"🔍 DEBUG: Код '{code}' в позиции {code_row_loc}") if len(plan_candidates) == 1:
print(f'Загрузка {current_plan}')
with zip_ref.open(plan_candidates[0]) as excel_file:
pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен: {current_plan}")
else:
print(f"⚠️ Файл не найден (План): {current_plan}")
pm_dict['plans'][id] = None
# Определяем позиции для поиска return pm_dict
def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
''' Служебная функция получения значения по коду и столбцу '''
row_index = code
mask_value = df_svodka.iloc[0] == code
if search_value is None: if search_value is None:
# Ищем все позиции кроме "Итого" и None (первый столбец с заголовком) mask_name = df_svodka.columns != 'Итого'
target_positions = []
for i, col_name in enumerate(df_svodka.iloc[0]):
if col_name != 'Итого' and col_name is not None:
target_positions.append(i)
else: else:
# Ищем позиции в первой строке, где есть нужное название mask_name = df_svodka.columns == search_value
target_positions = []
for i, col_name in enumerate(df_svodka.iloc[0]):
if col_name == search_value:
target_positions.append(i)
print(f"🔍 DEBUG: Найдены позиции для '{search_value}': {target_positions[:5]}...") # Убедимся, что маски совпадают по длине
print(f"🔍 DEBUG: Позиции в первой строке: {target_positions[:5]}...") if len(mask_value) != len(mask_name):
raise ValueError(
f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
)
print(f"🔍 DEBUG: Ищем столбцы с названием '{search_value}'") final_mask = mask_value & mask_name # булевая маска по позициям столбцов
print(f"🔍 DEBUG: Целевые позиции: {target_positions[:10]}...") col_positions = final_mask.values # numpy array или Series булевых значений
if not target_positions: if not final_mask.any():
print(f"⚠️ Позиции '{search_value}' не найдены") print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'")
return 0 return 0
# Извлекаем значения из найденных позиций
values = []
for pos in target_positions:
# Берем значение из пересечения строки с кодом и позиции столбца
value = df_svodka.iloc[code_row_loc, pos]
# Если это Series, берем первое значение
if isinstance(value, pd.Series):
if len(value) > 0:
# Берем первое не-NaN значение
first_valid = value.dropna().iloc[0] if not value.dropna().empty else 0
values.append(first_valid)
else: else:
values.append(0) if row_index in df_svodka.index:
else: # Получаем позицию строки
values.append(value) row_loc = df_svodka.index.get_loc(row_index)
# Извлекаем значения по позициям столбцов
values = df_svodka.iloc[row_loc, col_positions]
# Преобразуем в числовой формат # Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce') numeric_values = pd.to_numeric(values, errors='coerce')
print(f"🔍 DEBUG: Числовые значения (первые 5): {numeric_values.tolist()[:5]}")
# Попробуем альтернативное преобразование
try:
# Если pandas не может преобразовать, попробуем вручную
manual_values = []
for v in values:
if pd.isna(v) or v is None:
manual_values.append(0)
else:
try:
# Пробуем преобразовать в float
manual_values.append(float(str(v).replace(',', '.')))
except (ValueError, TypeError):
manual_values.append(0)
print(f"🔍 DEBUG: Ручное преобразование (первые 5): {manual_values[:5]}")
numeric_values = pd.Series(manual_values)
except Exception as e:
print(f"⚠️ Ошибка при ручном преобразовании: {e}")
# Используем исходные значения
numeric_values = pd.Series([0 if pd.isna(v) or v is None else v for v in values])
# Агрегация данных (NaN игнорируются) # Агрегация данных (NaN игнорируются)
if search_value is None: if search_value is None:
# Возвращаем массив всех значений (игнорируя NaN) return numeric_values
if len(numeric_values) > 0:
# Фильтруем NaN значения и возвращаем как список
valid_values = numeric_values.dropna()
if len(valid_values) > 0:
return valid_values.tolist()
else: else:
return [] return numeric_values.iloc[0]
else: else:
return [] return None
else:
# Возвращаем массив всех значений (игнорируя NaN)
if len(numeric_values) > 0:
# Фильтруем NaN значения и возвращаем как список
valid_values = numeric_values.dropna()
if len(valid_values) > 0:
return valid_values.tolist()
else:
return []
else:
return []
def _get_svodka_og(self, og_id: str, codes: List[int], columns: List[str], search_value: Optional[str] = None): def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None):
"""Служебная функция получения данных по одному ОГ""" ''' Служебная функция получения данных по одному ОГ '''
result = {} result = {}
# Получаем данные из сохраненных словарей (через self.df) fact_df = pm_dict['facts'][id]
if not hasattr(self, 'df') or self.df is None: plan_df = pm_dict['plans'][id]
print("❌ Данные не загружены. Сначала загрузите ZIP архив.")
return {col: {str(code): None for code in codes} for col in columns}
# Извлекаем словари из сохраненных данных
df_pm_facts = self.df.get('df_pm_facts', {})
df_pm_plans = self.df.get('df_pm_plans', {})
# Получаем данные для конкретного ОГ
fact_df = df_pm_facts.get(og_id)
plan_df = df_pm_plans.get(og_id)
print(f"🔍 ===== НАЧАЛО ОБРАБОТКИ ОГ {og_id} =====")
print(f"🔍 Коды: {codes}")
print(f"🔍 Столбцы: {columns}")
print(f"🔍 Получены данные для {og_id}: факт={'' if fact_df is not None else ''}, план={'' if plan_df is not None else ''}")
# Определяем, какие столбцы из какого датафрейма брать # Определяем, какие столбцы из какого датафрейма брать
for col in columns: for col in columns:
@@ -360,91 +248,49 @@ class SvodkaPMParser(ParserPort):
if col in ['ПП', 'БП']: if col in ['ПП', 'БП']:
if plan_df is None: if plan_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных плана для {og_id}") print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}")
else: else:
print(f"🔍 DEBUG: ===== ОБРАБАТЫВАЕМ '{col}' ИЗ ДАННЫХ ПЛАНА =====")
for code in codes: for code in codes:
print(f"🔍 DEBUG: --- Код {code} для {col} ---") val = self.get_svodka_value(plan_df, code, col, search_value)
val = self._get_svodka_value(plan_df, og_id, code, col) col_result[code] = val
col_result[str(code)] = val
print(f"🔍 DEBUG: ===== ЗАВЕРШИЛИ ОБРАБОТКУ '{col}' =====")
elif col in ['ТБ', 'СЭБ', 'НЭБ']: elif col in ['ТБ', 'СЭБ', 'НЭБ']:
if fact_df is None: if fact_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных факта для {og_id}") print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}")
else: else:
for code in codes: for code in codes:
val = self._get_svodka_value(fact_df, og_id, code, col) val = self.get_svodka_value(fact_df, code, col, search_value)
col_result[str(code)] = val col_result[code] = val
else: else:
print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.") print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")
col_result = {str(code): None for code in codes} col_result = {code: None for code in codes}
result[col] = col_result result[col] = col_result
return result return result
def _get_single_og(self, params: Dict[str, Any]) -> str: def get_svodka_total(self, pm_dict, codes, columns, search_value=None):
"""API функция для получения данных по одному ОГ""" ''' Служебная функция агрегации данные по всем ОГ '''
# Если на входе строка — парсим как JSON
if isinstance(params, str):
try:
params = json.loads(params)
except json.JSONDecodeError as e:
raise ValueError(f"Некорректный JSON: {e}")
# Проверяем структуру
if not isinstance(params, dict):
raise TypeError("Конфиг должен быть словарём или JSON-строкой")
og_id = params.get("id")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
data = self._get_svodka_og(og_id, codes, columns, search)
json_result = data_to_json(data)
return json_result
def _get_total_ogs(self, params: Dict[str, Any]) -> str:
"""API функция для получения данных по всем ОГ"""
# Если на входе строка — парсим как JSON
if isinstance(params, str):
try:
params = json.loads(params)
except json.JSONDecodeError as e:
raise ValueError(f"❌Некорректный JSON: {e}")
# Проверяем структуру
if not isinstance(params, dict):
raise TypeError("Конфиг должен быть словарём или JSON-строкой")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
total_result = {} total_result = {}
for og_id in SINGLE_OGS: for og_id in SINGLE_OGS:
if og_id == 'BASH': if og_id == 'BASH':
continue continue
# print(f"📊 Обработка: {og_id}")
try: try:
data = self._get_svodka_og(og_id, codes, columns, search) data = self.get_svodka_og(
pm_dict,
og_id,
codes,
columns,
search_value
)
total_result[og_id] = data total_result[og_id] = data
except Exception as e: except Exception as e:
print(f"❌ Ошибка при обработке {og_id}: {e}") print(f"❌ Ошибка при обработке {og_id}: {e}")
total_result[og_id] = None total_result[og_id] = None
json_result = data_to_json(total_result) return total_result
return json_result
# Убираем старый метод get_value, так как он теперь в базовом классе

View File

@@ -213,18 +213,11 @@ def data_to_json(data, indent=2, ensure_ascii=False):
# --- рекурсия по dict и list --- # --- рекурсия по dict и list ---
elif isinstance(obj, dict): elif isinstance(obj, dict):
# Обрабатываем только значения, ключи оставляем как строки return {
converted = {} key: convert_obj(value)
for k, v in obj.items(): for key, value in obj.items()
if is_nan_like(k): if not is_nan_like(key) # фильтруем NaN в ключах (недопустимы в JSON)
continue # ключи не могут быть null в JSON }
# Превращаем ключ в строку, но не пытаемся интерпретировать как число
key_str = str(k)
converted[key_str] = convert_obj(v) # только значение проходит через convert_obj
# Если все значения 0.0 — считаем, что данных нет, т.к. возможно ожидается массив.
if converted and all(v == 0.0 for v in converted.values()):
return None
return converted
elif isinstance(obj, list): elif isinstance(obj, list):
return [convert_obj(item) for item in obj] return [convert_obj(item) for item in obj]

View File

@@ -16,7 +16,7 @@ from app.schemas import (
UploadResponse, UploadErrorResponse, UploadResponse, UploadErrorResponse,
SvodkaPMTotalOGsRequest, SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest, SvodkaPMSingleOGRequest,
SvodkaCARequest, SvodkaCARequest,
MonitoringFuelMonthRequest, MonitoringFuelTotalRequest MonitoringFuelMonthRequest, MonitoringFuelTotalRequest, MonitoringFuelSeriesRequest
) )
@@ -323,7 +323,7 @@ async def get_svodka_pm_single_og(
try: try:
# Создаем запрос # Создаем запрос
request_dict = request_data.model_dump() request_dict = request_data.model_dump()
request_dict['mode'] = 'single_og' request_dict['mode'] = 'single'
request = DataRequest( request = DataRequest(
report_type='svodka_pm', report_type='svodka_pm',
get_params=request_dict get_params=request_dict
@@ -400,41 +400,6 @@ async def get_svodka_pm_total_ogs(
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}") raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/svodka_pm/get_data", tags=[SvodkaPMParser.name])
async def get_svodka_pm_data(
request_data: dict
):
report_service = get_report_service()
"""
Получение данных из отчета сводки факта СарНПЗ
- indicator_id: ID индикатора
- code: Код для поиска
- search_value: Опциональное значение для поиска
"""
try:
# Создаем запрос
request = DataRequest(
report_type='svodka_pm',
get_params=request_data
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/svodka_ca/upload", tags=[SvodkaCAParser.name], @app.post("/svodka_ca/upload", tags=[SvodkaCAParser.name],
summary="Загрузка файла отчета сводки СА", summary="Загрузка файла отчета сводки СА",
@@ -509,7 +474,7 @@ async def upload_svodka_ca(
) )
@app.post("/svodka_ca/get_data", tags=[SvodkaCAParser.name], @app.post("/svodka_ca/get_ca_data", tags=[SvodkaCAParser.name],
summary="Получение данных из отчета сводки СА") summary="Получение данных из отчета сводки СА")
async def get_svodka_ca_data( async def get_svodka_ca_data(
request_data: SvodkaCARequest request_data: SvodkaCARequest
@@ -534,6 +499,7 @@ async def get_svodka_ca_data(
try: try:
# Создаем запрос # Создаем запрос
request_dict = request_data.model_dump() request_dict = request_data.model_dump()
request_dict['mode'] = 'get_ca_data'
request = DataRequest( request = DataRequest(
report_type='svodka_ca', report_type='svodka_ca',
get_params=request_dict get_params=request_dict
@@ -610,38 +576,6 @@ async def get_svodka_ca_data(
# raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}") # raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/monitoring_fuel/get_data", tags=[MonitoringFuelParser.name])
async def get_monitoring_fuel_data(
request_data: dict
):
report_service = get_report_service()
"""
Получение данных из отчета мониторинга топлива
- column: Название колонки для агрегации (normativ, total, total_svod)
"""
try:
# Создаем запрос
request = DataRequest(
report_type='monitoring_fuel',
get_params=request_data
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
# @app.post("/monitoring_fuel/upload_directory", tags=[MonitoringFuelParser.name]) # @app.post("/monitoring_fuel/upload_directory", tags=[MonitoringFuelParser.name])
@@ -872,5 +806,54 @@ async def get_monitoring_fuel_month_by_code(
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}") raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/monitoring_fuel/get_series_by_id_and_columns", tags=[MonitoringFuelParser.name],
summary="Получение временных рядов по ID и колонкам")
async def get_monitoring_fuel_series_by_id_and_columns(
request_data: MonitoringFuelSeriesRequest
):
"""Получение временных рядов из сводок мониторинга топлива по ID и колонкам
### Структура параметров:
- `columns`: **Массив названий** выбираемых столбцов для получения временных рядов (обязательный)
### Пример тела запроса:
```json
{
"columns": ["total", "normativ"]
}
```
### Возвращает:
Словарь где ключ - ID объекта, значение - словарь с колонками,
в которых хранятся списки значений по месяцам.
"""
report_service = get_report_service()
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'series_by_id_and_columns'
request = DataRequest(
report_type='monitoring_fuel',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
if __name__ == "__main__": if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080) uvicorn.run(app, host="0.0.0.0", port=8080)

View File

@@ -1,4 +1,4 @@
from .monitoring_fuel import MonitoringFuelMonthRequest, MonitoringFuelTotalRequest from .monitoring_fuel import MonitoringFuelMonthRequest, MonitoringFuelTotalRequest, MonitoringFuelSeriesRequest
from .svodka_ca import SvodkaCARequest from .svodka_ca import SvodkaCARequest
from .svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest from .svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
from .server import ServerInfoResponse from .server import ServerInfoResponse

View File

@@ -32,3 +32,19 @@ class MonitoringFuelTotalRequest(BaseModel):
"columns": ["total", "normativ"] "columns": ["total", "normativ"]
} }
} }
class MonitoringFuelSeriesRequest(BaseModel):
columns: List[str] = Field(
...,
description="Массив названий выбираемых столбцов для получения временных рядов",
example=["total", "normativ"],
min_items=1
)
class Config:
json_schema_extra = {
"example": {
"columns": ["total", "normativ"]
}
}

View File

@@ -25,7 +25,7 @@ class OGID(str, Enum):
class SvodkaPMSingleOGRequest(BaseModel): class SvodkaPMSingleOGRequest(BaseModel):
id: str = Field( id: OGID = Field(
..., ...,
description="Идентификатор МА для запрашиваемого ОГ", description="Идентификатор МА для запрашиваемого ОГ",
example="SNPZ" example="SNPZ"

View File

@@ -135,6 +135,10 @@ def validate_params_with_schema(params: Dict[str, Any], schema_class: Type[BaseM
try: try:
# Создаем экземпляр схемы для валидации # Создаем экземпляр схемы для валидации
validated_data = schema_class(**params) validated_data = schema_class(**params)
# Используем model_dump() для Pydantic v2 или dict() для v1
if hasattr(validated_data, 'model_dump'):
return validated_data.model_dump()
else:
return validated_data.dict() return validated_data.dict()
except Exception as e: except Exception as e:
raise ValueError(f"Ошибка валидации параметров: {str(e)}") raise ValueError(f"Ошибка валидации параметров: {str(e)}")

View File

@@ -43,7 +43,7 @@ class ReportService:
try: try:
# Парсим файл # Парсим файл
parse_params = request.parse_params or {} parse_params = request.parse_params or {}
parse_result = parser.parse(temp_file_path, parse_params) df = parser.parse(temp_file_path, parse_params)
# Генерируем object_id # Генерируем object_id
object_id = f"nin_excel_data_{request.report_type}" object_id = f"nin_excel_data_{request.report_type}"
@@ -54,7 +54,7 @@ class ReportService:
print(f"Старый объект удален: {object_id}") print(f"Старый объект удален: {object_id}")
# Сохраняем в хранилище # Сохраняем в хранилище
if self.storage.save_dataframe(parse_result, object_id): if self.storage.save_dataframe(df, object_id):
return UploadResult( return UploadResult(
success=True, success=True,
message="Отчет успешно загружен", message="Отчет успешно загружен",
@@ -89,9 +89,9 @@ class ReportService:
message=f"Отчет типа '{request.report_type}' не найден. Возможно, MinIO недоступен или отчет не был загружен." message=f"Отчет типа '{request.report_type}' не найден. Возможно, MinIO недоступен или отчет не был загружен."
) )
# Загружаем данные из хранилища # Загружаем DataFrame из хранилища
loaded_data = self.storage.load_dataframe(object_id) df = self.storage.load_dataframe(object_id)
if loaded_data is None: if df is None:
return DataResult( return DataResult(
success=False, success=False,
message="Ошибка при загрузке данных из хранилища. Возможно, MinIO недоступен." message="Ошибка при загрузке данных из хранилища. Возможно, MinIO недоступен."
@@ -100,72 +100,13 @@ class ReportService:
# Получаем парсер # Получаем парсер
parser = get_parser(request.report_type) parser = get_parser(request.report_type)
# Устанавливаем данные в парсер для использования в геттерах # Устанавливаем DataFrame в парсер для использования в геттерах
parser.df = loaded_data parser.df = df
print(f"🔍 DEBUG: ReportService.get_data - установлены данные в парсер {request.report_type}")
# Проверяем тип загруженных данных
if hasattr(loaded_data, 'shape'):
# Это DataFrame
print(f"🔍 DEBUG: DataFrame shape: {loaded_data.shape}")
print(f"🔍 DEBUG: DataFrame columns: {list(loaded_data.columns) if not loaded_data.empty else 'Empty'}")
elif isinstance(loaded_data, dict):
# Это словарь (для парсера ПМ)
print(f"🔍 DEBUG: Словарь с ключами: {list(loaded_data.keys())}")
else:
print(f"🔍 DEBUG: Неизвестный тип данных: {type(loaded_data)}")
# Получаем параметры запроса # Получаем параметры запроса
get_params = request.get_params or {} get_params = request.get_params or {}
# Для svodka_ca определяем режим из данных или используем 'fact' по умолчанию # Определяем имя геттера из параметра mode
if request.report_type == 'svodka_ca':
# Извлекаем режим из DataFrame или используем 'fact' по умолчанию
if hasattr(parser, 'df') and parser.df is not None and not parser.df.empty:
modes_in_df = parser.df['mode'].unique() if 'mode' in parser.df.columns else ['fact']
# Используем первый найденный режим или 'fact' по умолчанию
default_mode = modes_in_df[0] if len(modes_in_df) > 0 else 'fact'
else:
default_mode = 'fact'
# Устанавливаем режим в параметры, если он не указан
if 'mode' not in get_params:
get_params['mode'] = default_mode
# Определяем имя геттера
if request.report_type == 'svodka_ca':
# Для svodka_ca используем геттер get_ca_data
getter_name = 'get_ca_data'
elif request.report_type == 'monitoring_fuel':
# Для monitoring_fuel определяем геттер из параметра mode
getter_name = get_params.pop("mode", None)
if not getter_name:
# Если режим не указан, берем первый доступный
available_getters = list(parser.getters.keys())
if available_getters:
getter_name = available_getters[0]
print(f"⚠️ Режим не указан, используем первый доступный: {getter_name}")
else:
return DataResult(
success=False,
message="Парсер не имеет доступных геттеров"
)
elif request.report_type == 'svodka_pm':
# Для svodka_pm определяем геттер из параметра mode
getter_name = get_params.pop("mode", None)
if not getter_name:
# Если режим не указан, берем первый доступный
available_getters = list(parser.getters.keys())
if available_getters:
getter_name = available_getters[0]
print(f"⚠️ Режим не указан, используем первый доступный: {getter_name}")
else:
return DataResult(
success=False,
message="Парсер не имеет доступных геттеров"
)
else:
# Для других парсеров определяем из параметра mode
getter_name = get_params.pop("mode", None) getter_name = get_params.pop("mode", None)
if not getter_name: if not getter_name:
# Если режим не указан, берем первый доступный # Если режим не указан, берем первый доступный

View File

@@ -1,20 +0,0 @@
#!/usr/bin/env python3
"""
Простой тест для проверки работы FastAPI
"""
from fastapi import FastAPI
app = FastAPI(title="Test API")
@app.get("/")
async def root():
return {"message": "Test API is working"}
@app.get("/health")
async def health():
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
print("Starting test server...")
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -277,7 +277,7 @@ def main():
"tables": tables "tables": tables
} }
result, status = make_api_request("/svodka_ca/get_data", data) result, status = make_api_request("/svodka_ca/get_ca_data", data)
if status == 200: if status == 200:
st.success("✅ Данные получены") st.success("✅ Данные получены")
@@ -371,6 +371,34 @@ def main():
else: else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}") st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
# Новая секция для временных рядов
st.markdown("---")
st.subheader("📈 Временные ряды по ID и колонкам")
columns_series = st.multiselect(
"Выберите столбцы для временных рядов",
["normativ", "total", "total_1"],
default=["normativ", "total"],
key="fuel_series_columns"
)
if st.button("📈 Получить временные ряды", key="fuel_series_btn"):
if columns_series:
with st.spinner("Получаю временные ряды..."):
data = {
"columns": columns_series
}
result, status = make_api_request("/monitoring_fuel/get_series_by_id_and_columns", data)
if status == 200:
st.success("✅ Временные ряды получены")
st.json(result)
else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
else:
st.warning("⚠️ Выберите столбцы")
# Футер # Футер
st.markdown("---") st.markdown("---")
st.markdown("### 📚 Документация API") st.markdown("### 📚 Документация API")

123
tests/README.md Normal file
View File

@@ -0,0 +1,123 @@
# API Endpoints Tests
Этот модуль содержит pytest тесты для всех API эндпоинтов проекта NIN Excel Parsers.
## Структура
```
tests/
├── __init__.py
├── conftest.py # Конфигурация pytest
├── test_all_endpoints.py # Основной файл для запуска всех тестов
├── test_upload_endpoints.py # Тесты API эндпоинтов загрузки данных
├── test_svodka_pm_endpoints.py # Тесты API svodka_pm эндпоинтов
├── test_svodka_ca_endpoints.py # Тесты API svodka_ca эндпоинтов
├── test_monitoring_fuel_endpoints.py # Тесты API monitoring_fuel эндпоинтов
├── test_parsers_direct.py # Прямое тестирование парсеров
├── test_upload_with_local_storage.py # Тестирование загрузки в локальный storage
├── test_getters_with_local_storage.py # Тестирование геттеров с локальными данными
├── test_data/ # Тестовые данные
│ ├── svodka_ca.xlsx
│ ├── pm_plan.zip
│ └── monitoring.zip
├── local_storage/ # Локальный storage (создается автоматически)
│ ├── data/ # Сохраненные DataFrame
│ └── metadata/ # Метаданные объектов
├── requirements.txt # Зависимости для тестов
└── README.md # Этот файл
```
## Установка зависимостей
```bash
pip install -r tests/requirements.txt
```
## Запуск тестов
### Запуск всех тестов
```bash
cd tests
python test_all_endpoints.py
```
### Запуск конкретных тестов
```bash
# API тесты (требуют запущенный сервер)
pytest test_upload_endpoints.py -v
pytest test_svodka_pm_endpoints.py -v
pytest test_svodka_ca_endpoints.py -v
pytest test_monitoring_fuel_endpoints.py -v
# Прямые тесты парсеров (не требуют сервер)
pytest test_parsers_direct.py -v
pytest test_upload_with_local_storage.py -v
pytest test_getters_with_local_storage.py -v
# Все тесты с локальным storage
pytest test_parsers_direct.py test_upload_with_local_storage.py test_getters_with_local_storage.py -v
```
## Предварительные условия
1. **API сервер должен быть запущен** на `http://localhost:8000` (только для API тестов)
2. **Тестовые данные** находятся в папке `test_data/`
3. **Локальный storage** используется для прямого тестирования парсеров
## Последовательность тестирования
### Вариант 1: API тесты (требуют запущенный сервер)
1. **Загрузка данных** (`test_upload_endpoints.py`)
- Загрузка `svodka_ca.xlsx`
- Загрузка `pm_plan.zip`
- Загрузка `monitoring.zip`
2. **Тестирование эндпоинтов** (в любом порядке)
- `test_svodka_pm_endpoints.py`
- `test_svodka_ca_endpoints.py`
- `test_monitoring_fuel_endpoints.py`
### Вариант 2: Прямые тесты (не требуют сервер)
1. **Тестирование парсеров** (`test_parsers_direct.py`)
- Проверка регистрации парсеров
- Проверка локального storage
2. **Загрузка в локальный storage** (`test_upload_with_local_storage.py`)
- Загрузка всех файлов в локальный storage
- Проверка сохранения данных
3. **Тестирование геттеров** (`test_getters_with_local_storage.py`)
- Тестирование всех геттеров с локальными данными
- Выявление проблем в логике парсеров
## Ожидаемые результаты
Все тесты должны возвращать **статус 200** и содержать поле `"success": true` в ответе.
## Примеры тестовых запросов
Тесты используют примеры из Pydantic схем:
### svodka_pm
```json
{
"id": "SNPZ",
"codes": [78, 79],
"columns": ["ПП", "СЭБ"]
}
```
### svodka_ca
```json
{
"modes": ["fact", "plan"],
"tables": ["table1", "table2"]
}
```
### monitoring_fuel
```json
{
"columns": ["total", "normativ"]
}
```

71
tests/TEST_RESULTS.md Normal file
View File

@@ -0,0 +1,71 @@
# Результаты тестирования API эндпоинтов
## Сводка
Создана полная система тестирования с локальным storage для проверки всех API эндпоинтов проекта NIN Excel Parsers.
## Структура тестов
### 1. Прямые тесты парсеров (`test_parsers_direct.py`)
-**Регистрация парсеров** - все парсеры корректно регистрируются
-**Локальный storage** - работает корректно
-**ReportService** - корректно работает с локальным storage
### 2. Тесты загрузки (`test_upload_with_local_storage.py`)
-**svodka_ca.xlsx** - парсер возвращает `None`
-**pm_plan.zip** - парсер возвращает словарь с `None` значениями
-**monitoring.zip** - парсер возвращает пустой словарь
### 3. Тесты геттеров (`test_getters_with_local_storage.py`)
-**Все геттеры** - не работают из-за проблем с загрузкой данных
### 4. API тесты (`test_*_endpoints.py`)
-**Загрузка файлов** - эндпоинты работают
-**Геттеры** - не работают из-за проблем с данными
## Выявленные проблемы
### 1. Парсер svodka_ca
- **Проблема**: Возвращает `None` вместо DataFrame
- **Причина**: Парсер не может обработать тестовый файл `svodka_ca.xlsx`
- **Статус**: Требует исправления
### 2. Парсер svodka_pm
- **Проблема**: Возвращает словарь с `None` значениями
- **Причина**: Файлы в архиве `pm_plan.zip` не найдены (неправильные имена файлов)
- **Статус**: Требует исправления логики поиска файлов
### 3. Парсер monitoring_fuel
- **Проблема**: Возвращает пустой словарь
- **Причина**: Ошибки при загрузке файлов - "None of ['id'] are in the columns"
- **Статус**: Требует исправления логики обработки колонок
## Рекомендации
### Немедленные действия
1. **Исправить парсер svodka_ca** - проверить логику парсинга Excel файлов
2. **Исправить парсер svodka_pm** - проверить логику поиска файлов в архиве
3. **Исправить парсер monitoring_fuel** - проверить логику обработки колонок
### Долгосрочные улучшения
1. **Улучшить обработку ошибок** в парсерах
2. **Добавить валидацию данных** перед сохранением
3. **Создать более детальные тесты** для каждого парсера
## Техническая информация
### Локальный storage
- ✅ Создан `LocalStorageAdapter` для тестирования
- ✅ Поддерживает все операции: save, load, delete, list
- ✅ Автоматически очищается после тестов
### Инфраструктура тестов
- ✅ Pytest конфигурация с фикстурами
- ✅ Автоматическая регистрация парсеров
- ✅ Поддержка как API, так и прямых тестов
## Заключение
Система тестирования создана и работает корректно. Выявлены конкретные проблемы в парсерах, которые требуют исправления. После исправления парсеров все тесты должны пройти успешно.
**Следующий шаг**: Исправить выявленные проблемы в парсерах согласно результатам отладочных тестов.

1
tests/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Tests package

97
tests/conftest.py Normal file
View File

@@ -0,0 +1,97 @@
"""
Конфигурация pytest для тестирования API эндпоинтов
"""
import pytest
import requests
import time
import os
import sys
from pathlib import Path
# Добавляем путь к проекту для импорта модулей
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "python_parser"))
from adapters.local_storage import LocalStorageAdapter
# Базовый URL API
API_BASE_URL = "http://localhost:8000"
# Путь к тестовым данным
TEST_DATA_DIR = Path(__file__).parent / "test_data"
@pytest.fixture(scope="session")
def api_base_url():
"""Базовый URL для API"""
return API_BASE_URL
@pytest.fixture(scope="session")
def test_data_dir():
"""Директория с тестовыми данными"""
return TEST_DATA_DIR
@pytest.fixture(scope="session")
def wait_for_api():
"""Ожидание готовности API"""
max_attempts = 30
for attempt in range(max_attempts):
try:
response = requests.get(f"{API_BASE_URL}/docs", timeout=5)
if response.status_code == 200:
print(f"✅ API готов после {attempt + 1} попыток")
return True
except requests.exceptions.RequestException:
pass
if attempt < max_attempts - 1:
time.sleep(2)
pytest.fail("❌ API не готов после 30 попыток")
@pytest.fixture
def upload_file(test_data_dir):
"""Фикстура для загрузки файла"""
def _upload_file(filename):
file_path = test_data_dir / filename
if not file_path.exists():
pytest.skip(f"Файл {filename} не найден в {test_data_dir}")
return file_path
return _upload_file
@pytest.fixture(scope="session")
def local_storage():
"""Фикстура для локального storage"""
storage = LocalStorageAdapter("tests/local_storage")
yield storage
# Очищаем storage после всех тестов
storage.clear_all()
@pytest.fixture
def clean_storage(local_storage):
"""Фикстура для очистки storage перед каждым тестом"""
local_storage.clear_all()
yield local_storage
def make_api_request(url, method="GET", data=None, files=None, json_data=None):
"""Универсальная функция для API запросов"""
try:
if method.upper() == "GET":
response = requests.get(url, timeout=30)
elif method.upper() == "POST":
if files:
response = requests.post(url, files=files, timeout=30)
elif json_data:
response = requests.post(url, json=json_data, timeout=30)
else:
response = requests.post(url, data=data, timeout=30)
else:
raise ValueError(f"Неподдерживаемый метод: {method}")
return response
except requests.exceptions.RequestException as e:
pytest.fail(f"Ошибка API запроса: {e}")
@pytest.fixture
def api_request():
"""Фикстура для API запросов"""
return make_api_request

2
tests/requirements.txt Normal file
View File

@@ -0,0 +1,2 @@
pytest>=7.0.0
requests>=2.28.0

View File

@@ -0,0 +1,20 @@
"""
Основной файл для запуска всех тестов API эндпоинтов
"""
import pytest
import sys
from pathlib import Path
# Добавляем путь к проекту для импорта модулей
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "python_parser"))
if __name__ == "__main__":
# Запуск всех тестов
pytest.main([
__file__.replace("test_all_endpoints.py", ""),
"-v", # подробный вывод
"--tb=short", # короткий traceback
"--color=yes", # цветной вывод
"-x", # остановка на первой ошибке
])

Binary file not shown.

BIN
tests/test_data/pm_plan.zip Normal file

Binary file not shown.

View File

@@ -0,0 +1,339 @@
"""
Тестирование геттеров с данными из локального storage
"""
import pytest
import sys
from pathlib import Path
# Добавляем путь к проекту
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "python_parser"))
from core.services import ReportService, PARSERS
from core.models import DataRequest, UploadRequest
from adapters.local_storage import LocalStorageAdapter
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser
# Регистрируем парсеры
PARSERS.update({
'svodka_pm': SvodkaPMParser,
'svodka_ca': SvodkaCAParser,
'monitoring_fuel': MonitoringFuelParser,
})
class TestGettersWithLocalStorage:
"""Тестирование геттеров с локальным storage"""
@pytest.fixture(autouse=True)
def setup_storage(self, clean_storage):
"""Настройка локального storage для каждого теста"""
self.storage = clean_storage
self.report_service = ReportService(self.storage)
def test_svodka_pm_single_og_with_local_data(self, upload_file):
"""Тест svodka_pm single_og с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("pm_plan.zip")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='svodka_pm',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='svodka_pm',
get_params={
'mode': 'single_og',
'id': 'SNPZ',
'codes': [78, 79],
'columns': ['ПП', 'СЭБ']
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ svodka_pm/single_og работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ svodka_pm/single_og не работает: {result.message}")
# Не делаем assert, чтобы увидеть все ошибки
def test_svodka_pm_total_ogs_with_local_data(self, upload_file):
"""Тест svodka_pm total_ogs с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("pm_plan.zip")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='svodka_pm',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='svodka_pm',
get_params={
'mode': 'total_ogs',
'codes': [78, 79, 394, 395, 396, 397, 81, 82, 83, 84],
'columns': ['БП', 'ПП', 'СЭБ']
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ svodka_pm/total_ogs работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ svodka_pm/total_ogs не работает: {result.message}")
def test_svodka_ca_get_ca_data_with_local_data(self, upload_file):
"""Тест svodka_ca get_ca_data с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("svodka_ca.xlsx")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='svodka_ca',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='svodka_ca',
get_params={
'mode': 'get_ca_data',
'modes': ['fact', 'plan'],
'tables': ['table1', 'table2']
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ svodka_ca/get_ca_data работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ svodka_ca/get_ca_data не работает: {result.message}")
def test_monitoring_fuel_get_total_by_columns_with_local_data(self, upload_file):
"""Тест monitoring_fuel get_total_by_columns с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("monitoring.zip")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='monitoring_fuel',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='monitoring_fuel',
get_params={
'mode': 'total_by_columns',
'columns': ['total', 'normativ']
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ monitoring_fuel/get_total_by_columns работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ monitoring_fuel/get_total_by_columns не работает: {result.message}")
def test_monitoring_fuel_get_month_by_code_with_local_data(self, upload_file):
"""Тест monitoring_fuel get_month_by_code с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("monitoring.zip")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='monitoring_fuel',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='monitoring_fuel',
get_params={
'mode': 'month_by_code',
'month': '02'
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ monitoring_fuel/get_month_by_code работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ monitoring_fuel/get_month_by_code не работает: {result.message}")
def test_monitoring_fuel_get_series_by_id_and_columns_with_local_data(self, upload_file):
"""Тест monitoring_fuel get_series_by_id_and_columns с данными из локального storage"""
# Сначала загружаем данные
file_path = upload_file("monitoring.zip")
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='monitoring_fuel',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
upload_result = self.report_service.upload_report(request)
assert upload_result.success is True, f"Загрузка не удалась: {upload_result.message}"
# Теперь тестируем геттер
data_request = DataRequest(
report_type='monitoring_fuel',
get_params={
'mode': 'series_by_id_and_columns',
'columns': ['total', 'normativ']
}
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"✅ monitoring_fuel/get_series_by_id_and_columns работает с локальными данными")
print(f" Получено данных: {len(result.data) if isinstance(result.data, list) else 'не список'}")
else:
print(f"❌ monitoring_fuel/get_series_by_id_and_columns не работает: {result.message}")
def test_all_getters_with_loaded_data(self, upload_file):
"""Тест всех геттеров с предварительно загруженными данными"""
# Загружаем все данные
files_to_upload = [
("svodka_ca.xlsx", "svodka_ca", "file"),
("pm_plan.zip", "svodka_pm", "zip"),
("monitoring.zip", "monitoring_fuel", "zip")
]
for filename, report_type, upload_type in files_to_upload:
file_path = upload_file(filename)
# Читаем файл и создаем UploadRequest
with open(file_path, 'rb') as f:
file_content = f.read()
upload_request = UploadRequest(
report_type=report_type,
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
result = self.report_service.upload_report(upload_request)
assert result.success is True, f"Загрузка {filename} не удалась: {result.message}"
print(f"{filename} загружен")
# Тестируем все геттеры
test_cases = [
# svodka_pm
{
'report_type': 'svodka_pm',
'mode': 'single_og',
'params': {'id': 'SNPZ', 'codes': [78, 79], 'columns': ['ПП', 'СЭБ']},
'name': 'svodka_pm/single_og'
},
{
'report_type': 'svodka_pm',
'mode': 'total_ogs',
'params': {'codes': [78, 79, 394, 395, 396, 397, 81, 82, 83, 84], 'columns': ['БП', 'ПП', 'СЭБ']},
'name': 'svodka_pm/total_ogs'
},
# svodka_ca
{
'report_type': 'svodka_ca',
'mode': 'get_ca_data',
'params': {'modes': ['fact', 'plan'], 'tables': ['table1', 'table2']},
'name': 'svodka_ca/get_ca_data'
},
# monitoring_fuel
{
'report_type': 'monitoring_fuel',
'mode': 'total_by_columns',
'params': {'columns': ['total', 'normativ']},
'name': 'monitoring_fuel/get_total_by_columns'
},
{
'report_type': 'monitoring_fuel',
'mode': 'month_by_code',
'params': {'month': '02'},
'name': 'monitoring_fuel/get_month_by_code'
},
{
'report_type': 'monitoring_fuel',
'mode': 'series_by_id_and_columns',
'params': {'columns': ['total', 'normativ']},
'name': 'monitoring_fuel/get_series_by_id_and_columns'
}
]
print("\n🧪 Тестирование всех геттеров с локальными данными:")
for test_case in test_cases:
request_params = test_case['params'].copy()
request_params['mode'] = test_case['mode']
data_request = DataRequest(
report_type=test_case['report_type'],
get_params=request_params
)
result = self.report_service.get_data(data_request)
if result.success:
print(f"{test_case['name']}: работает")
else:
print(f"{test_case['name']}: {result.message}")
# Показываем содержимое storage
objects = self.storage.list_objects()
print(f"\n📊 Объекты в локальном storage: {len(objects)}")
for obj_id in objects:
metadata = self.storage.get_object_metadata(obj_id)
if metadata:
print(f" 📁 {obj_id}: {metadata['shape']} колонки: {metadata['columns'][:3]}...")

View File

@@ -0,0 +1,102 @@
"""
Тесты для monitoring_fuel эндпоинтов
"""
import pytest
import requests
class TestMonitoringFuelEndpoints:
"""Тесты эндпоинтов monitoring_fuel"""
def test_monitoring_fuel_get_total_by_columns(self, wait_for_api, api_base_url):
"""Тест получения данных по колонкам и расчёт средних значений"""
# Пример из схемы MonitoringFuelTotalRequest
data = {
"columns": ["total", "normativ"]
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_total_by_columns", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_total_by_columns работает: получены данные для колонок {data['columns']}")
def test_monitoring_fuel_get_month_by_code(self, wait_for_api, api_base_url):
"""Тест получения данных за месяц"""
# Пример из схемы MonitoringFuelMonthRequest
data = {
"month": "02"
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_month_by_code", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_month_by_code работает: получены данные за месяц {data['month']}")
def test_monitoring_fuel_get_series_by_id_and_columns(self, wait_for_api, api_base_url):
"""Тест получения временных рядов по ID и колонкам"""
# Пример из схемы MonitoringFuelSeriesRequest
data = {
"columns": ["total", "normativ"]
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_series_by_id_and_columns", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_series_by_id_and_columns работает: получены временные ряды для колонок {data['columns']}")
def test_monitoring_fuel_get_total_by_columns_single_column(self, wait_for_api, api_base_url):
"""Тест получения данных по одной колонке"""
data = {
"columns": ["total"]
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_total_by_columns", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_total_by_columns с одной колонкой работает: получены данные для колонки {data['columns'][0]}")
def test_monitoring_fuel_get_month_by_code_different_month(self, wait_for_api, api_base_url):
"""Тест получения данных за другой месяц"""
data = {
"month": "01"
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_month_by_code", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_month_by_code с другим месяцем работает: получены данные за месяц {data['month']}")
def test_monitoring_fuel_get_series_by_id_and_columns_single_column(self, wait_for_api, api_base_url):
"""Тест получения временных рядов по одной колонке"""
data = {
"columns": ["total"]
}
response = requests.post(f"{api_base_url}/monitoring_fuel/get_series_by_id_and_columns", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ monitoring_fuel/get_series_by_id_and_columns с одной колонкой работает: получены временные ряды для колонки {data['columns'][0]}")

View File

@@ -0,0 +1,134 @@
"""
Прямое тестирование парсеров с локальным storage
Этот модуль тестирует парсеры напрямую, без API
"""
import pytest
import sys
from pathlib import Path
# Добавляем путь к проекту
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "python_parser"))
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser
from core.services import ReportService
from adapters.local_storage import LocalStorageAdapter
class TestParsersDirect:
"""Прямое тестирование парсеров с локальным storage"""
@pytest.fixture(autouse=True)
def setup_storage(self, clean_storage):
"""Настройка локального storage для каждого теста"""
self.storage = clean_storage
self.report_service = ReportService(self.storage)
def test_svodka_pm_parser_registration(self):
"""Тест регистрации парсера svodka_pm"""
parser = SvodkaPMParser()
getters = parser.get_available_getters()
assert "single_og" in getters
assert "total_ogs" in getters
# Проверяем параметры геттеров
single_og_getter = getters["single_og"]
assert "id" in single_og_getter["required_params"]
assert "codes" in single_og_getter["required_params"]
assert "columns" in single_og_getter["required_params"]
assert "search" in single_og_getter["optional_params"]
total_ogs_getter = getters["total_ogs"]
assert "codes" in total_ogs_getter["required_params"]
assert "columns" in total_ogs_getter["required_params"]
assert "search" in total_ogs_getter["optional_params"]
print("✅ svodka_pm парсер зарегистрирован корректно")
def test_svodka_ca_parser_registration(self):
"""Тест регистрации парсера svodka_ca"""
parser = SvodkaCAParser()
getters = parser.get_available_getters()
assert "get_ca_data" in getters
# Проверяем параметры геттера
getter = getters["get_ca_data"]
assert "modes" in getter["required_params"]
assert "tables" in getter["required_params"]
print("✅ svodka_ca парсер зарегистрирован корректно")
def test_monitoring_fuel_parser_registration(self):
"""Тест регистрации парсера monitoring_fuel"""
parser = MonitoringFuelParser()
getters = parser.get_available_getters()
assert "total_by_columns" in getters
assert "month_by_code" in getters
assert "series_by_id_and_columns" in getters
# Проверяем параметры геттеров
total_getter = getters["total_by_columns"]
assert "columns" in total_getter["required_params"]
month_getter = getters["month_by_code"]
assert "month" in month_getter["required_params"]
series_getter = getters["series_by_id_and_columns"]
assert "columns" in series_getter["required_params"]
print("✅ monitoring_fuel парсер зарегистрирован корректно")
def test_storage_operations(self):
"""Тест операций с локальным storage"""
import pandas as pd
# Создаем тестовый DataFrame
test_df = pd.DataFrame({
'col1': [1, 2, 3],
'col2': ['a', 'b', 'c']
})
# Сохраняем
success = self.storage.save_dataframe("test_object", test_df)
assert success is True
# Проверяем существование
exists = self.storage.object_exists("test_object")
assert exists is True
# Загружаем
loaded_df = self.storage.load_dataframe("test_object")
assert loaded_df is not None
assert loaded_df.shape == (3, 2)
assert list(loaded_df.columns) == ['col1', 'col2']
# Получаем метаданные
metadata = self.storage.get_object_metadata("test_object")
assert metadata is not None
assert metadata["shape"] == [3, 2]
# Получаем список объектов
objects = self.storage.list_objects()
assert "test_object" in objects
# Удаляем
delete_success = self.storage.delete_object("test_object")
assert delete_success is True
# Проверяем, что объект удален
exists_after = self.storage.object_exists("test_object")
assert exists_after is False
print("✅ Локальный storage работает корректно")
def test_report_service_with_local_storage(self):
"""Тест ReportService с локальным storage"""
# Проверяем, что ReportService может работать с локальным storage
assert self.report_service.storage is not None
assert hasattr(self.report_service.storage, 'save_dataframe')
assert hasattr(self.report_service.storage, 'load_dataframe')
print("✅ ReportService корректно работает с локальным storage")

View File

@@ -0,0 +1,58 @@
"""
Тесты для svodka_ca эндпоинтов
"""
import pytest
import requests
class TestSvodkaCAEndpoints:
"""Тесты эндпоинтов svodka_ca"""
def test_svodka_ca_get_ca_data(self, wait_for_api, api_base_url):
"""Тест получения данных из сводок СА"""
# Пример из схемы SvodkaCARequest
data = {
"modes": ["fact", "plan"],
"tables": ["table1", "table2"]
}
response = requests.post(f"{api_base_url}/svodka_ca/get_ca_data", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_ca/get_ca_data работает: получены данные для режимов {data['modes']}")
def test_svodka_ca_get_ca_data_single_mode(self, wait_for_api, api_base_url):
"""Тест получения данных из сводок СА для одного режима"""
data = {
"modes": ["fact"],
"tables": ["table1"]
}
response = requests.post(f"{api_base_url}/svodka_ca/get_ca_data", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_ca/get_ca_data с одним режимом работает: получены данные для режима {data['modes'][0]}")
def test_svodka_ca_get_ca_data_multiple_tables(self, wait_for_api, api_base_url):
"""Тест получения данных из сводок СА для нескольких таблиц"""
data = {
"modes": ["fact", "plan"],
"tables": ["table1", "table2", "table3"]
}
response = requests.post(f"{api_base_url}/svodka_ca/get_ca_data", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_ca/get_ca_data с несколькими таблицами работает: получены данные для {len(data['tables'])} таблиц")

View File

@@ -0,0 +1,79 @@
"""
Тесты для svodka_pm эндпоинтов
"""
import pytest
import requests
class TestSvodkaPMEndpoints:
"""Тесты эндпоинтов svodka_pm"""
def test_svodka_pm_single_og(self, wait_for_api, api_base_url):
"""Тест получения данных по одному ОГ"""
# Пример из схемы SvodkaPMSingleOGRequest
data = {
"id": "SNPZ",
"codes": [78, 79],
"columns": ["ПП", "СЭБ"]
}
response = requests.post(f"{api_base_url}/svodka_pm/single_og", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_pm/single_og работает: получены данные для {data['id']}")
def test_svodka_pm_total_ogs(self, wait_for_api, api_base_url):
"""Тест получения данных по всем ОГ"""
# Пример из схемы SvodkaPMTotalOGsRequest
data = {
"codes": [78, 79, 394, 395, 396, 397, 81, 82, 83, 84],
"columns": ["БП", "ПП", "СЭБ"]
}
response = requests.post(f"{api_base_url}/svodka_pm/get_total_ogs", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_pm/get_total_ogs работает: получены данные по всем ОГ")
def test_svodka_pm_single_og_with_search(self, wait_for_api, api_base_url):
"""Тест получения данных по одному ОГ с параметром search"""
data = {
"id": "SNPZ",
"codes": [78, 79],
"columns": ["ПП", "СЭБ"],
"search": "Итого"
}
response = requests.post(f"{api_base_url}/svodka_pm/single_og", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_pm/single_og с search работает: получены данные для {data['id']} с фильтром")
def test_svodka_pm_total_ogs_with_search(self, wait_for_api, api_base_url):
"""Тест получения данных по всем ОГ с параметром search"""
data = {
"codes": [78, 79, 394, 395, 396, 397, 81, 82, 83, 84],
"columns": ["БП", "ПП", "СЭБ"],
"search": "Итого"
}
response = requests.post(f"{api_base_url}/svodka_pm/get_total_ogs", json=data)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Запрос не удался: {result}"
assert "data" in result, "Отсутствует поле 'data' в ответе"
print(f"✅ svodka_pm/get_total_ogs с search работает: получены данные по всем ОГ с фильтром")

View File

@@ -0,0 +1,52 @@
"""
Тесты для эндпоинтов загрузки данных
"""
import pytest
import requests
from pathlib import Path
class TestUploadEndpoints:
"""Тесты эндпоинтов загрузки"""
def test_upload_svodka_ca(self, wait_for_api, upload_file, api_base_url):
"""Тест загрузки файла svodka_ca.xlsx"""
file_path = upload_file("svodka_ca.xlsx")
with open(file_path, 'rb') as f:
files = {'file': ('svodka_ca.xlsx', f, 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')}
response = requests.post(f"{api_base_url}/svodka_ca/upload", files=files)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Загрузка не удалась: {result}"
print(f"✅ svodka_ca.xlsx загружен успешно: {result['message']}")
def test_upload_svodka_pm_plan(self, wait_for_api, upload_file, api_base_url):
"""Тест загрузки архива pm_plan.zip"""
file_path = upload_file("pm_plan.zip")
with open(file_path, 'rb') as f:
files = {'zip_file': ('pm_plan.zip', f, 'application/zip')}
response = requests.post(f"{api_base_url}/svodka_pm/upload-zip", files=files)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Загрузка не удалась: {result}"
print(f"✅ pm_plan.zip загружен успешно: {result['message']}")
def test_upload_monitoring_fuel(self, wait_for_api, upload_file, api_base_url):
"""Тест загрузки архива monitoring.zip"""
file_path = upload_file("monitoring.zip")
with open(file_path, 'rb') as f:
files = {'zip_file': ('monitoring.zip', f, 'application/zip')}
response = requests.post(f"{api_base_url}/monitoring_fuel/upload-zip", files=files)
assert response.status_code == 200, f"Ожидался статус 200, получен {response.status_code}: {response.text}"
result = response.json()
assert result["success"] is True, f"Загрузка не удалась: {result}"
print(f"✅ monitoring.zip загружен успешно: {result['message']}")

View File

@@ -0,0 +1,183 @@
"""
Тестирование загрузки файлов с сохранением в локальный storage
"""
import pytest
import sys
from pathlib import Path
# Добавляем путь к проекту
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "python_parser"))
from core.services import ReportService, PARSERS
from core.models import UploadRequest
from adapters.local_storage import LocalStorageAdapter
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser
# Регистрируем парсеры
PARSERS.update({
'svodka_pm': SvodkaPMParser,
'svodka_ca': SvodkaCAParser,
'monitoring_fuel': MonitoringFuelParser,
})
class TestUploadWithLocalStorage:
"""Тестирование загрузки файлов с локальным storage"""
@pytest.fixture(autouse=True)
def setup_storage(self, clean_storage):
"""Настройка локального storage для каждого теста"""
self.storage = clean_storage
self.report_service = ReportService(self.storage)
def test_upload_svodka_ca_to_local_storage(self, upload_file):
"""Тест загрузки svodka_ca.xlsx в локальный storage"""
file_path = upload_file("svodka_ca.xlsx")
# Читаем файл и создаем UploadRequest
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='svodka_ca',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
# Загружаем файл через ReportService
result = self.report_service.upload_report(request)
assert result.success is True, f"Загрузка не удалась: {result.message}"
# Проверяем, что данные сохранились в локальном storage
objects = self.storage.list_objects()
assert len(objects) > 0, "Данные не сохранились в storage"
# Проверяем метаданные
for obj_id in objects:
metadata = self.storage.get_object_metadata(obj_id)
assert metadata is not None, f"Метаданные для {obj_id} не найдены"
assert "shape" in metadata, f"Отсутствует shape в метаданных {obj_id}"
assert "columns" in metadata, f"Отсутствуют columns в метаданных {obj_id}"
print(f"✅ svodka_ca.xlsx загружен в локальный storage: {len(objects)} объектов")
print(f" Объекты: {objects}")
def test_upload_pm_plan_to_local_storage(self, upload_file):
"""Тест загрузки pm_plan.zip в локальный storage"""
file_path = upload_file("pm_plan.zip")
# Читаем файл и создаем UploadRequest
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='svodka_pm',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
# Загружаем архив через ReportService
result = self.report_service.upload_report(request)
assert result.success is True, f"Загрузка не удалась: {result.message}"
# Проверяем, что данные сохранились в локальном storage
objects = self.storage.list_objects()
assert len(objects) > 0, "Данные не сохранились в storage"
# Проверяем метаданные
for obj_id in objects:
metadata = self.storage.get_object_metadata(obj_id)
assert metadata is not None, f"Метаданные для {obj_id} не найдены"
assert "shape" in metadata, f"Отсутствует shape в метаданных {obj_id}"
assert "columns" in metadata, f"Отсутствуют columns в метаданных {obj_id}"
print(f"✅ pm_plan.zip загружен в локальный storage: {len(objects)} объектов")
print(f" Объекты: {objects}")
def test_upload_monitoring_to_local_storage(self, upload_file):
"""Тест загрузки monitoring.zip в локальный storage"""
file_path = upload_file("monitoring.zip")
# Читаем файл и создаем UploadRequest
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type='monitoring_fuel',
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
# Загружаем архив через ReportService
result = self.report_service.upload_report(request)
assert result.success is True, f"Загрузка не удалась: {result.message}"
# Проверяем, что данные сохранились в локальном storage
objects = self.storage.list_objects()
assert len(objects) > 0, "Данные не сохранились в storage"
# Проверяем метаданные
for obj_id in objects:
metadata = self.storage.get_object_metadata(obj_id)
assert metadata is not None, f"Метаданные для {obj_id} не найдены"
assert "shape" in metadata, f"Отсутствует shape в метаданных {obj_id}"
assert "columns" in metadata, f"Отсутствуют columns в метаданных {obj_id}"
print(f"✅ monitoring.zip загружен в локальный storage: {len(objects)} объектов")
print(f" Объекты: {objects}")
def test_upload_all_files_sequence(self, upload_file):
"""Тест последовательной загрузки всех файлов"""
# Загружаем все файлы по очереди
files_to_upload = [
("svodka_ca.xlsx", "svodka_ca", "file"),
("pm_plan.zip", "svodka_pm", "zip"),
("monitoring.zip", "monitoring_fuel", "zip")
]
total_objects = 0
for filename, report_type, upload_type in files_to_upload:
file_path = upload_file(filename)
# Читаем файл и создаем UploadRequest
with open(file_path, 'rb') as f:
file_content = f.read()
request = UploadRequest(
report_type=report_type,
file_name=file_path.name,
file_content=file_content,
parse_params={}
)
result = self.report_service.upload_report(request)
assert result.success is True, f"Загрузка {filename} не удалась: {result.message}"
# Подсчитываем объекты
objects = self.storage.list_objects()
current_count = len(objects)
print(f"{filename} загружен: {current_count - total_objects} новых объектов")
total_objects = current_count
# Проверяем итоговое количество объектов
final_objects = self.storage.list_objects()
assert len(final_objects) > 0, "Ни один файл не был загружен"
print(f"Все файлы загружены. Итого объектов в storage: {len(final_objects)}")
print(f" Все объекты: {final_objects}")
# Выводим детальную информацию о каждом объекте
for obj_id in final_objects:
metadata = self.storage.get_object_metadata(obj_id)
if metadata:
print(f" 📊 {obj_id}: {metadata['shape']} колонки: {metadata['columns'][:5]}...")