13 Commits

21 changed files with 1635 additions and 355 deletions

20
.gitignore vendored
View File

@@ -1,5 +1,23 @@
# Python
__pycache__
__pycache__/
python_parser/__pycache__/
python_parser/core/__pycache__/
python_parser/adapters/__pycache__/
python_parser/tests/__pycache__/
python_parser/tests/test_core/__pycache__/
python_parser/tests/test_adapters/__pycache__/
python_parser/tests/test_app/__pycache__/
python_parser/app/__pycache__/
python_parser/app/schemas/__pycache__/
python_parser/app/schemas/test_schemas/__pycache__/
python_parser/app/schemas/test_schemas/test_core/__pycache__/
python_parser/app/schemas/test_schemas/test_adapters/__pycache__/
python_parser/app/schemas/test_schemas/test_app/__pycache__/
nin_python_parser
*.pyc
*.py[cod]
*$py.class
*.so
@@ -153,3 +171,5 @@ node_modules/
npm-debug.log*
yarn-debug.log*
yarn-error.log*
__pycache__/

View File

@@ -14,7 +14,7 @@ services:
restart: unless-stopped
fastapi:
build: ./python_parser
image: python:3.11-slim
container_name: svodka_fastapi_dev
ports:
- "8000:8000"
@@ -24,9 +24,20 @@ services:
- MINIO_SECRET_KEY=minioadmin
- MINIO_SECURE=false
- MINIO_BUCKET=svodka-data
volumes:
# Монтируем исходный код для автоматической перезагрузки
- ./python_parser:/app
# Монтируем requirements.txt для установки зависимостей
- ./python_parser/requirements.txt:/app/requirements.txt
working_dir: /app
depends_on:
- minio
restart: unless-stopped
command: >
bash -c "
pip install --no-cache-dir -r requirements.txt &&
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
"
streamlit:
image: python:3.11-slim

View File

@@ -0,0 +1,135 @@
# Интеграция схем Pydantic с парсерами
## Обзор
Этот документ описывает решение для устранения дублирования логики между схемами Pydantic и парсерами. Теперь схемы Pydantic являются единым источником правды для определения параметров парсеров.
## Проблема
Ранее в парсерах дублировалась информация о параметрах:
```python
# В парсере
self.register_getter(
name="single_og",
method=self._get_single_og,
required_params=["id", "codes", "columns"], # Дублирование
optional_params=["search"], # Дублирование
description="Получение данных по одному ОГ"
)
# В схеме
class SvodkaPMSingleOGRequest(BaseModel):
id: OGID = Field(...) # Обязательное поле
codes: List[int] = Field(...) # Обязательное поле
columns: List[str] = Field(...) # Обязательное поле
search: Optional[str] = Field(None) # Необязательное поле
```
## Решение
### 1. Утилиты для работы со схемами
Создан модуль `core/schema_utils.py` с функциями:
- `get_required_fields_from_schema()` - извлекает обязательные поля
- `get_optional_fields_from_schema()` - извлекает необязательные поля
- `register_getter_from_schema()` - регистрирует геттер с использованием схемы
- `validate_params_with_schema()` - валидирует параметры с помощью схемы
### 2. Обновленные парсеры
Теперь парсеры используют схемы как единый источник правды:
```python
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="single_og",
method=self._get_single_og,
schema_class=SvodkaPMSingleOGRequest,
description="Получение данных по одному ОГ"
)
```
### 3. Валидация параметров
Методы геттеров теперь автоматически валидируют параметры:
```python
def _get_single_og(self, params: dict):
"""Получение данных по одному ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
og_id = validated_params["id"]
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# ... остальная логика
```
## Преимущества
1. **Единый источник правды** - информация о параметрах хранится только в схемах Pydantic
2. **Автоматическая валидация** - параметры автоматически валидируются с помощью Pydantic
3. **Синхронизация** - изменения в схемах автоматически отражаются в парсерах
4. **Типобезопасность** - использование типов Pydantic обеспечивает типобезопасность
5. **Документация** - Swagger документация автоматически генерируется из схем
## Совместимость
Решение работает с:
- Pydantic v1 (через `__fields__`)
- Pydantic v2 (через `model_fields` и `is_required()`)
## Использование
### Для новых парсеров
1. Создайте схему Pydantic с нужными полями
2. Используйте `register_getter_from_schema()` для регистрации геттера
3. Используйте `validate_params_with_schema()` в методах геттеров
### Для существующих парсеров
1. Убедитесь, что у вас есть соответствующая схема Pydantic
2. Замените ручную регистрацию геттеров на `register_getter_from_schema()`
3. Добавьте валидацию параметров в методы геттеров
## Примеры
### Схема с обязательными и необязательными полями
```python
class ExampleRequest(BaseModel):
required_field: str = Field(..., description="Обязательное поле")
optional_field: Optional[str] = Field(None, description="Необязательное поле")
```
### Регистрация геттера
```python
register_getter_from_schema(
parser_instance=self,
getter_name="example_getter",
method=self._example_method,
schema_class=ExampleRequest,
description="Пример геттера"
)
```
### Валидация в методе
```python
def _example_method(self, params: dict):
validated_params = validate_params_with_schema(params, ExampleRequest)
# validated_params содержит валидированные данные
```
## Заключение
Это решение устраняет дублирование кода и обеспечивает единообразие между API схемами и парсерами. Теперь изменения в схемах автоматически отражаются в парсерах, что упрощает поддержку и развитие системы.

View File

@@ -0,0 +1,88 @@
# Парсер Сводки ПМ
## Описание
Парсер для обработки сводок ПМ (план и факт) с поддержкой множественных геттеров. Наследуется от `ParserPort` и реализует архитектуру hexagonal architecture.
## Доступные геттеры
### 1. `get_single_og`
Получение данных по одному ОГ из сводки ПМ.
**Обязательные параметры:**
- `id` (str): ID ОГ (например, "SNPZ", "KNPZ")
- `codes` (list): Список кодов показателей (например, [78, 79, 81, 82])
- `columns` (list): Список столбцов для извлечения (например, ["ПП", "БП", "СЭБ"])
**Необязательные параметры:**
- `search` (str): Значение для поиска в столбцах
**Пример использования:**
```python
parser = SvodkaPMParser()
params = {
"id": "SNPZ",
"codes": [78, 79, 81, 82],
"columns": ["ПП", "БП", "СЭБ"]
}
result = parser.get_value("get_single_og", params)
```
### 2. `get_total_ogs`
Получение данных по всем ОГ из сводки ПМ.
**Обязательные параметры:**
- `codes` (list): Список кодов показателей
- `columns` (list): Список столбцов для извлечения
**Необязательные параметры:**
- `search` (str): Значение для поиска в столбцах
**Пример использования:**
```python
parser = SvodkaPMParser()
params = {
"codes": [78, 79, 81, 82],
"columns": ["ПП", "БП", "СЭБ"]
}
result = parser.get_value("get_total_ogs", params)
```
## Поддерживаемые столбцы
- **ПП, БП**: Данные из файлов плана
- **ТБ, СЭБ, НЭБ**: Данные из файлов факта
## Структура файлов
Парсер ожидает следующую структуру файлов:
- `data/pm_fact/svodka_fact_pm_{OG_ID}.xlsx` или `.xlsm`
- `data/pm_plan/svodka_plan_pm_{OG_ID}.xlsx` или `.xlsm`
Где `{OG_ID}` - это ID ОГ (например, SNPZ, KNPZ и т.д.)
## Формат результата
Результат возвращается в формате JSON со следующей структурой:
```json
{
"ПП": {
"78": 123.45,
"79": 234.56
},
"БП": {
"78": 111.11,
"79": 222.22
},
"СЭБ": {
"78": 333.33,
"79": 444.44
}
}
```
## Обработка ошибок
- Если файл плана/факта не найден, соответствующие столбцы будут пустыми
- Если код показателя не найден, возвращается 0
- Валидация параметров выполняется автоматически

View File

@@ -1,9 +1,11 @@
import pandas as pd
import re
from typing import Dict
import zipfile
from typing import Dict, Tuple
from core.ports import ParserPort
from adapters.pconfig import data_to_json, get_object_by_name
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.monitoring_fuel import MonitoringFuelTotalRequest, MonitoringFuelMonthRequest
from adapters.pconfig import data_to_json
class MonitoringFuelParser(ParserPort):
@@ -11,71 +13,139 @@ class MonitoringFuelParser(ParserPort):
name = "Мониторинг топлива"
def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int:
"""Определение индекса заголовка в Excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file_path,
sheet_name=sheet,
header=None,
nrows=max_rows
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="total_by_columns",
method=self._get_total_by_columns,
schema_class=MonitoringFuelTotalRequest,
description="Агрегация данных по колонкам"
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx + 1 # возвращаем индекс строки (0-based)
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_single(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного объекта'''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Установка")
# Читаем весь лист, начиная с найденной строки как заголовок
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None
register_getter_from_schema(
parser_instance=self,
getter_name="month_by_code",
method=self._get_month_by_code,
schema_class=MonitoringFuelMonthRequest,
description="Получение данных за конкретный месяц"
)
# === Удаление полностью пустых столбцов ===
df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA
df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA
df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы
def _get_total_by_columns(self, params: dict):
"""Агрегация данных по колонкам"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, MonitoringFuelTotalRequest)
# === Переименовываем нужные столбцы по позициям ===
if len(df_full.columns) < 2:
raise ValueError("DataFrame должен содержать как минимум 2 столбца.")
columns = validated_params["columns"]
new_columns = df_full.columns.tolist()
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
if hasattr(self, 'data_dict') and self.data_dict is not None:
# Данные из парсинга
data_source = self.data_dict
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из загрузки - преобразуем DataFrame обратно в словарь
data_source = self._df_to_data_dict()
else:
return {}
new_columns[0] = 'name'
new_columns[1] = 'normativ'
new_columns[-2] = 'total'
new_columns[-1] = 'total_1'
# Агрегируем данные по колонкам
df_means, _ = self.aggregate_by_columns(data_source, columns)
df_full.columns = new_columns
# Преобразуем в JSON-совместимый формат
result = {}
for idx, row in df_means.iterrows():
result[str(idx)] = {}
for col in columns:
value = row.get(col)
if pd.isna(value) or value == float('inf') or value == float('-inf'):
result[str(idx)][col] = None
else:
result[str(idx)][col] = float(value) if isinstance(value, (int, float)) else value
# Проверяем, что колонка 'name' существует
if 'name' in df_full.columns:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_full['id'] = df_full['name'].apply(get_object_by_name)
return result
# Устанавливаем id как индекс
df_full.set_index('id', inplace=True)
print(f"Окончательное количество столбцов: {len(df_full.columns)}")
return df_full
def _get_month_by_code(self, params: dict):
"""Получение данных за конкретный месяц"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, MonitoringFuelMonthRequest)
def parse(self, file_path: str, params: dict) -> dict:
import zipfile
month = validated_params["month"]
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
if hasattr(self, 'data_dict') and self.data_dict is not None:
# Данные из парсинга
data_source = self.data_dict
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из загрузки - преобразуем DataFrame обратно в словарь
data_source = self._df_to_data_dict()
else:
return {}
# Получаем данные за конкретный месяц
df_month = self.get_month(data_source, month)
# Преобразуем в JSON-совместимый формат
result = {}
for idx, row in df_month.iterrows():
result[str(idx)] = {}
for col in df_month.columns:
value = row[col]
if pd.isna(value) or value == float('inf') or value == float('-inf'):
result[str(idx)][col] = None
else:
result[str(idx)][col] = float(value) if isinstance(value, (int, float)) else value
return result
def _df_to_data_dict(self):
"""Преобразование DataFrame обратно в словарь данных"""
if not hasattr(self, 'df') or self.df is None or self.df.empty:
return {}
data_dict = {}
# Группируем данные по месяцам
for _, row in self.df.iterrows():
month = row.get('month')
data = row.get('data')
if month and data is not None:
data_dict[month] = data
return data_dict
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Парсим данные и сохраняем словарь для использования в геттерах
self.data_dict = self.parse_monitoring_fuel_files(file_path, params)
# Преобразуем словарь в DataFrame для совместимости с services.py
if self.data_dict:
# Создаем DataFrame с информацией о месяцах и данных
data_rows = []
for month, df_data in self.data_dict.items():
if df_data is not None and not df_data.empty:
data_rows.append({
'month': month,
'rows_count': len(df_data),
'data': df_data
})
if data_rows:
df = pd.DataFrame(data_rows)
self.df = df
return df
# Если данных нет, возвращаем пустой DataFrame
self.df = pd.DataFrame()
return self.df
def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]:
"""Парсинг ZIP архива с файлами мониторинга топлива"""
df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ
with zipfile.ZipFile(file_path, 'r') as zip_ref:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for month in range(1, 13):
@@ -103,7 +173,74 @@ class MonitoringFuelParser(ParserPort):
return df_monitorings
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns):
def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int:
"""Определение индекса заголовка в Excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file_path,
sheet_name=sheet,
header=None,
nrows=max_rows,
engine='openpyxl'
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx + 1 # возвращаем индекс строки (0-based)
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_single(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного объекта'''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Установка")
# Читаем весь лист, начиная с найденной строки как заголовок
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None,
engine='openpyxl'
)
# === Удаление полностью пустых столбцов ===
df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA
df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA
df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы
# === Переименовываем нужные столбцы по позициям ===
if len(df_full.columns) < 2:
raise ValueError("DataFrame должен содержать как минимум 2 столбца.")
new_columns = df_full.columns.tolist()
new_columns[0] = 'name'
new_columns[1] = 'normativ'
new_columns[-2] = 'total'
new_columns[-1] = 'total_1'
df_full.columns = new_columns
# Проверяем, что колонка 'name' существует
if 'name' in df_full.columns:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
# df_full['id'] = df_full['name'].apply(get_object_by_name) # This line was removed as per new_code
# Временно используем name как id
df_full['id'] = df_full['name']
else:
# Если нет колонки name, создаем id из индекса
df_full['id'] = df_full.index
# Устанавливаем id как индекс
df_full.set_index('id', inplace=True)
print(f"Окончательное количество столбцов: {len(df_full.columns)}")
return df_full
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns: list) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
''' Служебная функция. Агрегация данных по среднему по определенным колонкам. '''
all_data = {} # Для хранения полных данных (месяцы) по каждой колонке
means = {} # Для хранения средних
@@ -185,22 +322,3 @@ class MonitoringFuelParser(ParserPort):
total.name = 'mean'
return total, df_combined
def get_value(self, df, params):
mode = params.get("mode", "total")
columns = params.get("columns", None)
month = params.get("month", None)
data = None
if mode == "total":
if not columns:
raise ValueError("Отсутствуют идентификаторы столбцов")
df_means, _ = self.aggregate_by_columns(df, columns)
data = df_means.to_dict(orient='index')
elif mode == "month":
if not month:
raise ValueError("Отсутствуют идентификатор месяца")
df_month = self.get_month(df, month)
data = df_month.to_dict(orient='index')
json_result = data_to_json(data)
return json_result

View File

@@ -2,17 +2,224 @@ import pandas as pd
import numpy as np
from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.svodka_ca import SvodkaCARequest
from adapters.pconfig import get_og_by_name
class SvodkaCAParser(ParserPort):
"""Парсер для сводки СА"""
"""Парсер для сводок СА"""
name = "Сводка СА"
name = "Сводки СА"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="get_ca_data",
method=self._get_data_wrapper,
schema_class=SvodkaCARequest,
description="Получение данных по режимам и таблицам"
)
def _get_data_wrapper(self, params: dict):
"""Получение данных по режимам и таблицам"""
print(f"🔍 DEBUG: _get_data_wrapper вызван с параметрами: {params}")
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaCARequest)
modes = validated_params["modes"]
tables = validated_params["tables"]
print(f"🔍 DEBUG: Запрошенные режимы: {modes}")
print(f"🔍 DEBUG: Запрошенные таблицы: {tables}")
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
if hasattr(self, 'data_dict') and self.data_dict is not None:
# Данные из парсинга
data_source = self.data_dict
print(f"🔍 DEBUG: Используем data_dict с режимами: {list(data_source.keys())}")
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из загрузки - преобразуем DataFrame обратно в словарь
data_source = self._df_to_data_dict()
print(f"🔍 DEBUG: Используем df, преобразованный в data_dict с режимами: {list(data_source.keys())}")
else:
print(f"🔍 DEBUG: Нет данных! data_dict={getattr(self, 'data_dict', 'None')}, df={getattr(self, 'df', 'None')}")
return {}
# Фильтруем данные по запрошенным режимам и таблицам
result_data = {}
for mode in modes:
if mode in data_source:
result_data[mode] = {}
available_tables = list(data_source[mode].keys())
print(f"🔍 DEBUG: Режим '{mode}' содержит таблицы: {available_tables}")
for table_name, table_data in data_source[mode].items():
# Ищем таблицы по частичному совпадению
for requested_table in tables:
if requested_table in table_name:
result_data[mode][table_name] = table_data
print(f"🔍 DEBUG: Добавлена таблица '{table_name}' (совпадение с '{requested_table}') с {len(table_data)} записями")
break # Найдено совпадение, переходим к следующей таблице
else:
print(f"🔍 DEBUG: Режим '{mode}' не найден в data_source")
print(f"🔍 DEBUG: Итоговый результат содержит режимы: {list(result_data.keys())}")
return result_data
def _df_to_data_dict(self):
"""Преобразование DataFrame обратно в словарь данных"""
if not hasattr(self, 'df') or self.df is None or self.df.empty:
return {}
data_dict = {}
# Группируем данные по режимам и таблицам
for _, row in self.df.iterrows():
mode = row.get('mode')
table = row.get('table')
data = row.get('data')
if mode and table and data is not None:
if mode not in data_dict:
data_dict[mode] = {}
data_dict[mode][table] = data
return data_dict
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
print(f"🔍 DEBUG: SvodkaCAParser.parse вызван с файлом: {file_path}")
# Парсим данные и сохраняем словарь для использования в геттерах
self.data_dict = self.parse_svodka_ca(file_path, params)
# Преобразуем словарь в DataFrame для совместимости с services.py
# Создаем простой DataFrame с информацией о загруженных данных
if self.data_dict:
# Создаем DataFrame с информацией о режимах и таблицах
data_rows = []
for mode, tables in self.data_dict.items():
for table_name, table_data in tables.items():
if table_data:
data_rows.append({
'mode': mode,
'table': table_name,
'rows_count': len(table_data),
'data': table_data
})
if data_rows:
df = pd.DataFrame(data_rows)
self.df = df
print(f"🔍 DEBUG: Создан DataFrame с {len(data_rows)} записями")
return df
# Если данных нет, возвращаем пустой DataFrame
self.df = pd.DataFrame()
print(f"🔍 DEBUG: Возвращаем пустой DataFrame")
return self.df
def parse_svodka_ca(self, file_path: str, params: dict) -> dict:
"""Парсинг сводки СА - работает с тремя листами: План, Факт, Норматив"""
print(f"🔍 DEBUG: Начинаем парсинг сводки СА из файла: {file_path}")
# === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив ===
# Выгружаем План
inclusion_list_plan = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan)
print(f"🔍 DEBUG: Объединённый и отсортированный План: {df_ca_plan.shape if df_ca_plan is not None else 'None'}")
# Выгружаем Факт
inclusion_list_fact = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн",
"в т.ч. Идентифицированные безвозвратные потери, %",
"в т.ч. Неидентифицированные потери, тонн",
"в т.ч. Неидентифицированные потери, %"
}
df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact)
print(f"🔍 DEBUG: Объединённый и отсортированный Факт: {df_ca_fact.shape if df_ca_fact is not None else 'None'}")
# Выгружаем Норматив
inclusion_list_normativ = {
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ)
print(f"🔍 DEBUG: Объединённый и отсортированный Норматив: {df_ca_normativ.shape if df_ca_normativ is not None else 'None'}")
# Преобразуем DataFrame в словарь по режимам и таблицам
data_dict = {}
# Обрабатываем План
if df_ca_plan is not None and not df_ca_plan.empty:
data_dict['plan'] = {}
for table_name, group_df in df_ca_plan.groupby('table'):
table_data = group_df.drop('table', axis=1)
data_dict['plan'][table_name] = table_data.to_dict('records')
# Обрабатываем Факт
if df_ca_fact is not None and not df_ca_fact.empty:
data_dict['fact'] = {}
for table_name, group_df in df_ca_fact.groupby('table'):
table_data = group_df.drop('table', axis=1)
data_dict['fact'][table_name] = table_data.to_dict('records')
# Обрабатываем Норматив
if df_ca_normativ is not None and not df_ca_normativ.empty:
data_dict['normativ'] = {}
for table_name, group_df in df_ca_normativ.groupby('table'):
table_data = group_df.drop('table', axis=1)
data_dict['normativ'][table_name] = table_data.to_dict('records')
print(f"🔍 DEBUG: Итоговый data_dict содержит режимы: {list(data_dict.keys())}")
for mode, tables in data_dict.items():
print(f"🔍 DEBUG: Режим '{mode}' содержит таблицы: {list(tables.keys())}")
return data_dict
def extract_all_tables(self, file_path, sheet_name=0):
"""Извлекает все таблицы из Excel файла"""
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
"""Извлечение всех таблиц из Excel файла"""
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None, engine='openpyxl')
df_filled = df.fillna('')
df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True)
@@ -83,8 +290,8 @@ class SvodkaCAParser(ParserPort):
return None
return name.strip()
def parse_sheet(self, file_path, sheet_name, inclusion_list):
"""Собственно функция парсинга отчета СА"""
def parse_sheet(self, file_path: str, sheet_name: str, inclusion_list: set) -> pd.DataFrame:
"""Парсинг листа Excel"""
# === Извлечение и фильтрация ===
tables = self.extract_all_tables(file_path, sheet_name)
@@ -190,77 +397,6 @@ class SvodkaCAParser(ParserPort):
else:
return None
def parse(self, file_path: str, params: dict) -> dict:
"""Парсинг файла сводки СА"""
# === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив ===
# Выгружаем План в df_ca_plan
inclusion_list_plan = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan) # ЭТО ДАТАФРЕЙМ ПЛАНА В СВОДКЕ ЦА
print(f"\n--- Объединённый и отсортированный План: {df_ca_plan.shape} ---")
# Выгружаем Факт
inclusion_list_fact = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн",
"в т.ч. Идентифицированные безвозвратные потери, %",
"в т.ч. Неидентифицированные потери, тонн",
"в т.ч. Неидентифицированные потери, %"
}
df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact) # ЭТО ДАТАФРЕЙМ ФАКТА В СВОДКЕ ЦА
print(f"\n--- Объединённый и отсортированный Факт: {df_ca_fact.shape} ---")
# Выгружаем План в df_ca_normativ
inclusion_list_normativ = {
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
# ЭТО ДАТАФРЕЙМ НОРМАТИВА В СВОДКЕ ЦА
df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ)
print(f"\n--- Объединённый и отсортированный Норматив: {df_ca_normativ.shape} ---")
df_dict = {
"plan": df_ca_plan,
"fact": df_ca_fact,
"normativ": df_ca_normativ
}
return df_dict
def data_dict_to_json(self, data_dict):
''' Служебная функция для парсинга словаря в json. '''
def convert_types(obj):
@@ -308,17 +444,3 @@ class SvodkaCAParser(ParserPort):
filtered_df = df[df['table'].isin(table_values)].copy()
result_dict = {key: group for key, group in filtered_df.groupby('table')}
return result_dict
def get_value(self, df: pd.DataFrame, params: dict):
modes = params.get("modes")
tables = params.get("tables")
if not isinstance(modes, list):
raise ValueError("Поле 'modes' должно быть списком")
if not isinstance(tables, list):
raise ValueError("Поле 'tables' должно быть списком")
# Собираем данные
data_dict = {}
for mode in modes:
data_dict[mode] = self.get_data(df, mode, tables)
return self.data_dict_to_json(data_dict)

View File

@@ -0,0 +1,326 @@
import pandas as pd
from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
class SvodkaPMParser(ParserPort):
"""Парсер для сводок ПМ (план и факт)"""
name = "Сводки ПМ"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="single_og",
method=self._get_single_og,
schema_class=SvodkaPMSingleOGRequest,
description="Получение данных по одному ОГ"
)
register_getter_from_schema(
parser_instance=self,
getter_name="total_ogs",
method=self._get_total_ogs,
schema_class=SvodkaPMTotalOGsRequest,
description="Получение данных по всем ОГ"
)
def _get_single_og(self, params: dict):
"""Получение данных по одному ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
og_id = validated_params["id"]
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# Здесь нужно получить DataFrame из self.df, но пока используем старую логику
# TODO: Переделать под новую архитектуру
return self.get_svodka_og(self.df, og_id, codes, columns, search)
def _get_total_ogs(self, params: dict):
"""Получение данных по всем ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest)
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# TODO: Переделать под новую архитектуру
return self.get_svodka_total(self.df, codes, columns, search)
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_pm_files(file_path, params)
return self.df
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
"""Определения индекса заголовка в excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows,
engine='openpyxl'
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_svodka_pm(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного ОГ для БП, ПП и факта '''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Итого")
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
df_probe = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
nrows=2,
engine='openpyxl'
)
if df_probe.shape[0] == 0:
raise ValueError("Файл пуст или не содержит данных.")
first_data_row = df_probe.iloc[0]
# Находим столбец с 'INDICATOR_ID'
indicator_cols = first_data_row[first_data_row == 'INDICATOR_ID']
if len(indicator_cols) == 0:
raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.')
indicator_col_name = indicator_cols.index[0]
print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}")
# Читаем весь лист
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None,
engine='openpyxl'
)
if indicator_col_name not in df_full.columns:
raise ValueError(f"Столбец {indicator_col_name} отсутствует при полной загрузке.")
# Перемещаем INDICATOR_ID в начало и делаем индексом
cols = [indicator_col_name] + [col for col in df_full.columns if col != indicator_col_name]
df_full = df_full[cols]
df_full.set_index(indicator_col_name, inplace=True)
# Обрезаем до "Итого" + 1
header_list = [str(h).strip() for h in df_full.columns]
try:
itogo_idx = header_list.index("Итого")
num_cols_needed = itogo_idx + 2
except ValueError:
print('Столбец "Итого" не найден. Оставляем все столбцы.')
num_cols_needed = len(header_list)
num_cols_needed = min(num_cols_needed, len(header_list))
df_final = df_full.iloc[:, :num_cols_needed]
# === Удаление полностью пустых столбцов ===
df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True)
df_clean = df_clean.where(pd.notnull(df_clean), pd.NA)
non_empty_mask = df_clean.notna().any()
df_final = df_final.loc[:, non_empty_mask]
# === Обработка заголовков: Unnamed и "Итого" → "Итого" ===
new_columns = []
last_good_name = None
for col in df_final.columns:
col_str = str(col).strip()
# Проверяем, является ли колонка пустой/некорректной
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
if is_empty_or_unnamed:
# Если это пустая колонка, используем последнее хорошее имя
if last_good_name:
new_columns.append(last_good_name)
else:
# Если нет хорошего имени, используем имя по умолчанию
new_columns.append(f"col_{len(new_columns)}")
else:
# Это хорошая колонка
last_good_name = col_str
new_columns.append(col_str)
# Убеждаемся, что количество столбцов совпадает
if len(new_columns) != len(df_final.columns):
# Если количество не совпадает, обрезаем или дополняем
if len(new_columns) > len(df_final.columns):
new_columns = new_columns[:len(df_final.columns)]
else:
# Дополняем недостающие столбцы
while len(new_columns) < len(df_final.columns):
new_columns.append(f"col_{len(new_columns)}")
# Применяем новые заголовки
df_final.columns = new_columns
return df_final
def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict:
"""Парсинг ZIP архива со сводками ПМ"""
import zipfile
pm_dict = {
"facts": {},
"plans": {}
}
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for name, id in OG_IDS.items():
if id == 'BASH':
continue # пропускаем BASH
current_fact = replace_id_in_path(excel_fact_template, id)
fact_candidates = [f for f in file_list if current_fact in f]
if len(fact_candidates) == 1:
print(f'Загрузка {current_fact}')
with zip_ref.open(fact_candidates[0]) as excel_file:
pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен: {current_fact}")
else:
print(f"⚠️ Файл не найден (Факт): {current_fact}")
pm_dict['facts'][id] = None
current_plan = replace_id_in_path(excel_plan_template, id)
plan_candidates = [f for f in file_list if current_plan in f]
if len(plan_candidates) == 1:
print(f'Загрузка {current_plan}')
with zip_ref.open(plan_candidates[0]) as excel_file:
pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен: {current_plan}")
else:
print(f"⚠️ Файл не найден (План): {current_plan}")
pm_dict['plans'][id] = None
return pm_dict
def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
''' Служебная функция получения значения по коду и столбцу '''
row_index = code
mask_value = df_svodka.iloc[0] == code
if search_value is None:
mask_name = df_svodka.columns != 'Итого'
else:
mask_name = df_svodka.columns == search_value
# Убедимся, что маски совпадают по длине
if len(mask_value) != len(mask_name):
raise ValueError(
f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
)
final_mask = mask_value & mask_name # булевая маска по позициям столбцов
col_positions = final_mask.values # numpy array или Series булевых значений
if not final_mask.any():
print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'")
return 0
else:
if row_index in df_svodka.index:
# Получаем позицию строки
row_loc = df_svodka.index.get_loc(row_index)
# Извлекаем значения по позициям столбцов
values = df_svodka.iloc[row_loc, col_positions]
# Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce')
# Агрегация данных (NaN игнорируются)
if search_value is None:
return numeric_values
else:
return numeric_values.iloc[0]
else:
return None
def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None):
''' Служебная функция получения данных по одному ОГ '''
result = {}
# Безопасно получаем данные, проверяя их наличие
fact_df = pm_dict.get('facts', {}).get(id) if 'facts' in pm_dict else None
plan_df = pm_dict.get('plans', {}).get(id) if 'plans' in pm_dict else None
# Определяем, какие столбцы из какого датафрейма брать
for col in columns:
col_result = {}
if col in ['ПП', 'БП']:
if plan_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}")
col_result = {code: None for code in codes}
else:
for code in codes:
val = self.get_svodka_value(plan_df, code, col, search_value)
col_result[code] = val
elif col in ['ТБ', 'СЭБ', 'НЭБ']:
if fact_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}")
col_result = {code: None for code in codes}
else:
for code in codes:
val = self.get_svodka_value(fact_df, code, col, search_value)
col_result[code] = val
else:
print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")
col_result = {code: None for code in codes}
result[col] = col_result
return result
def get_svodka_total(self, pm_dict, codes, columns, search_value=None):
''' Служебная функция агрегации данные по всем ОГ '''
total_result = {}
for name, og_id in OG_IDS.items():
if og_id == 'BASH':
continue
# print(f"📊 Обработка: {name} ({og_id})")
try:
data = self.get_svodka_og(
pm_dict,
og_id,
codes,
columns,
search_value
)
total_result[og_id] = data
except Exception as e:
print(f"❌ Ошибка при обработке {name} ({og_id}): {e}")
total_result[og_id] = None
return total_result
# Убираем старый метод get_value, так как он теперь в базовом классе

View File

@@ -1,7 +1,14 @@
import pandas as pd
import pandas as pd
import os
import json
import zipfile
import tempfile
import shutil
from typing import Dict, Any, List, Optional
from core.ports import ParserPort
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
from adapters.pconfig import SINGLE_OGS, replace_id_in_path, find_header_row, data_to_json
class SvodkaPMParser(ParserPort):
@@ -9,39 +16,141 @@ class SvodkaPMParser(ParserPort):
name = "Сводки ПМ"
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
"""Определения индекса заголовка в excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows
def __init__(self):
super().__init__()
self._register_default_getters()
def _register_default_getters(self):
"""Регистрация геттеров для Сводки ПМ"""
self.register_getter(
name="single_og",
method=self._get_single_og,
required_params=["id", "codes", "columns"],
optional_params=["search"],
description="Получение данных по одному ОГ из сводки ПМ"
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_svodka_pm(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного ОГ для БП, ПП и факта '''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Итого")
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
df_probe = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
nrows=2,
self.register_getter(
name="total_ogs",
method=self._get_total_ogs,
required_params=["codes", "columns"],
optional_params=["search"],
description="Получение данных по всем ОГ из сводки ПМ"
)
def parse(self, file_path: str, params: dict) -> Dict[str, pd.DataFrame]:
"""Парсинг ZIP архива со сводками ПМ и возврат словаря с DataFrame"""
# Проверяем расширение файла
if not file_path.lower().endswith('.zip'):
raise ValueError(f"Ожидается ZIP архив: {file_path}")
# Создаем временную директорию для разархивирования
temp_dir = tempfile.mkdtemp()
try:
# Разархивируем файл
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
print(f"📦 Архив разархивирован в: {temp_dir}")
# Посмотрим, что находится в архиве
print(f"🔍 Содержимое архива:")
for root, dirs, files in os.walk(temp_dir):
level = root.replace(temp_dir, '').count(os.sep)
indent = ' ' * 2 * level
print(f"{indent}{os.path.basename(root)}/")
subindent = ' ' * 2 * (level + 1)
for file in files:
print(f"{subindent}{file}")
# Создаем словари для хранения данных как в оригинале
df_pm_facts = {} # Словарь с данными факта, ключ - ID ОГ
df_pm_plans = {} # Словарь с данными плана, ключ - ID ОГ
# Ищем файлы в архиве (адаптируемся к реальной структуре)
fact_files = []
plan_files = []
for root, dirs, files in os.walk(temp_dir):
for file in files:
if file.lower().endswith(('.xlsx', '.xlsm')):
full_path = os.path.join(root, file)
if 'fact' in file.lower() or 'факт' in file.lower():
fact_files.append(full_path)
elif 'plan' in file.lower() or 'план' in file.lower():
plan_files.append(full_path)
print(f"📊 Найдено файлов факта: {len(fact_files)}")
print(f"📊 Найдено файлов плана: {len(plan_files)}")
# Обрабатываем найденные файлы
for fact_file in fact_files:
# Извлекаем ID ОГ из имени файла
filename = os.path.basename(fact_file)
# Ищем паттерн типа svodka_fact_pm_SNPZ.xlsm
if 'svodka_fact_pm_' in filename:
og_id = filename.replace('svodka_fact_pm_', '').replace('.xlsx', '').replace('.xlsm', '')
if og_id in SINGLE_OGS:
print(f'📊 Загрузка факта: {fact_file} (ОГ: {og_id})')
df_pm_facts[og_id] = self._parse_svodka_pm(fact_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен для {og_id}")
for plan_file in plan_files:
# Извлекаем ID ОГ из имени файла
filename = os.path.basename(plan_file)
# Ищем паттерн типа svodka_plan_pm_SNPZ.xlsm
if 'svodka_plan_pm_' in filename:
og_id = filename.replace('svodka_plan_pm_', '').replace('.xlsx', '').replace('.xlsm', '')
if og_id in SINGLE_OGS:
print(f'📊 Загрузка плана: {plan_file} (ОГ: {og_id})')
df_pm_plans[og_id] = self._parse_svodka_pm(plan_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен для {og_id}")
# Инициализируем None для ОГ, для которых файлы не найдены
for og_id in SINGLE_OGS:
if og_id == 'BASH':
continue
if og_id not in df_pm_facts:
df_pm_facts[og_id] = None
if og_id not in df_pm_plans:
df_pm_plans[og_id] = None
# Возвращаем словарь с данными (как в оригинале)
result = {
'df_pm_facts': df_pm_facts,
'df_pm_plans': df_pm_plans
}
print(f"🎯 Обработано ОГ: {len([k for k, v in df_pm_facts.items() if v is not None])} факт, {len([k for k, v in df_pm_plans.items() if v is not None])} план")
return result
finally:
# Удаляем временную директорию
shutil.rmtree(temp_dir, ignore_errors=True)
print(f"🗑️ Временная директория удалена: {temp_dir}")
def _parse_svodka_pm(self, file_path: str, sheet_name: str, header_num: Optional[int] = None) -> pd.DataFrame:
"""Парсинг отчетов одного ОГ для БП, ПП и факта"""
try:
# Автоопределение header_num, если не передан
if header_num is None:
header_num = find_header_row(file_path, sheet_name, search_value="Итого")
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
df_probe = pd.read_excel(
file_path,
sheet_name=sheet_name,
header=header_num,
usecols=None,
nrows=2,
engine='openpyxl' # Явно указываем движок
)
except Exception as e:
raise ValueError(f"Ошибка при чтении файла {file_path}: {str(e)}")
if df_probe.shape[0] == 0:
raise ValueError("Файл пуст или не содержит данных.")
@@ -53,15 +162,15 @@ class SvodkaPMParser(ParserPort):
raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.')
indicator_col_name = indicator_cols.index[0]
print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}")
# Читаем весь лист
df_full = pd.read_excel(
file,
sheet_name=sheet,
file_path,
sheet_name=sheet_name,
header=header_num,
usecols=None,
index_col=None
index_col=None,
engine='openpyxl' # Явно указываем движок
)
if indicator_col_name not in df_full.columns:
@@ -78,19 +187,18 @@ class SvodkaPMParser(ParserPort):
itogo_idx = header_list.index("Итого")
num_cols_needed = itogo_idx + 2
except ValueError:
print('Столбец "Итого" не найден. Оставляем все столбцы.')
num_cols_needed = len(header_list)
num_cols_needed = min(num_cols_needed, len(header_list))
df_final = df_full.iloc[:, :num_cols_needed]
# === Удаление полностью пустых столбцов ===
# Удаление полностью пустых столбцов
df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True)
df_clean = df_clean.where(pd.notnull(df_clean), pd.NA)
non_empty_mask = df_clean.notna().any()
df_final = df_final.loc[:, non_empty_mask]
# === Обработка заголовков: Unnamed и "Итого" → "Итого" ===
# Обработка заголовков: Unnamed и "Итого" → "Итого"
new_columns = []
last_good_name = None
for col in df_final.columns:
@@ -102,106 +210,149 @@ class SvodkaPMParser(ParserPort):
# Проверяем, начинается ли на "Итого"
if col_str.startswith('Итого'):
current_name = 'Итого'
last_good_name = current_name # обновляем last_good_name
last_good_name = current_name
new_columns.append(current_name)
elif is_empty_or_unnamed:
# Используем последнее хорошее имя
new_columns.append(last_good_name)
else:
# Имя, полученное из exel
# Имя, полученное из excel
last_good_name = col_str
new_columns.append(col_str)
df_final.columns = new_columns
print(f"Окончательное количество столбцов: {len(df_final.columns)}")
return df_final
def parse(self, file_path: str, params: dict) -> dict:
import zipfile
pm_dict = {
"facts": {},
"plans": {}
}
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(file_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for name, id in OG_IDS.items():
if id == 'BASH':
continue # пропускаем BASH
def _get_svodka_value(self, df_svodka: pd.DataFrame, og_id: str, code: int, search_value: Optional[str] = None):
"""Служебная функция для простой выборке по сводке"""
print(f"🔍 DEBUG: Ищем код '{code}' для ОГ '{og_id}' в DataFrame с {len(df_svodka)} строками")
print(f"🔍 DEBUG: Первая строка данных: {df_svodka.iloc[0].tolist()}")
print(f"🔍 DEBUG: Доступные индексы: {list(df_svodka.index)}")
print(f"🔍 DEBUG: Доступные столбцы: {list(df_svodka.columns)}")
current_fact = replace_id_in_path(excel_fact_template, id)
fact_candidates = [f for f in file_list if current_fact in f]
if len(fact_candidates) == 1:
print(f'Загрузка {current_fact}')
with zip_ref.open(fact_candidates[0]) as excel_file:
pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен: {current_fact}")
else:
print(f"⚠️ Файл не найден (Факт): {current_fact}")
pm_dict['facts'][id] = None
current_plan = replace_id_in_path(excel_plan_template, id)
plan_candidates = [f for f in file_list if current_plan in f]
if len(plan_candidates) == 1:
print(f'Загрузка {current_plan}')
with zip_ref.open(plan_candidates[0]) as excel_file:
pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен: {current_plan}")
else:
print(f"⚠️ Файл не найден (План): {current_plan}")
pm_dict['plans'][id] = None
return pm_dict
def get_svodka_value(self, df_svodka, id, code, search_value=None):
''' Служебная функция для простой выборке по сводке '''
row_index = id
mask_value = df_svodka.iloc[0] == code
if search_value is None:
mask_name = df_svodka.columns != 'Итого'
else:
mask_name = df_svodka.columns == search_value
# Убедимся, что маски совпадают по длине
if len(mask_value) != len(mask_name):
raise ValueError(
f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
)
final_mask = mask_value & mask_name # булевая маска по позициям столбцов
col_positions = final_mask.values # numpy array или Series булевых значений
if not final_mask.any():
print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'")
# Проверяем, есть ли код в индексе
if code not in df_svodka.index:
print(f"⚠️ Код '{code}' не найден в индексе")
return 0
# Получаем позицию строки с кодом
code_row_loc = df_svodka.index.get_loc(code)
print(f"🔍 DEBUG: Код '{code}' в позиции {code_row_loc}")
# Определяем позиции для поиска
if search_value is None:
# Ищем все позиции кроме "Итого" и None (первый столбец с заголовком)
target_positions = []
for i, col_name in enumerate(df_svodka.iloc[0]):
if col_name != 'Итого' and col_name is not None:
target_positions.append(i)
else:
if row_index in df_svodka.index:
# Получаем позицию строки
row_loc = df_svodka.index.get_loc(row_index)
# Ищем позиции в первой строке, где есть нужное название
target_positions = []
for i, col_name in enumerate(df_svodka.iloc[0]):
if col_name == search_value:
target_positions.append(i)
# Извлекаем значения по позициям столбцов
values = df_svodka.iloc[row_loc, col_positions]
print(f"🔍 DEBUG: Найдены позиции для '{search_value}': {target_positions[:5]}...")
print(f"🔍 DEBUG: Позиции в первой строке: {target_positions[:5]}...")
# Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce')
print(f"🔍 DEBUG: Ищем столбцы с названием '{search_value}'")
print(f"🔍 DEBUG: Целевые позиции: {target_positions[:10]}...")
# Агрегация данных (NaN игнорируются)
if search_value is None:
return numeric_values
if not target_positions:
print(f"⚠️ Позиции '{search_value}' не найдены")
return 0
# Извлекаем значения из найденных позиций
values = []
for pos in target_positions:
# Берем значение из пересечения строки с кодом и позиции столбца
value = df_svodka.iloc[code_row_loc, pos]
# Если это Series, берем первое значение
if isinstance(value, pd.Series):
if len(value) > 0:
# Берем первое не-NaN значение
first_valid = value.dropna().iloc[0] if not value.dropna().empty else 0
values.append(first_valid)
else:
return numeric_values.iloc[0]
values.append(0)
else:
return None
values.append(value)
def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None):
''' Служебная функция получения данных по одному ОГ '''
# Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce')
print(f"🔍 DEBUG: Числовые значения (первые 5): {numeric_values.tolist()[:5]}")
# Попробуем альтернативное преобразование
try:
# Если pandas не может преобразовать, попробуем вручную
manual_values = []
for v in values:
if pd.isna(v) or v is None:
manual_values.append(0)
else:
try:
# Пробуем преобразовать в float
manual_values.append(float(str(v).replace(',', '.')))
except (ValueError, TypeError):
manual_values.append(0)
print(f"🔍 DEBUG: Ручное преобразование (первые 5): {manual_values[:5]}")
numeric_values = pd.Series(manual_values)
except Exception as e:
print(f"⚠️ Ошибка при ручном преобразовании: {e}")
# Используем исходные значения
numeric_values = pd.Series([0 if pd.isna(v) or v is None else v for v in values])
# Агрегация данных (NaN игнорируются)
if search_value is None:
# Возвращаем массив всех значений (игнорируя NaN)
if len(numeric_values) > 0:
# Фильтруем NaN значения и возвращаем как список
valid_values = numeric_values.dropna()
if len(valid_values) > 0:
return valid_values.tolist()
else:
return []
else:
return []
else:
# Возвращаем массив всех значений (игнорируя NaN)
if len(numeric_values) > 0:
# Фильтруем NaN значения и возвращаем как список
valid_values = numeric_values.dropna()
if len(valid_values) > 0:
return valid_values.tolist()
else:
return []
else:
return []
def _get_svodka_og(self, og_id: str, codes: List[int], columns: List[str], search_value: Optional[str] = None):
"""Служебная функция получения данных по одному ОГ"""
result = {}
fact_df = pm_dict['facts'][id]
plan_df = pm_dict['plans'][id]
# Получаем данные из сохраненных словарей (через self.df)
if not hasattr(self, 'df') or self.df is None:
print("❌ Данные не загружены. Сначала загрузите ZIP архив.")
return {col: {str(code): None for code in codes} for col in columns}
# Извлекаем словари из сохраненных данных
df_pm_facts = self.df.get('df_pm_facts', {})
df_pm_plans = self.df.get('df_pm_plans', {})
# Получаем данные для конкретного ОГ
fact_df = df_pm_facts.get(og_id)
plan_df = df_pm_plans.get(og_id)
print(f"🔍 ===== НАЧАЛО ОБРАБОТКИ ОГ {og_id} =====")
print(f"🔍 Коды: {codes}")
print(f"🔍 Столбцы: {columns}")
print(f"🔍 Получены данные для {og_id}: факт={'' if fact_df is not None else ''}, план={'' if plan_df is not None else ''}")
# Определяем, какие столбцы из какого датафрейма брать
for col in columns:
@@ -209,67 +360,91 @@ class SvodkaPMParser(ParserPort):
if col in ['ПП', 'БП']:
if plan_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}")
print(f"❌ Невозможно обработать '{col}': нет данных плана для {og_id}")
else:
print(f"🔍 DEBUG: ===== ОБРАБАТЫВАЕМ '{col}' ИЗ ДАННЫХ ПЛАНА =====")
for code in codes:
val = self.get_svodka_value(plan_df, code, col, search_value)
col_result[code] = val
print(f"🔍 DEBUG: --- Код {code} для {col} ---")
val = self._get_svodka_value(plan_df, og_id, code, col)
col_result[str(code)] = val
print(f"🔍 DEBUG: ===== ЗАВЕРШИЛИ ОБРАБОТКУ '{col}' =====")
elif col in ['ТБ', 'СЭБ', 'НЭБ']:
if fact_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}")
print(f"❌ Невозможно обработать '{col}': нет данных факта для {og_id}")
else:
for code in codes:
val = self.get_svodka_value(fact_df, code, col, search_value)
col_result[code] = val
val = self._get_svodka_value(fact_df, og_id, code, col)
col_result[str(code)] = val
else:
print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")
col_result = {code: None for code in codes}
col_result = {str(code): None for code in codes}
result[col] = col_result
return result
def get_svodka_total(self, pm_dict, codes, columns, search_value=None):
''' Служебная функция агрегации данные по всем ОГ '''
total_result = {}
for name, og_id in OG_IDS.items():
if og_id == 'BASH':
continue
# print(f"📊 Обработка: {name} ({og_id})")
def _get_single_og(self, params: Dict[str, Any]) -> str:
"""API функция для получения данных по одному ОГ"""
# Если на входе строка — парсим как JSON
if isinstance(params, str):
try:
data = self.get_svodka_og(
pm_dict,
og_id,
codes,
columns,
search_value
)
total_result[og_id] = data
except Exception as e:
print(f"❌ Ошибка при обработке {name} ({og_id}): {e}")
total_result[og_id] = None
params = json.loads(params)
except json.JSONDecodeError as e:
raise ValueError(f"Некорректный JSON: {e}")
return total_result
# Проверяем структуру
if not isinstance(params, dict):
raise TypeError("Конфиг должен быть словарём или JSON-строкой")
def get_value(self, df, params):
og_id = params.get("id")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
mode = params.get("mode", "total")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
data = None
if mode == "single":
if not og_id:
raise ValueError("Отсутствует идентификатор ОГ")
data = self.get_svodka_og(df, og_id, codes, columns, search)
elif mode == "total":
data = self.get_svodka_total(df, codes, columns, search)
data = self._get_svodka_og(og_id, codes, columns, search)
json_result = data_to_json(data)
return json_result
def _get_total_ogs(self, params: Dict[str, Any]) -> str:
"""API функция для получения данных по всем ОГ"""
# Если на входе строка — парсим как JSON
if isinstance(params, str):
try:
params = json.loads(params)
except json.JSONDecodeError as e:
raise ValueError(f"❌Некорректный JSON: {e}")
# Проверяем структуру
if not isinstance(params, dict):
raise TypeError("Конфиг должен быть словарём или JSON-строкой")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
total_result = {}
for og_id in SINGLE_OGS:
if og_id == 'BASH':
continue
try:
data = self._get_svodka_og(og_id, codes, columns, search)
total_result[og_id] = data
except Exception as e:
print(f"❌ Ошибка при обработке {og_id}: {e}")
total_result[og_id] = None
json_result = data_to_json(total_result)
return json_result

View File

@@ -3,6 +3,7 @@ from functools import lru_cache
import json
import numpy as np
import pandas as pd
import os
OG_IDS = {
"Комсомольский НПЗ": "KNPZ",
@@ -22,8 +23,37 @@ OG_IDS = {
"Красноленинский НПЗ": "KLNPZ",
"Пурнефтепереработка": "PurNP",
"ЯНОС": "YANOS",
"Уфанефтехим": "UNH",
"РНПК": "RNPK",
"КмсНПЗ": "KNPZ",
"АНХК": "ANHK",
"НК НПЗ": "NovKuybNPZ",
"КНПЗ": "KuybNPZ",
"СНПЗ": "CyzNPZ",
"Нижневаторское НПО": "NVNPO",
"ПурНП": "PurNP",
}
SINGLE_OGS = [
"KNPZ",
"ANHK",
"AchNPZ",
"BASH",
"UNPZ",
"UNH",
"NOV",
"NovKuybNPZ",
"KuybNPZ",
"CyzNPZ",
"TuapsNPZ",
"SNPZ",
"RNPK",
"NVNPO",
"KLNPZ",
"PurNP",
"YANOS",
]
SNPZ_IDS = {
"Висбрекинг": "SNPZ.VISB",
"Изомеризация": "SNPZ.IZOM",
@@ -40,7 +70,18 @@ SNPZ_IDS = {
def replace_id_in_path(file_path, new_id):
return file_path.replace('ID', str(new_id))
# Заменяем 'ID' на новое значение
modified_path = file_path.replace('ID', str(new_id)) + '.xlsx'
# Проверяем, существует ли файл
if not os.path.exists(modified_path):
# Меняем расширение на .xlsm
directory, filename = os.path.split(modified_path)
name, ext = os.path.splitext(filename)
new_filename = name + '.xlsm'
modified_path = os.path.join(directory, new_filename)
return modified_path
def get_table_name(exel):
@@ -109,6 +150,25 @@ def get_id_by_name(name, dictionary):
return best_match
def find_header_row(file, sheet, search_value="Итого", max_rows=50):
''' Определения индекса заголовка в exel по ключевому слову '''
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def data_to_json(data, indent=2, ensure_ascii=False):
"""
Полностью безопасная сериализация данных в JSON.
@@ -153,11 +213,18 @@ def data_to_json(data, indent=2, ensure_ascii=False):
# --- рекурсия по dict и list ---
elif isinstance(obj, dict):
return {
key: convert_obj(value)
for key, value in obj.items()
if not is_nan_like(key) # фильтруем NaN в ключах (недопустимы в JSON)
}
# Обрабатываем только значения, ключи оставляем как строки
converted = {}
for k, v in obj.items():
if is_nan_like(k):
continue # ключи не могут быть null в JSON
# Превращаем ключ в строку, но не пытаемся интерпретировать как число
key_str = str(k)
converted[key_str] = convert_obj(v) # только значение проходит через convert_obj
# Если все значения 0.0 — считаем, что данных нет, т.к. возможно ожидается массив.
if converted and all(v == 0.0 for v in converted.values()):
return None
return converted
elif isinstance(obj, list):
return [convert_obj(item) for item in obj]
@@ -175,7 +242,6 @@ def data_to_json(data, indent=2, ensure_ascii=False):
try:
cleaned_data = convert_obj(data)
cleaned_data_str = json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii)
return cleaned_data
return json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii)
except Exception as e:
raise ValueError(f"Не удалось сериализовать данные в JSON: {e}")

View File

@@ -323,7 +323,7 @@ async def get_svodka_pm_single_og(
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'single'
request_dict['mode'] = 'single_og'
request = DataRequest(
report_type='svodka_pm',
get_params=request_dict
@@ -377,7 +377,7 @@ async def get_svodka_pm_total_ogs(
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'total'
request_dict['mode'] = 'total_ogs'
request = DataRequest(
report_type='svodka_pm',
get_params=request_dict
@@ -804,7 +804,7 @@ async def get_monitoring_fuel_total_by_columns(
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'total'
request_dict['mode'] = 'total_by_columns'
request = DataRequest(
report_type='monitoring_fuel',
get_params=request_dict
@@ -849,7 +849,7 @@ async def get_monitoring_fuel_month_by_code(
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'month'
request_dict['mode'] = 'month_by_code'
request = DataRequest(
report_type='monitoring_fuel',
get_params=request_dict

View File

@@ -25,7 +25,7 @@ class OGID(str, Enum):
class SvodkaPMSingleOGRequest(BaseModel):
id: OGID = Field(
id: str = Field(
...,
description="Идентификатор МА для запрашиваемого ОГ",
example="SNPZ"

View File

@@ -0,0 +1,140 @@
"""
Упрощенные утилиты для работы со схемами Pydantic
"""
from typing import List, Dict, Any, Type
from pydantic import BaseModel
import inspect
def get_required_fields_from_schema(schema_class: Type[BaseModel]) -> List[str]:
"""
Извлекает список обязательных полей из схемы Pydantic
Args:
schema_class: Класс схемы Pydantic
Returns:
Список имен обязательных полей
"""
required_fields = []
# Используем model_fields для Pydantic v2 или __fields__ для v1
if hasattr(schema_class, 'model_fields'):
fields = schema_class.model_fields
else:
fields = schema_class.__fields__
for field_name, field_info in fields.items():
# В Pydantic v2 есть метод is_required()
if hasattr(field_info, 'is_required'):
if field_info.is_required():
required_fields.append(field_name)
elif hasattr(field_info, 'required'):
if field_info.required:
required_fields.append(field_name)
else:
# Fallback для старых версий - проверяем наличие default
has_default = False
if hasattr(field_info, 'default'):
has_default = field_info.default is not ...
elif hasattr(field_info, 'default_factory'):
has_default = field_info.default_factory is not None
if not has_default:
required_fields.append(field_name)
return required_fields
def get_optional_fields_from_schema(schema_class: Type[BaseModel]) -> List[str]:
"""
Извлекает список необязательных полей из схемы Pydantic
Args:
schema_class: Класс схемы Pydantic
Returns:
Список имен необязательных полей
"""
optional_fields = []
# Используем model_fields для Pydantic v2 или __fields__ для v1
if hasattr(schema_class, 'model_fields'):
fields = schema_class.model_fields
else:
fields = schema_class.__fields__
for field_name, field_info in fields.items():
# В Pydantic v2 есть метод is_required()
if hasattr(field_info, 'is_required'):
if not field_info.is_required():
optional_fields.append(field_name)
elif hasattr(field_info, 'required'):
if not field_info.required:
optional_fields.append(field_name)
else:
# Fallback для старых версий - проверяем наличие default
has_default = False
if hasattr(field_info, 'default'):
has_default = field_info.default is not ...
elif hasattr(field_info, 'default_factory'):
has_default = field_info.default_factory is not None
if has_default:
optional_fields.append(field_name)
return optional_fields
def register_getter_from_schema(parser_instance, getter_name: str, method: callable,
schema_class: Type[BaseModel], description: str = ""):
"""
Регистрирует геттер в парсере, используя схему Pydantic для определения параметров
Args:
parser_instance: Экземпляр парсера
getter_name: Имя геттера
method: Метод для выполнения
schema_class: Класс схемы Pydantic
description: Описание геттера (если не указано, берется из docstring метода)
"""
# Извлекаем параметры из схемы
required_params = get_required_fields_from_schema(schema_class)
optional_params = get_optional_fields_from_schema(schema_class)
# Если описание не указано, берем из docstring метода
if not description:
description = inspect.getdoc(method) or ""
# Регистрируем геттер
parser_instance.register_getter(
name=getter_name,
method=method,
required_params=required_params,
optional_params=optional_params,
description=description
)
def validate_params_with_schema(params: Dict[str, Any], schema_class: Type[BaseModel]) -> Dict[str, Any]:
"""
Валидирует параметры с помощью схемы Pydantic
Args:
params: Словарь параметров
schema_class: Класс схемы Pydantic
Returns:
Валидированные параметры
Raises:
ValidationError: Если параметры не прошли валидацию
"""
try:
# Создаем экземпляр схемы для валидации
validated_data = schema_class(**params)
return validated_data.dict()
except Exception as e:
raise ValueError(f"Ошибка валидации параметров: {str(e)}")

View File

@@ -43,7 +43,7 @@ class ReportService:
try:
# Парсим файл
parse_params = request.parse_params or {}
df = parser.parse(temp_file_path, parse_params)
parse_result = parser.parse(temp_file_path, parse_params)
# Генерируем object_id
object_id = f"nin_excel_data_{request.report_type}"
@@ -54,7 +54,7 @@ class ReportService:
print(f"Старый объект удален: {object_id}")
# Сохраняем в хранилище
if self.storage.save_dataframe(df, object_id):
if self.storage.save_dataframe(parse_result, object_id):
return UploadResult(
success=True,
message="Отчет успешно загружен",
@@ -89,9 +89,9 @@ class ReportService:
message=f"Отчет типа '{request.report_type}' не найден. Возможно, MinIO недоступен или отчет не был загружен."
)
# Загружаем DataFrame из хранилища
df = self.storage.load_dataframe(object_id)
if df is None:
# Загружаем данные из хранилища
loaded_data = self.storage.load_dataframe(object_id)
if loaded_data is None:
return DataResult(
success=False,
message="Ошибка при загрузке данных из хранилища. Возможно, MinIO недоступен."
@@ -100,25 +100,84 @@ class ReportService:
# Получаем парсер
parser = get_parser(request.report_type)
# Устанавливаем DataFrame в парсер для использования в геттерах
parser.df = df
# Устанавливаем данные в парсер для использования в геттерах
parser.df = loaded_data
print(f"🔍 DEBUG: ReportService.get_data - установлены данные в парсер {request.report_type}")
# Проверяем тип загруженных данных
if hasattr(loaded_data, 'shape'):
# Это DataFrame
print(f"🔍 DEBUG: DataFrame shape: {loaded_data.shape}")
print(f"🔍 DEBUG: DataFrame columns: {list(loaded_data.columns) if not loaded_data.empty else 'Empty'}")
elif isinstance(loaded_data, dict):
# Это словарь (для парсера ПМ)
print(f"🔍 DEBUG: Словарь с ключами: {list(loaded_data.keys())}")
else:
print(f"🔍 DEBUG: Неизвестный тип данных: {type(loaded_data)}")
# Получаем параметры запроса
get_params = request.get_params or {}
# Определяем имя геттера (по умолчанию используем первый доступный)
getter_name = get_params.pop("getter", None)
if not getter_name:
# Если геттер не указан, берем первый доступный
available_getters = list(parser.getters.keys())
if available_getters:
getter_name = available_getters[0]
print(f"⚠️ Геттер не указан, используем первый доступный: {getter_name}")
# Для svodka_ca определяем режим из данных или используем 'fact' по умолчанию
if request.report_type == 'svodka_ca':
# Извлекаем режим из DataFrame или используем 'fact' по умолчанию
if hasattr(parser, 'df') and parser.df is not None and not parser.df.empty:
modes_in_df = parser.df['mode'].unique() if 'mode' in parser.df.columns else ['fact']
# Используем первый найденный режим или 'fact' по умолчанию
default_mode = modes_in_df[0] if len(modes_in_df) > 0 else 'fact'
else:
return DataResult(
success=False,
message="Парсер не имеет доступных геттеров"
)
default_mode = 'fact'
# Устанавливаем режим в параметры, если он не указан
if 'mode' not in get_params:
get_params['mode'] = default_mode
# Определяем имя геттера
if request.report_type == 'svodka_ca':
# Для svodka_ca используем геттер get_ca_data
getter_name = 'get_ca_data'
elif request.report_type == 'monitoring_fuel':
# Для monitoring_fuel определяем геттер из параметра mode
getter_name = get_params.pop("mode", None)
if not getter_name:
# Если режим не указан, берем первый доступный
available_getters = list(parser.getters.keys())
if available_getters:
getter_name = available_getters[0]
print(f"⚠️ Режим не указан, используем первый доступный: {getter_name}")
else:
return DataResult(
success=False,
message="Парсер не имеет доступных геттеров"
)
elif request.report_type == 'svodka_pm':
# Для svodka_pm определяем геттер из параметра mode
getter_name = get_params.pop("mode", None)
if not getter_name:
# Если режим не указан, берем первый доступный
available_getters = list(parser.getters.keys())
if available_getters:
getter_name = available_getters[0]
print(f"⚠️ Режим не указан, используем первый доступный: {getter_name}")
else:
return DataResult(
success=False,
message="Парсер не имеет доступных геттеров"
)
else:
# Для других парсеров определяем из параметра mode
getter_name = get_params.pop("mode", None)
if not getter_name:
# Если режим не указан, берем первый доступный
available_getters = list(parser.getters.keys())
if available_getters:
getter_name = available_getters[0]
print(f"⚠️ Режим не указан, используем первый доступный: {getter_name}")
else:
return DataResult(
success=False,
message="Парсер не имеет доступных геттеров"
)
# Получаем значение через указанный геттер
try:

20
python_parser/test_app.py Normal file
View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python3
"""
Простой тест для проверки работы FastAPI
"""
from fastapi import FastAPI
app = FastAPI(title="Test API")
@app.get("/")
async def root():
return {"message": "Test API is working"}
@app.get("/health")
async def health():
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
print("Starting test server...")
uvicorn.run(app, host="0.0.0.0", port=8000)