13 Commits

25 changed files with 2577 additions and 357 deletions

2
.gitignore vendored
View File

@@ -15,6 +15,8 @@ python_parser/app/schemas/test_schemas/test_core/__pycache__/
python_parser/app/schemas/test_schemas/test_adapters/__pycache__/ python_parser/app/schemas/test_schemas/test_adapters/__pycache__/
python_parser/app/schemas/test_schemas/test_app/__pycache__/ python_parser/app/schemas/test_schemas/test_app/__pycache__/
nin_python_parser
*.pyc
*.py[cod] *.py[cod]
*$py.class *$py.class

View File

@@ -14,7 +14,7 @@ services:
restart: unless-stopped restart: unless-stopped
fastapi: fastapi:
build: ./python_parser image: python:3.11-slim
container_name: svodka_fastapi_dev container_name: svodka_fastapi_dev
ports: ports:
- "8000:8000" - "8000:8000"
@@ -24,9 +24,20 @@ services:
- MINIO_SECRET_KEY=minioadmin - MINIO_SECRET_KEY=minioadmin
- MINIO_SECURE=false - MINIO_SECURE=false
- MINIO_BUCKET=svodka-data - MINIO_BUCKET=svodka-data
volumes:
# Монтируем исходный код для автоматической перезагрузки
- ./python_parser:/app
# Монтируем requirements.txt для установки зависимостей
- ./python_parser/requirements.txt:/app/requirements.txt
working_dir: /app
depends_on: depends_on:
- minio - minio
restart: unless-stopped restart: unless-stopped
command: >
bash -c "
pip install --no-cache-dir -r requirements.txt &&
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
"
streamlit: streamlit:
image: python:3.11-slim image: python:3.11-slim

View File

@@ -0,0 +1,88 @@
# Парсер Сводки ПМ
## Описание
Парсер для обработки сводок ПМ (план и факт) с поддержкой множественных геттеров. Наследуется от `ParserPort` и реализует архитектуру hexagonal architecture.
## Доступные геттеры
### 1. `get_single_og`
Получение данных по одному ОГ из сводки ПМ.
**Обязательные параметры:**
- `id` (str): ID ОГ (например, "SNPZ", "KNPZ")
- `codes` (list): Список кодов показателей (например, [78, 79, 81, 82])
- `columns` (list): Список столбцов для извлечения (например, ["ПП", "БП", "СЭБ"])
**Необязательные параметры:**
- `search` (str): Значение для поиска в столбцах
**Пример использования:**
```python
parser = SvodkaPMParser()
params = {
"id": "SNPZ",
"codes": [78, 79, 81, 82],
"columns": ["ПП", "БП", "СЭБ"]
}
result = parser.get_value("get_single_og", params)
```
### 2. `get_total_ogs`
Получение данных по всем ОГ из сводки ПМ.
**Обязательные параметры:**
- `codes` (list): Список кодов показателей
- `columns` (list): Список столбцов для извлечения
**Необязательные параметры:**
- `search` (str): Значение для поиска в столбцах
**Пример использования:**
```python
parser = SvodkaPMParser()
params = {
"codes": [78, 79, 81, 82],
"columns": ["ПП", "БП", "СЭБ"]
}
result = parser.get_value("get_total_ogs", params)
```
## Поддерживаемые столбцы
- **ПП, БП**: Данные из файлов плана
- **ТБ, СЭБ, НЭБ**: Данные из файлов факта
## Структура файлов
Парсер ожидает следующую структуру файлов:
- `data/pm_fact/svodka_fact_pm_{OG_ID}.xlsx` или `.xlsm`
- `data/pm_plan/svodka_plan_pm_{OG_ID}.xlsx` или `.xlsm`
Где `{OG_ID}` - это ID ОГ (например, SNPZ, KNPZ и т.д.)
## Формат результата
Результат возвращается в формате JSON со следующей структурой:
```json
{
"ПП": {
"78": 123.45,
"79": 234.56
},
"БП": {
"78": 111.11,
"79": 222.22
},
"СЭБ": {
"78": 333.33,
"79": 444.44
}
}
```
## Обработка ошибок
- Если файл плана/факта не найден, соответствующие столбцы будут пустыми
- Если код показателя не найден, возвращается 0
- Валидация параметров выполняется автоматически

View File

@@ -1,9 +1,13 @@
from .monitoring_fuel import MonitoringFuelParser from .monitoring_fuel import MonitoringFuelParser
from .svodka_ca import SvodkaCAParser from .svodka_ca import SvodkaCAParser
from .svodka_pm import SvodkaPMParser from .svodka_pm import SvodkaPMParser
from .svodka_repair_ca import SvodkaRepairCAParser
from .statuses_repair_ca import StatusesRepairCAParser
__all__ = [ __all__ = [
'MonitoringFuelParser', 'MonitoringFuelParser',
'SvodkaCAParser', 'SvodkaCAParser',
'SvodkaPMParser' 'SvodkaPMParser',
'SvodkaRepairCAParser',
'StatusesRepairCAParser'
] ]

View File

@@ -39,9 +39,31 @@ class MonitoringFuelParser(ParserPort):
columns = validated_params["columns"] columns = validated_params["columns"]
# TODO: Переделать под новую архитектуру # Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
df_means, _ = self.aggregate_by_columns(self.df, columns) if hasattr(self, 'data_dict') and self.data_dict is not None:
return df_means.to_dict(orient='index') # Данные из парсинга
data_source = self.data_dict
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из загрузки - преобразуем DataFrame обратно в словарь
data_source = self._df_to_data_dict()
else:
return {}
# Агрегируем данные по колонкам
df_means, _ = self.aggregate_by_columns(data_source, columns)
# Преобразуем в JSON-совместимый формат
result = {}
for idx, row in df_means.iterrows():
result[str(idx)] = {}
for col in columns:
value = row.get(col)
if pd.isna(value) or value == float('inf') or value == float('-inf'):
result[str(idx)][col] = None
else:
result[str(idx)][col] = float(value) if isinstance(value, (int, float)) else value
return result
def _get_month_by_code(self, params: dict): def _get_month_by_code(self, params: dict):
"""Получение данных за конкретный месяц""" """Получение данных за конкретный месяц"""
@@ -50,14 +72,73 @@ class MonitoringFuelParser(ParserPort):
month = validated_params["month"] month = validated_params["month"]
# TODO: Переделать под новую архитектуру # Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
df_month = self.get_month(self.df, month) if hasattr(self, 'data_dict') and self.data_dict is not None:
return df_month.to_dict(orient='index') # Данные из парсинга
data_source = self.data_dict
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из загрузки - преобразуем DataFrame обратно в словарь
data_source = self._df_to_data_dict()
else:
return {}
# Получаем данные за конкретный месяц
df_month = self.get_month(data_source, month)
# Преобразуем в JSON-совместимый формат
result = {}
for idx, row in df_month.iterrows():
result[str(idx)] = {}
for col in df_month.columns:
value = row[col]
if pd.isna(value) or value == float('inf') or value == float('-inf'):
result[str(idx)][col] = None
else:
result[str(idx)][col] = float(value) if isinstance(value, (int, float)) else value
return result
def _df_to_data_dict(self):
"""Преобразование DataFrame обратно в словарь данных"""
if not hasattr(self, 'df') or self.df is None or self.df.empty:
return {}
data_dict = {}
# Группируем данные по месяцам
for _, row in self.df.iterrows():
month = row.get('month')
data = row.get('data')
if month and data is not None:
data_dict[month] = data
return data_dict
def parse(self, file_path: str, params: dict) -> pd.DataFrame: def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame""" """Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах # Парсим данные и сохраняем словарь для использования в геттерах
self.df = self.parse_monitoring_fuel_files(file_path, params) self.data_dict = self.parse_monitoring_fuel_files(file_path, params)
# Преобразуем словарь в DataFrame для совместимости с services.py
if self.data_dict:
# Создаем DataFrame с информацией о месяцах и данных
data_rows = []
for month, df_data in self.data_dict.items():
if df_data is not None and not df_data.empty:
data_rows.append({
'month': month,
'rows_count': len(df_data),
'data': df_data
})
if data_rows:
df = pd.DataFrame(data_rows)
self.df = df
return df
# Если данных нет, возвращаем пустой DataFrame
self.df = pd.DataFrame()
return self.df return self.df
def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]: def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]:
@@ -148,7 +229,11 @@ class MonitoringFuelParser(ParserPort):
if 'name' in df_full.columns: if 'name' in df_full.columns:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name' # Применяем функцию get_id_by_name к каждой строке в колонке 'name'
# df_full['id'] = df_full['name'].apply(get_object_by_name) # This line was removed as per new_code # df_full['id'] = df_full['name'].apply(get_object_by_name) # This line was removed as per new_code
pass # Placeholder for new_code # Временно используем name как id
df_full['id'] = df_full['name']
else:
# Если нет колонки name, создаем id из индекса
df_full['id'] = df_full.index
# Устанавливаем id как индекс # Устанавливаем id как индекс
df_full.set_index('id', inplace=True) df_full.set_index('id', inplace=True)

View File

@@ -0,0 +1,341 @@
import pandas as pd
import os
import tempfile
import zipfile
from typing import Dict, Any, List, Tuple, Optional
from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.statuses_repair_ca import StatusesRepairCARequest
from adapters.pconfig import find_header_row, get_og_by_name, data_to_json
class StatusesRepairCAParser(ParserPort):
"""Парсер для статусов ремонта СА"""
name = "Статусы ремонта СА"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
register_getter_from_schema(
parser_instance=self,
getter_name="get_repair_statuses",
method=self._get_repair_statuses_wrapper,
schema_class=StatusesRepairCARequest,
description="Получение статусов ремонта по ОГ и ключам"
)
def parse(self, file_path: str, params: dict) -> Dict[str, Any]:
"""Парсинг файла статусов ремонта СА"""
print(f"🔍 DEBUG: StatusesRepairCAParser.parse вызван с файлом: {file_path}")
try:
# Определяем тип файла
if file_path.endswith('.zip'):
return self._parse_zip_file(file_path)
elif file_path.endswith(('.xlsx', '.xls')):
return self._parse_excel_file(file_path)
else:
raise ValueError(f"Неподдерживаемый формат файла: {file_path}")
except Exception as e:
print(f"❌ Ошибка при парсинге файла {file_path}: {e}")
raise
def _parse_zip_file(self, zip_path: str) -> Dict[str, Any]:
"""Парсинг ZIP архива"""
with tempfile.TemporaryDirectory() as temp_dir:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# Ищем Excel файл в архиве
excel_files = []
for root, dirs, files in os.walk(temp_dir):
for file in files:
if file.endswith(('.xlsx', '.xls')):
excel_files.append(os.path.join(root, file))
if not excel_files:
raise ValueError("В архиве не найдено Excel файлов")
# Берем первый найденный Excel файл
excel_file = excel_files[0]
print(f"🔍 DEBUG: Найден Excel файл в архиве: {excel_file}")
return self._parse_excel_file(excel_file)
def _parse_excel_file(self, file_path: str) -> Dict[str, Any]:
"""Парсинг Excel файла"""
print(f"🔍 DEBUG: Парсинг Excel файла: {file_path}")
# Парсим данные
df_statuses = self._parse_statuses_repair_ca(file_path, 0)
if df_statuses.empty:
print("⚠️ Нет данных после парсинга")
return {"data": [], "records_count": 0}
# Преобразуем в список словарей для хранения
data_list = self._data_to_structured_json(df_statuses)
result = {
"data": data_list,
"records_count": len(data_list)
}
# Устанавливаем данные в парсер для использования в геттерах
self.data_dict = result
print(f"✅ Парсинг завершен. Получено {len(data_list)} записей")
return result
def _parse_statuses_repair_ca(self, file: str, sheet: int, header_num: Optional[int] = None) -> pd.DataFrame:
"""Парсинг отчетов статусов ремонта"""
# === ШАГ 1: Создание MultiIndex ===
columns_level_1 = [
'id',
'ОГ',
'Дата начала ремонта',
'Готовность к КР',
'Отставание / опережение подготовки к КР',
'Заключение договоров на СМР',
'Поставка МТР'
]
sub_columns_cmp = {
'ДВ': ['всего', 'плановая дата', 'факт', '%'],
'Сметы': ['всего', 'плановая дата', 'факт', '%'],
'Формирование лотов': ['всего', 'плановая дата', 'факт', '%'],
'Договор': ['всего', 'плановая дата', 'факт', '%']
}
sub_columns_mtp = {
'Выполнение плана на текущую дату': ['инициирования закупок', 'заключения договоров', 'поставки'],
'На складе, позиций': ['всего', 'поставлено', '%', 'динамика за прошедшую неделю, поз.']
}
# Формируем MultiIndex — ВСЕ кортежи длиной 3
cols = []
for col1 in columns_level_1:
if col1 == 'id':
cols.append((col1, '', ''))
elif col1 == 'ОГ':
cols.append((col1, '', ''))
elif col1 == 'Дата начала ремонта':
cols.append((col1, '', ''))
elif col1 == 'Готовность к КР':
cols.extend([(col1, 'План', ''), (col1, 'Факт', '')])
elif col1 == 'Отставание / опережение подготовки к КР':
cols.extend([
(col1, 'Отставание / опережение', ''),
(col1, 'Динамика за прошедшую неделю', '')
])
elif col1 == 'Заключение договоров на СМР':
for subcol, sub_sub_cols in sub_columns_cmp.items():
for ssc in sub_sub_cols:
cols.append((col1, subcol, ssc))
elif col1 == 'Поставка МТР':
for subcol, sub_sub_cols in sub_columns_mtp.items():
for ssc in sub_sub_cols:
cols.append((col1, subcol, ssc))
else:
cols.append((col1, '', ''))
# Создаем MultiIndex
multi_index = pd.MultiIndex.from_tuples(cols, names=['Level1', 'Level2', 'Level3'])
# === ШАГ 2: Читаем данные из Excel ===
if header_num is None:
header_num = find_header_row(file, sheet, search_value="ОГ")
df_data = pd.read_excel(
file,
skiprows=header_num + 3,
header=None,
index_col=0,
engine='openpyxl'
)
# Убираем строки с пустыми данными
df_data.dropna(how='all', inplace=True)
# Применяем функцию get_og_by_name для 'id'
df_data['id'] = df_data.iloc[:, 0].copy()
df_data['id'] = df_data['id'].apply(get_og_by_name)
# Перемещаем 'id' на первое место
cols = ['id'] + [col for col in df_data.columns if col != 'id']
df_data = df_data[cols]
# Удаляем строки с пустым id
df_data = df_data.dropna(subset=['id'])
df_data = df_data[df_data['id'].astype(str).str.strip() != '']
# Сбрасываем индекс
df_data = df_data.reset_index(drop=True)
# Выбираем 4-ю колонку (индекс 3) для фильтрации
col_index = 3
numeric_series = pd.to_numeric(df_data.iloc[:, col_index], errors='coerce')
# Фильтруем: оставляем только строки, где значение — число
mask = pd.notna(numeric_series)
df_data = df_data[mask].copy()
# === ШАГ 3: Применяем MultiIndex к данным ===
df_data.columns = multi_index
return df_data
def _data_to_structured_json(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
"""Преобразование DataFrame с MultiIndex в структурированный JSON"""
if df.empty:
return []
result_list = []
for idx, row in df.iterrows():
result = {}
for col in df.columns:
value = row[col]
# Пропускаем NaN
if pd.isna(value):
value = None
# Распаковываем уровни
level1, level2, level3 = col
# Убираем пустые/неинформативные значения
level1 = str(level1).strip() if level1 else ""
level2 = str(level2).strip() if level2 else None
level3 = str(level3).strip() if level3 else None
# Обработка id и ОГ — выносим на верх
if level1 == "id":
result["id"] = value
elif level1 == "ОГ":
result["name"] = value
else:
# Группируем по Level1
if level1 not in result:
result[level1] = {}
# Вложенные уровни
if level2 and level3:
if level2 not in result[level1]:
result[level1][level2] = {}
result[level1][level2][level3] = value
elif level2:
result[level1][level2] = value
else:
result[level1] = value
result_list.append(result)
return result_list
def _get_repair_statuses_wrapper(self, params: dict):
"""Обертка для получения статусов ремонта"""
print(f"🔍 DEBUG: _get_repair_statuses_wrapper вызван с параметрами: {params}")
# Валидация параметров
validated_params = validate_params_with_schema(params, StatusesRepairCARequest)
ids = validated_params.get('ids')
keys = validated_params.get('keys')
print(f"🔍 DEBUG: Запрошенные ОГ: {ids}")
print(f"🔍 DEBUG: Запрошенные ключи: {keys}")
# Получаем данные из парсера
if hasattr(self, 'df') and self.df is not None:
# Данные загружены из MinIO
if isinstance(self.df, dict):
# Это словарь (как в других парсерах)
data_source = self.df.get('data', [])
elif hasattr(self.df, 'columns') and 'data' in self.df.columns:
# Это DataFrame
data_source = []
for _, row in self.df.iterrows():
if row['data']:
data_source.extend(row['data'])
else:
data_source = []
elif hasattr(self, 'data_dict') and self.data_dict:
# Данные из локального парсинга
data_source = self.data_dict.get('data', [])
else:
print("⚠️ Нет данных в парсере")
return []
print(f"🔍 DEBUG: Используем данные с {len(data_source)} записями")
# Фильтруем данные
filtered_data = self._filter_statuses_data(data_source, ids, keys)
print(f"🔍 DEBUG: Отфильтровано {len(filtered_data)} записей")
return filtered_data
def _filter_statuses_data(self, data_source: List[Dict], ids: Optional[List[str]], keys: Optional[List[List[str]]]) -> List[Dict]:
"""Фильтрация данных по ОГ и ключам"""
if not data_source:
return []
# Если не указаны фильтры, возвращаем все данные
if not ids and not keys:
return data_source
filtered_data = []
for item in data_source:
# Фильтр по ОГ
if ids is not None:
item_id = item.get('id')
if item_id not in ids:
continue
# Если указаны ключи, извлекаем только нужные поля
if keys is not None:
filtered_item = self._extract_keys_from_item(item, keys)
if filtered_item:
filtered_data.append(filtered_item)
else:
filtered_data.append(item)
return filtered_data
def _extract_keys_from_item(self, item: Dict[str, Any], keys: List[List[str]]) -> Dict[str, Any]:
"""Извлечение указанных ключей из элемента"""
result = {}
# Всегда добавляем id и name
if 'id' in item:
result['id'] = item['id']
if 'name' in item:
result['name'] = item['name']
# Извлекаем указанные ключи
for key_path in keys:
if not key_path:
continue
value = item
for key in key_path:
if isinstance(value, dict) and key in value:
value = value[key]
else:
value = None
break
if value is not None:
# Строим вложенную структуру
current = result
for i, key in enumerate(key_path):
if i == len(key_path) - 1:
current[key] = value
else:
if key not in current:
current[key] = {}
current = current[key]
return result

View File

@@ -17,7 +17,7 @@ class SvodkaCAParser(ParserPort):
# Используем схемы Pydantic как единый источник правды # Используем схемы Pydantic как единый источник правды
register_getter_from_schema( register_getter_from_schema(
parser_instance=self, parser_instance=self,
getter_name="get_data", getter_name="get_ca_data",
method=self._get_data_wrapper, method=self._get_data_wrapper,
schema_class=SvodkaCARequest, schema_class=SvodkaCARequest,
description="Получение данных по режимам и таблицам" description="Получение данных по режимам и таблицам"
@@ -25,134 +25,197 @@ class SvodkaCAParser(ParserPort):
def _get_data_wrapper(self, params: dict): def _get_data_wrapper(self, params: dict):
"""Получение данных по режимам и таблицам""" """Получение данных по режимам и таблицам"""
print(f"🔍 DEBUG: _get_data_wrapper вызван с параметрами: {params}")
# Валидируем параметры с помощью схемы Pydantic # Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaCARequest) validated_params = validate_params_with_schema(params, SvodkaCARequest)
modes = validated_params["modes"] modes = validated_params["modes"]
tables = validated_params["tables"] tables = validated_params["tables"]
# TODO: Переделать под новую архитектуру print(f"🔍 DEBUG: Запрошенные режимы: {modes}")
data_dict = {} print(f"🔍 DEBUG: Запрошенные таблицы: {tables}")
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
if hasattr(self, 'data_dict') and self.data_dict is not None:
# Данные из парсинга
data_source = self.data_dict
print(f"🔍 DEBUG: Используем data_dict с режимами: {list(data_source.keys())}")
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из загрузки - преобразуем DataFrame обратно в словарь
data_source = self._df_to_data_dict()
print(f"🔍 DEBUG: Используем df, преобразованный в data_dict с режимами: {list(data_source.keys())}")
else:
print(f"🔍 DEBUG: Нет данных! data_dict={getattr(self, 'data_dict', 'None')}, df={getattr(self, 'df', 'None')}")
return {}
# Фильтруем данные по запрошенным режимам и таблицам
result_data = {}
for mode in modes: for mode in modes:
data_dict[mode] = self.get_data(self.df, mode, tables) if mode in data_source:
return self.data_dict_to_json(data_dict) result_data[mode] = {}
available_tables = list(data_source[mode].keys())
print(f"🔍 DEBUG: Режим '{mode}' содержит таблицы: {available_tables}")
for table_name, table_data in data_source[mode].items():
# Ищем таблицы по частичному совпадению
for requested_table in tables:
if requested_table in table_name:
result_data[mode][table_name] = table_data
print(f"🔍 DEBUG: Добавлена таблица '{table_name}' (совпадение с '{requested_table}') с {len(table_data)} записями")
break # Найдено совпадение, переходим к следующей таблице
else:
print(f"🔍 DEBUG: Режим '{mode}' не найден в data_source")
print(f"🔍 DEBUG: Итоговый результат содержит режимы: {list(result_data.keys())}")
return result_data
def _df_to_data_dict(self):
"""Преобразование DataFrame обратно в словарь данных"""
if not hasattr(self, 'df') or self.df is None or self.df.empty:
return {}
data_dict = {}
# Группируем данные по режимам и таблицам
for _, row in self.df.iterrows():
mode = row.get('mode')
table = row.get('table')
data = row.get('data')
if mode and table and data is not None:
if mode not in data_dict:
data_dict[mode] = {}
data_dict[mode][table] = data
return data_dict
def parse(self, file_path: str, params: dict) -> pd.DataFrame: def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame""" """Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах print(f"🔍 DEBUG: SvodkaCAParser.parse вызван с файлом: {file_path}")
self.df = self.parse_svodka_ca(file_path, params)
# Парсим данные и сохраняем словарь для использования в геттерах
self.data_dict = self.parse_svodka_ca(file_path, params)
# Преобразуем словарь в DataFrame для совместимости с services.py
# Создаем простой DataFrame с информацией о загруженных данных
if self.data_dict:
# Создаем DataFrame с информацией о режимах и таблицах
data_rows = []
for mode, tables in self.data_dict.items():
for table_name, table_data in tables.items():
if table_data:
data_rows.append({
'mode': mode,
'table': table_name,
'rows_count': len(table_data),
'data': table_data
})
if data_rows:
df = pd.DataFrame(data_rows)
self.df = df
print(f"🔍 DEBUG: Создан DataFrame с {len(data_rows)} записями")
return df
# Если данных нет, возвращаем пустой DataFrame
self.df = pd.DataFrame()
print(f"🔍 DEBUG: Возвращаем пустой DataFrame")
return self.df return self.df
def parse_svodka_ca(self, file_path: str, params: dict) -> dict: def parse_svodka_ca(self, file_path: str, params: dict) -> dict:
"""Парсинг сводки СА""" """Парсинг сводки СА - работает с тремя листами: План, Факт, Норматив"""
# Получаем параметры из params print(f"🔍 DEBUG: Начинаем парсинг сводки СА из файла: {file_path}")
sheet_name = params.get('sheet_name', 0) # По умолчанию первый лист
inclusion_list = params.get('inclusion_list', {'ТиП', 'Топливо', 'Потери'})
# === Извлечение и фильтрация === # === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив ===
tables = self.extract_all_tables(file_path, sheet_name)
# Выгружаем План
inclusion_list_plan = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
# Фильтруем таблицы: оставляем только те, где первая строка содержит нужные заголовки df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan)
filtered_tables = [] print(f"🔍 DEBUG: Объединённый и отсортированный План: {df_ca_plan.shape if df_ca_plan is not None else 'None'}")
for table in tables:
if table.empty:
continue
first_row_values = table.iloc[0].astype(str).str.strip().tolist()
if any(val in inclusion_list for val in first_row_values):
filtered_tables.append(table)
tables = filtered_tables # Выгружаем Факт
inclusion_list_fact = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн",
"в т.ч. Идентифицированные безвозвратные потери, %",
"в т.ч. Неидентифицированные потери, тонн",
"в т.ч. Неидентифицированные потери, %"
}
# === Итоговый список таблиц датафреймов === df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact)
result_list = [] print(f"🔍 DEBUG: Объединённый и отсортированный Факт: {df_ca_fact.shape if df_ca_fact is not None else 'None'}")
for table in tables: # Выгружаем Норматив
if table.empty: inclusion_list_normativ = {
continue "Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
# Получаем первую строку (до удаления) df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ)
first_row_values = table.iloc[0].astype(str).str.strip().tolist() print(f"🔍 DEBUG: Объединённый и отсортированный Норматив: {df_ca_normativ.shape if df_ca_normativ is not None else 'None'}")
# Находим, какой элемент из inclusion_list присутствует # Преобразуем DataFrame в словарь по режимам и таблицам
matched_key = None data_dict = {}
for val in first_row_values:
if val in inclusion_list: # Обрабатываем План
matched_key = val if df_ca_plan is not None and not df_ca_plan.empty:
break # берём первый совпадающий заголовок data_dict['plan'] = {}
for table_name, group_df in df_ca_plan.groupby('table'):
if matched_key is None: table_data = group_df.drop('table', axis=1)
continue # на всякий случай (хотя уже отфильтровано) data_dict['plan'][table_name] = table_data.to_dict('records')
# Удаляем первую строку (заголовок) и сбрасываем индекс # Обрабатываем Факт
df_cleaned = table.iloc[1:].copy().reset_index(drop=True) if df_ca_fact is not None and not df_ca_fact.empty:
data_dict['fact'] = {}
# Пропускаем, если таблица пустая for table_name, group_df in df_ca_fact.groupby('table'):
if df_cleaned.empty: table_data = group_df.drop('table', axis=1)
continue data_dict['fact'][table_name] = table_data.to_dict('records')
# Первая строка становится заголовком # Обрабатываем Норматив
new_header = df_cleaned.iloc[0] # извлекаем первую строку как потенциальные названия столбцов if df_ca_normativ is not None and not df_ca_normativ.empty:
data_dict['normativ'] = {}
# Преобразуем заголовок: только первый столбец может быть заменён на "name" for table_name, group_df in df_ca_normativ.groupby('table'):
cleaned_header = [] table_data = group_df.drop('table', axis=1)
data_dict['normativ'][table_name] = table_data.to_dict('records')
# Обрабатываем первый столбец отдельно
first_item = new_header.iloc[0] if isinstance(new_header, pd.Series) else new_header[0] print(f"🔍 DEBUG: Итоговый data_dict содержит режимы: {list(data_dict.keys())}")
first_item_str = str(first_item).strip() if pd.notna(first_item) else "" for mode, tables in data_dict.items():
if first_item_str == "" or first_item_str == "nan": print(f"🔍 DEBUG: Режим '{mode}' содержит таблицы: {list(tables.keys())}")
cleaned_header.append("name")
else: return data_dict
cleaned_header.append(first_item_str)
# Остальные столбцы добавляем без изменений (или с минимальной очисткой)
for item in new_header[1:]:
# Опционально: приводим к строке и убираем лишние пробелы, но не заменяем на "name"
item_str = str(item).strip() if pd.notna(item) else ""
cleaned_header.append(item_str)
# Применяем очищенные названия столбцов
df_cleaned = df_cleaned[1:] # удаляем строку с заголовком
df_cleaned.columns = cleaned_header
df_cleaned = df_cleaned.reset_index(drop=True)
if matched_key.endswith('**'):
cleaned_key = matched_key[:-2] # удаляем последние **
else:
cleaned_key = matched_key
# Добавляем новую колонку с именем параметра
df_cleaned["table"] = cleaned_key
# Проверяем, что колонка 'name' существует
if 'name' not in df_cleaned.columns:
print(
f"Внимание: колонка 'name' отсутствует в таблице для '{matched_key}'. Пропускаем добавление 'id'.")
continue # или обработать по-другому
else:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_cleaned['id'] = df_cleaned['name'].apply(get_og_by_name)
# Удаляем строки, где id — None, NaN или пустой
df_cleaned = df_cleaned.dropna(subset=['id']) # dropna удаляет NaN
# Дополнительно: удаляем None (если не поймал dropna)
df_cleaned = df_cleaned[df_cleaned['id'].notna() & (df_cleaned['id'].astype(str) != 'None')]
# Добавляем в словарь
result_list.append(df_cleaned)
# === Объединение и сортировка по id (индекс) и table ===
if result_list:
combined_df = pd.concat(result_list, axis=0)
# Сортируем по индексу (id) и по столбцу 'table'
combined_df = combined_df.sort_values(by=['id', 'table'], axis=0)
# Устанавливаем id как индекс
# combined_df.set_index('id', inplace=True)
return combined_df
else:
return None
def extract_all_tables(self, file_path, sheet_name=0): def extract_all_tables(self, file_path, sheet_name=0):
"""Извлечение всех таблиц из Excel файла""" """Извлечение всех таблиц из Excel файла"""

View File

@@ -0,0 +1,326 @@
import pandas as pd
from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
class SvodkaPMParser(ParserPort):
"""Парсер для сводок ПМ (план и факт)"""
name = "Сводки ПМ"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="single_og",
method=self._get_single_og,
schema_class=SvodkaPMSingleOGRequest,
description="Получение данных по одному ОГ"
)
register_getter_from_schema(
parser_instance=self,
getter_name="total_ogs",
method=self._get_total_ogs,
schema_class=SvodkaPMTotalOGsRequest,
description="Получение данных по всем ОГ"
)
def _get_single_og(self, params: dict):
"""Получение данных по одному ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
og_id = validated_params["id"]
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# Здесь нужно получить DataFrame из self.df, но пока используем старую логику
# TODO: Переделать под новую архитектуру
return self.get_svodka_og(self.df, og_id, codes, columns, search)
def _get_total_ogs(self, params: dict):
"""Получение данных по всем ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest)
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# TODO: Переделать под новую архитектуру
return self.get_svodka_total(self.df, codes, columns, search)
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_pm_files(file_path, params)
return self.df
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
"""Определения индекса заголовка в excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows,
engine='openpyxl'
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_svodka_pm(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного ОГ для БП, ПП и факта '''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Итого")
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
df_probe = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
nrows=2,
engine='openpyxl'
)
if df_probe.shape[0] == 0:
raise ValueError("Файл пуст или не содержит данных.")
first_data_row = df_probe.iloc[0]
# Находим столбец с 'INDICATOR_ID'
indicator_cols = first_data_row[first_data_row == 'INDICATOR_ID']
if len(indicator_cols) == 0:
raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.')
indicator_col_name = indicator_cols.index[0]
print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}")
# Читаем весь лист
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None,
engine='openpyxl'
)
if indicator_col_name not in df_full.columns:
raise ValueError(f"Столбец {indicator_col_name} отсутствует при полной загрузке.")
# Перемещаем INDICATOR_ID в начало и делаем индексом
cols = [indicator_col_name] + [col for col in df_full.columns if col != indicator_col_name]
df_full = df_full[cols]
df_full.set_index(indicator_col_name, inplace=True)
# Обрезаем до "Итого" + 1
header_list = [str(h).strip() for h in df_full.columns]
try:
itogo_idx = header_list.index("Итого")
num_cols_needed = itogo_idx + 2
except ValueError:
print('Столбец "Итого" не найден. Оставляем все столбцы.')
num_cols_needed = len(header_list)
num_cols_needed = min(num_cols_needed, len(header_list))
df_final = df_full.iloc[:, :num_cols_needed]
# === Удаление полностью пустых столбцов ===
df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True)
df_clean = df_clean.where(pd.notnull(df_clean), pd.NA)
non_empty_mask = df_clean.notna().any()
df_final = df_final.loc[:, non_empty_mask]
# === Обработка заголовков: Unnamed и "Итого" → "Итого" ===
new_columns = []
last_good_name = None
for col in df_final.columns:
col_str = str(col).strip()
# Проверяем, является ли колонка пустой/некорректной
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
if is_empty_or_unnamed:
# Если это пустая колонка, используем последнее хорошее имя
if last_good_name:
new_columns.append(last_good_name)
else:
# Если нет хорошего имени, используем имя по умолчанию
new_columns.append(f"col_{len(new_columns)}")
else:
# Это хорошая колонка
last_good_name = col_str
new_columns.append(col_str)
# Убеждаемся, что количество столбцов совпадает
if len(new_columns) != len(df_final.columns):
# Если количество не совпадает, обрезаем или дополняем
if len(new_columns) > len(df_final.columns):
new_columns = new_columns[:len(df_final.columns)]
else:
# Дополняем недостающие столбцы
while len(new_columns) < len(df_final.columns):
new_columns.append(f"col_{len(new_columns)}")
# Применяем новые заголовки
df_final.columns = new_columns
return df_final
def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict:
"""Парсинг ZIP архива со сводками ПМ"""
import zipfile
pm_dict = {
"facts": {},
"plans": {}
}
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for name, id in OG_IDS.items():
if id == 'BASH':
continue # пропускаем BASH
current_fact = replace_id_in_path(excel_fact_template, id)
fact_candidates = [f for f in file_list if current_fact in f]
if len(fact_candidates) == 1:
print(f'Загрузка {current_fact}')
with zip_ref.open(fact_candidates[0]) as excel_file:
pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен: {current_fact}")
else:
print(f"⚠️ Файл не найден (Факт): {current_fact}")
pm_dict['facts'][id] = None
current_plan = replace_id_in_path(excel_plan_template, id)
plan_candidates = [f for f in file_list if current_plan in f]
if len(plan_candidates) == 1:
print(f'Загрузка {current_plan}')
with zip_ref.open(plan_candidates[0]) as excel_file:
pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен: {current_plan}")
else:
print(f"⚠️ Файл не найден (План): {current_plan}")
pm_dict['plans'][id] = None
return pm_dict
def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
''' Служебная функция получения значения по коду и столбцу '''
row_index = code
mask_value = df_svodka.iloc[0] == code
if search_value is None:
mask_name = df_svodka.columns != 'Итого'
else:
mask_name = df_svodka.columns == search_value
# Убедимся, что маски совпадают по длине
if len(mask_value) != len(mask_name):
raise ValueError(
f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
)
final_mask = mask_value & mask_name # булевая маска по позициям столбцов
col_positions = final_mask.values # numpy array или Series булевых значений
if not final_mask.any():
print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'")
return 0
else:
if row_index in df_svodka.index:
# Получаем позицию строки
row_loc = df_svodka.index.get_loc(row_index)
# Извлекаем значения по позициям столбцов
values = df_svodka.iloc[row_loc, col_positions]
# Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce')
# Агрегация данных (NaN игнорируются)
if search_value is None:
return numeric_values
else:
return numeric_values.iloc[0]
else:
return None
def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None):
''' Служебная функция получения данных по одному ОГ '''
result = {}
# Безопасно получаем данные, проверяя их наличие
fact_df = pm_dict.get('facts', {}).get(id) if 'facts' in pm_dict else None
plan_df = pm_dict.get('plans', {}).get(id) if 'plans' in pm_dict else None
# Определяем, какие столбцы из какого датафрейма брать
for col in columns:
col_result = {}
if col in ['ПП', 'БП']:
if plan_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}")
col_result = {code: None for code in codes}
else:
for code in codes:
val = self.get_svodka_value(plan_df, code, col, search_value)
col_result[code] = val
elif col in ['ТБ', 'СЭБ', 'НЭБ']:
if fact_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}")
col_result = {code: None for code in codes}
else:
for code in codes:
val = self.get_svodka_value(fact_df, code, col, search_value)
col_result[code] = val
else:
print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")
col_result = {code: None for code in codes}
result[col] = col_result
return result
def get_svodka_total(self, pm_dict, codes, columns, search_value=None):
''' Служебная функция агрегации данные по всем ОГ '''
total_result = {}
for name, og_id in OG_IDS.items():
if og_id == 'BASH':
continue
# print(f"📊 Обработка: {name} ({og_id})")
try:
data = self.get_svodka_og(
pm_dict,
og_id,
codes,
columns,
search_value
)
total_result[og_id] = data
except Exception as e:
print(f"❌ Ошибка при обработке {name} ({og_id}): {e}")
total_result[og_id] = None
return total_result
# Убираем старый метод get_value, так как он теперь в базовом классе

View File

@@ -1,9 +1,14 @@
import pandas as pd
import pandas as pd
import os
import json
import zipfile
import tempfile
import shutil
from typing import Dict, Any, List, Optional
from core.ports import ParserPort from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema from adapters.pconfig import SINGLE_OGS, replace_id_in_path, find_header_row, data_to_json
from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
class SvodkaPMParser(ParserPort): class SvodkaPMParser(ParserPort):
@@ -11,91 +16,140 @@ class SvodkaPMParser(ParserPort):
name = "Сводки ПМ" name = "Сводки ПМ"
def __init__(self):
super().__init__()
self._register_default_getters()
def _register_default_getters(self): def _register_default_getters(self):
"""Регистрация геттеров по умолчанию""" """Регистрация геттеров для Сводки ПМ"""
# Используем схемы Pydantic как единый источник правды self.register_getter(
register_getter_from_schema( name="single_og",
parser_instance=self,
getter_name="single_og",
method=self._get_single_og, method=self._get_single_og,
schema_class=SvodkaPMSingleOGRequest, required_params=["id", "codes", "columns"],
description="Получение данных по одному ОГ" optional_params=["search"],
description="Получение данных по одному ОГ из сводки ПМ"
) )
register_getter_from_schema( self.register_getter(
parser_instance=self, name="total_ogs",
getter_name="total_ogs",
method=self._get_total_ogs, method=self._get_total_ogs,
schema_class=SvodkaPMTotalOGsRequest, required_params=["codes", "columns"],
description="Получение данных по всем ОГ" optional_params=["search"],
description="Получение данных по всем ОГ из сводки ПМ"
) )
def _get_single_og(self, params: dict): def parse(self, file_path: str, params: dict) -> Dict[str, pd.DataFrame]:
"""Получение данных по одному ОГ""" """Парсинг ZIP архива со сводками ПМ и возврат словаря с DataFrame"""
# Валидируем параметры с помощью схемы Pydantic # Проверяем расширение файла
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest) if not file_path.lower().endswith('.zip'):
raise ValueError(f"Ожидается ZIP архив: {file_path}")
og_id = validated_params["id"] # Создаем временную директорию для разархивирования
codes = validated_params["codes"] temp_dir = tempfile.mkdtemp()
columns = validated_params["columns"]
search = validated_params.get("search")
# Здесь нужно получить DataFrame из self.df, но пока используем старую логику try:
# TODO: Переделать под новую архитектуру # Разархивируем файл
return self.get_svodka_og(self.df, og_id, codes, columns, search) with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
print(f"📦 Архив разархивирован в: {temp_dir}")
# Посмотрим, что находится в архиве
print(f"🔍 Содержимое архива:")
for root, dirs, files in os.walk(temp_dir):
level = root.replace(temp_dir, '').count(os.sep)
indent = ' ' * 2 * level
print(f"{indent}{os.path.basename(root)}/")
subindent = ' ' * 2 * (level + 1)
for file in files:
print(f"{subindent}{file}")
# Создаем словари для хранения данных как в оригинале
df_pm_facts = {} # Словарь с данными факта, ключ - ID ОГ
df_pm_plans = {} # Словарь с данными плана, ключ - ID ОГ
# Ищем файлы в архиве (адаптируемся к реальной структуре)
fact_files = []
plan_files = []
for root, dirs, files in os.walk(temp_dir):
for file in files:
if file.lower().endswith(('.xlsx', '.xlsm')):
full_path = os.path.join(root, file)
if 'fact' in file.lower() or 'факт' in file.lower():
fact_files.append(full_path)
elif 'plan' in file.lower() or 'план' in file.lower():
plan_files.append(full_path)
print(f"📊 Найдено файлов факта: {len(fact_files)}")
print(f"📊 Найдено файлов плана: {len(plan_files)}")
# Обрабатываем найденные файлы
for fact_file in fact_files:
# Извлекаем ID ОГ из имени файла
filename = os.path.basename(fact_file)
# Ищем паттерн типа svodka_fact_pm_SNPZ.xlsm
if 'svodka_fact_pm_' in filename:
og_id = filename.replace('svodka_fact_pm_', '').replace('.xlsx', '').replace('.xlsm', '')
if og_id in SINGLE_OGS:
print(f'📊 Загрузка факта: {fact_file} (ОГ: {og_id})')
df_pm_facts[og_id] = self._parse_svodka_pm(fact_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен для {og_id}")
for plan_file in plan_files:
# Извлекаем ID ОГ из имени файла
filename = os.path.basename(plan_file)
# Ищем паттерн типа svodka_plan_pm_SNPZ.xlsm
if 'svodka_plan_pm_' in filename:
og_id = filename.replace('svodka_plan_pm_', '').replace('.xlsx', '').replace('.xlsm', '')
if og_id in SINGLE_OGS:
print(f'📊 Загрузка плана: {plan_file} (ОГ: {og_id})')
df_pm_plans[og_id] = self._parse_svodka_pm(plan_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен для {og_id}")
# Инициализируем None для ОГ, для которых файлы не найдены
for og_id in SINGLE_OGS:
if og_id == 'BASH':
continue
if og_id not in df_pm_facts:
df_pm_facts[og_id] = None
if og_id not in df_pm_plans:
df_pm_plans[og_id] = None
def _get_total_ogs(self, params: dict):
"""Получение данных по всем ОГ""" # Возвращаем словарь с данными (как в оригинале)
# Валидируем параметры с помощью схемы Pydantic result = {
validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest) 'df_pm_facts': df_pm_facts,
'df_pm_plans': df_pm_plans
codes = validated_params["codes"] }
columns = validated_params["columns"]
search = validated_params.get("search") print(f"🎯 Обработано ОГ: {len([k for k, v in df_pm_facts.items() if v is not None])} факт, {len([k for k, v in df_pm_plans.items() if v is not None])} план")
# TODO: Переделать под новую архитектуру return result
return self.get_svodka_total(self.df, codes, columns, search)
finally:
# Удаляем временную директорию
shutil.rmtree(temp_dir, ignore_errors=True)
print(f"🗑️ Временная директория удалена: {temp_dir}")
def parse(self, file_path: str, params: dict) -> pd.DataFrame: def _parse_svodka_pm(self, file_path: str, sheet_name: str, header_num: Optional[int] = None) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame""" """Парсинг отчетов одного ОГ для БП, ПП и факта"""
# Сохраняем DataFrame для использования в геттерах try:
self.df = self.parse_svodka_pm_files(file_path, params) # Автоопределение header_num, если не передан
return self.df if header_num is None:
header_num = find_header_row(file_path, sheet_name, search_value="Итого")
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int: # Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
"""Определения индекса заголовка в excel по ключевому слову""" df_probe = pd.read_excel(
# Читаем первые max_rows строк без заголовков file_path,
df_temp = pd.read_excel( sheet_name=sheet_name,
file, header=header_num,
sheet_name=sheet, usecols=None,
header=None, nrows=2,
nrows=max_rows, engine='openpyxl' # Явно указываем движок
engine='openpyxl' )
) except Exception as e:
raise ValueError(f"Ошибка при чтении файла {file_path}: {str(e)}")
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_svodka_pm(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного ОГ для БП, ПП и факта '''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Итого")
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
df_probe = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
nrows=2,
engine='openpyxl'
)
if df_probe.shape[0] == 0: if df_probe.shape[0] == 0:
raise ValueError("Файл пуст или не содержит данных.") raise ValueError("Файл пуст или не содержит данных.")
@@ -108,16 +162,15 @@ class SvodkaPMParser(ParserPort):
raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.') raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.')
indicator_col_name = indicator_cols.index[0] indicator_col_name = indicator_cols.index[0]
print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}")
# Читаем весь лист # Читаем весь лист
df_full = pd.read_excel( df_full = pd.read_excel(
file, file_path,
sheet_name=sheet, sheet_name=sheet_name,
header=header_num, header=header_num,
usecols=None, usecols=None,
index_col=None, index_col=None,
engine='openpyxl' engine='openpyxl' # Явно указываем движок
) )
if indicator_col_name not in df_full.columns: if indicator_col_name not in df_full.columns:
@@ -134,19 +187,18 @@ class SvodkaPMParser(ParserPort):
itogo_idx = header_list.index("Итого") itogo_idx = header_list.index("Итого")
num_cols_needed = itogo_idx + 2 num_cols_needed = itogo_idx + 2
except ValueError: except ValueError:
print('Столбец "Итого" не найден. Оставляем все столбцы.')
num_cols_needed = len(header_list) num_cols_needed = len(header_list)
num_cols_needed = min(num_cols_needed, len(header_list)) num_cols_needed = min(num_cols_needed, len(header_list))
df_final = df_full.iloc[:, :num_cols_needed] df_final = df_full.iloc[:, :num_cols_needed]
# === Удаление полностью пустых столбцов === # Удаление полностью пустых столбцов
df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True) df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True)
df_clean = df_clean.where(pd.notnull(df_clean), pd.NA) df_clean = df_clean.where(pd.notnull(df_clean), pd.NA)
non_empty_mask = df_clean.notna().any() non_empty_mask = df_clean.notna().any()
df_final = df_final.loc[:, non_empty_mask] df_final = df_final.loc[:, non_empty_mask]
# === Обработка заголовков: Unnamed и "Итого" → "Итого" === # Обработка заголовков: Unnamed и "Итого" → "Итого"
new_columns = [] new_columns = []
last_good_name = None last_good_name = None
for col in df_final.columns: for col in df_final.columns:
@@ -155,109 +207,152 @@ class SvodkaPMParser(ParserPort):
# Проверяем, является ли колонка пустой/некорректной # Проверяем, является ли колонка пустой/некорректной
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan' is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
if is_empty_or_unnamed: # Проверяем, начинается ли на "Итого"
# Если это пустая колонка, используем последнее хорошее имя if col_str.startswith('Итого'):
if last_good_name: current_name = 'Итого'
new_columns.append(last_good_name) last_good_name = current_name
else: new_columns.append(current_name)
# Если нет хорошего имени, пропускаем elif is_empty_or_unnamed:
continue # Используем последнее хорошее имя
new_columns.append(last_good_name)
else: else:
# Это хорошая колонка # Имя, полученное из excel
last_good_name = col_str last_good_name = col_str
new_columns.append(col_str) new_columns.append(col_str)
# Применяем новые заголовки
df_final.columns = new_columns df_final.columns = new_columns
return df_final return df_final
def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict: def _get_svodka_value(self, df_svodka: pd.DataFrame, og_id: str, code: int, search_value: Optional[str] = None):
"""Парсинг ZIP архива со сводками ПМ""" """Служебная функция для простой выборке по сводке"""
import zipfile print(f"🔍 DEBUG: Ищем код '{code}' для ОГ '{og_id}' в DataFrame с {len(df_svodka)} строками")
pm_dict = { print(f"🔍 DEBUG: Первая строка данных: {df_svodka.iloc[0].tolist()}")
"facts": {}, print(f"🔍 DEBUG: Доступные индексы: {list(df_svodka.index)}")
"plans": {} print(f"🔍 DEBUG: Доступные столбцы: {list(df_svodka.columns)}")
}
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for name, id in OG_IDS.items():
if id == 'BASH':
continue # пропускаем BASH
current_fact = replace_id_in_path(excel_fact_template, id) # Проверяем, есть ли код в индексе
fact_candidates = [f for f in file_list if current_fact in f] if code not in df_svodka.index:
if len(fact_candidates) == 1: print(f"⚠️ Код '{code}' не найден в индексе")
print(f'Загрузка {current_fact}')
with zip_ref.open(fact_candidates[0]) as excel_file:
pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен: {current_fact}")
else:
print(f"⚠️ Файл не найден (Факт): {current_fact}")
pm_dict['facts'][id] = None
current_plan = replace_id_in_path(excel_plan_template, id)
plan_candidates = [f for f in file_list if current_plan in f]
if len(plan_candidates) == 1:
print(f'Загрузка {current_plan}')
with zip_ref.open(plan_candidates[0]) as excel_file:
pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен: {current_plan}")
else:
print(f"⚠️ Файл не найден (План): {current_plan}")
pm_dict['plans'][id] = None
return pm_dict
def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
''' Служебная функция получения значения по коду и столбцу '''
row_index = code
mask_value = df_svodka.iloc[0] == code
if search_value is None:
mask_name = df_svodka.columns != 'Итого'
else:
mask_name = df_svodka.columns == search_value
# Убедимся, что маски совпадают по длине
if len(mask_value) != len(mask_name):
raise ValueError(
f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
)
final_mask = mask_value & mask_name # булевая маска по позициям столбцов
col_positions = final_mask.values # numpy array или Series булевых значений
if not final_mask.any():
print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'")
return 0 return 0
# Получаем позицию строки с кодом
code_row_loc = df_svodka.index.get_loc(code)
print(f"🔍 DEBUG: Код '{code}' в позиции {code_row_loc}")
# Определяем позиции для поиска
if search_value is None:
# Ищем все позиции кроме "Итого" и None (первый столбец с заголовком)
target_positions = []
for i, col_name in enumerate(df_svodka.iloc[0]):
if col_name != 'Итого' and col_name is not None:
target_positions.append(i)
else: else:
if row_index in df_svodka.index: # Ищем позиции в первой строке, где есть нужное название
# Получаем позицию строки target_positions = []
row_loc = df_svodka.index.get_loc(row_index) for i, col_name in enumerate(df_svodka.iloc[0]):
if col_name == search_value:
target_positions.append(i)
print(f"🔍 DEBUG: Найдены позиции для '{search_value}': {target_positions[:5]}...")
print(f"🔍 DEBUG: Позиции в первой строке: {target_positions[:5]}...")
# Извлекаем значения по позициям столбцов print(f"🔍 DEBUG: Ищем столбцы с названием '{search_value}'")
values = df_svodka.iloc[row_loc, col_positions] print(f"🔍 DEBUG: Целевые позиции: {target_positions[:10]}...")
# Преобразуем в числовой формат if not target_positions:
numeric_values = pd.to_numeric(values, errors='coerce') print(f"⚠️ Позиции '{search_value}' не найдены")
return 0
# Агрегация данных (NaN игнорируются) # Извлекаем значения из найденных позиций
if search_value is None: values = []
return numeric_values for pos in target_positions:
# Берем значение из пересечения строки с кодом и позиции столбца
value = df_svodka.iloc[code_row_loc, pos]
# Если это Series, берем первое значение
if isinstance(value, pd.Series):
if len(value) > 0:
# Берем первое не-NaN значение
first_valid = value.dropna().iloc[0] if not value.dropna().empty else 0
values.append(first_valid)
else: else:
return numeric_values.iloc[0] values.append(0)
else: else:
return None values.append(value)
def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None):
''' Служебная функция получения данных по одному ОГ '''
# Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce')
print(f"🔍 DEBUG: Числовые значения (первые 5): {numeric_values.tolist()[:5]}")
# Попробуем альтернативное преобразование
try:
# Если pandas не может преобразовать, попробуем вручную
manual_values = []
for v in values:
if pd.isna(v) or v is None:
manual_values.append(0)
else:
try:
# Пробуем преобразовать в float
manual_values.append(float(str(v).replace(',', '.')))
except (ValueError, TypeError):
manual_values.append(0)
print(f"🔍 DEBUG: Ручное преобразование (первые 5): {manual_values[:5]}")
numeric_values = pd.Series(manual_values)
except Exception as e:
print(f"⚠️ Ошибка при ручном преобразовании: {e}")
# Используем исходные значения
numeric_values = pd.Series([0 if pd.isna(v) or v is None else v for v in values])
# Агрегация данных (NaN игнорируются)
if search_value is None:
# Возвращаем массив всех значений (игнорируя NaN)
if len(numeric_values) > 0:
# Фильтруем NaN значения и возвращаем как список
valid_values = numeric_values.dropna()
if len(valid_values) > 0:
return valid_values.tolist()
else:
return []
else:
return []
else:
# Возвращаем массив всех значений (игнорируя NaN)
if len(numeric_values) > 0:
# Фильтруем NaN значения и возвращаем как список
valid_values = numeric_values.dropna()
if len(valid_values) > 0:
return valid_values.tolist()
else:
return []
else:
return []
def _get_svodka_og(self, og_id: str, codes: List[int], columns: List[str], search_value: Optional[str] = None):
"""Служебная функция получения данных по одному ОГ"""
result = {} result = {}
fact_df = pm_dict['facts'][id] # Получаем данные из сохраненных словарей (через self.df)
plan_df = pm_dict['plans'][id] if not hasattr(self, 'df') or self.df is None:
print("❌ Данные не загружены. Сначала загрузите ZIP архив.")
return {col: {str(code): None for code in codes} for col in columns}
# Извлекаем словари из сохраненных данных
df_pm_facts = self.df.get('df_pm_facts', {})
df_pm_plans = self.df.get('df_pm_plans', {})
# Получаем данные для конкретного ОГ
fact_df = df_pm_facts.get(og_id)
plan_df = df_pm_plans.get(og_id)
print(f"🔍 ===== НАЧАЛО ОБРАБОТКИ ОГ {og_id} =====")
print(f"🔍 Коды: {codes}")
print(f"🔍 Столбцы: {columns}")
print(f"🔍 Получены данные для {og_id}: факт={'' if fact_df is not None else ''}, план={'' if plan_df is not None else ''}")
# Определяем, какие столбцы из какого датафрейма брать # Определяем, какие столбцы из какого датафрейма брать
for col in columns: for col in columns:
@@ -265,49 +360,91 @@ class SvodkaPMParser(ParserPort):
if col in ['ПП', 'БП']: if col in ['ПП', 'БП']:
if plan_df is None: if plan_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}") print(f"❌ Невозможно обработать '{col}': нет данных плана для {og_id}")
else: else:
print(f"🔍 DEBUG: ===== ОБРАБАТЫВАЕМ '{col}' ИЗ ДАННЫХ ПЛАНА =====")
for code in codes: for code in codes:
val = self.get_svodka_value(plan_df, code, col, search_value) print(f"🔍 DEBUG: --- Код {code} для {col} ---")
col_result[code] = val val = self._get_svodka_value(plan_df, og_id, code, col)
col_result[str(code)] = val
print(f"🔍 DEBUG: ===== ЗАВЕРШИЛИ ОБРАБОТКУ '{col}' =====")
elif col in ['ТБ', 'СЭБ', 'НЭБ']: elif col in ['ТБ', 'СЭБ', 'НЭБ']:
if fact_df is None: if fact_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}") print(f"❌ Невозможно обработать '{col}': нет данных факта для {og_id}")
else: else:
for code in codes: for code in codes:
val = self.get_svodka_value(fact_df, code, col, search_value) val = self._get_svodka_value(fact_df, og_id, code, col)
col_result[code] = val col_result[str(code)] = val
else: else:
print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.") print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")
col_result = {code: None for code in codes} col_result = {str(code): None for code in codes}
result[col] = col_result result[col] = col_result
return result return result
def get_svodka_total(self, pm_dict, codes, columns, search_value=None): def _get_single_og(self, params: Dict[str, Any]) -> str:
''' Служебная функция агрегации данные по всем ОГ ''' """API функция для получения данных по одному ОГ"""
# Если на входе строка — парсим как JSON
if isinstance(params, str):
try:
params = json.loads(params)
except json.JSONDecodeError as e:
raise ValueError(f"Некорректный JSON: {e}")
# Проверяем структуру
if not isinstance(params, dict):
raise TypeError("Конфиг должен быть словарём или JSON-строкой")
og_id = params.get("id")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
data = self._get_svodka_og(og_id, codes, columns, search)
json_result = data_to_json(data)
return json_result
def _get_total_ogs(self, params: Dict[str, Any]) -> str:
"""API функция для получения данных по всем ОГ"""
# Если на входе строка — парсим как JSON
if isinstance(params, str):
try:
params = json.loads(params)
except json.JSONDecodeError as e:
raise ValueError(f"❌Некорректный JSON: {e}")
# Проверяем структуру
if not isinstance(params, dict):
raise TypeError("Конфиг должен быть словарём или JSON-строкой")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
total_result = {} total_result = {}
for name, og_id in OG_IDS.items(): for og_id in SINGLE_OGS:
if og_id == 'BASH': if og_id == 'BASH':
continue continue
# print(f"📊 Обработка: {name} ({og_id})")
try: try:
data = self.get_svodka_og( data = self._get_svodka_og(og_id, codes, columns, search)
pm_dict,
og_id,
codes,
columns,
search_value
)
total_result[og_id] = data total_result[og_id] = data
except Exception as e: except Exception as e:
print(f"❌ Ошибка при обработке {name} ({og_id}): {e}") print(f"❌ Ошибка при обработке {og_id}: {e}")
total_result[og_id] = None total_result[og_id] = None
return total_result json_result = data_to_json(total_result)
return json_result
# Убираем старый метод get_value, так как он теперь в базовом классе

View File

@@ -0,0 +1,377 @@
import pandas as pd
import numpy as np
import os
import tempfile
import shutil
import zipfile
from typing import Dict, List, Optional, Any
from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.svodka_repair_ca import SvodkaRepairCARequest
from adapters.pconfig import SINGLE_OGS, find_header_row, get_og_by_name
class SvodkaRepairCAParser(ParserPort):
"""Парсер для сводок ремонта СА"""
name = "Сводки ремонта СА"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
register_getter_from_schema(
parser_instance=self,
getter_name="get_repair_data",
method=self._get_repair_data_wrapper,
schema_class=SvodkaRepairCARequest,
description="Получение данных о ремонтных работах"
)
def _get_repair_data_wrapper(self, params: dict):
"""Получение данных о ремонтных работах"""
print(f"🔍 DEBUG: _get_repair_data_wrapper вызван с параметрами: {params}")
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaRepairCARequest)
og_ids = validated_params.get("og_ids")
repair_types = validated_params.get("repair_types")
include_planned = validated_params.get("include_planned", True)
include_factual = validated_params.get("include_factual", True)
print(f"🔍 DEBUG: Запрошенные ОГ: {og_ids}")
print(f"🔍 DEBUG: Запрошенные типы ремонта: {repair_types}")
print(f"🔍 DEBUG: Включать плановые: {include_planned}, фактические: {include_factual}")
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
if hasattr(self, 'data_dict') and self.data_dict is not None:
# Данные из парсинга
data_source = self.data_dict
print(f"🔍 DEBUG: Используем data_dict с {len(data_source)} записями")
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из загрузки - преобразуем DataFrame обратно в словарь
data_source = self._df_to_data_dict()
print(f"🔍 DEBUG: Используем df, преобразованный в data_dict с {len(data_source)} записями")
else:
print(f"🔍 DEBUG: Нет данных! data_dict={getattr(self, 'data_dict', 'None')}, df={getattr(self, 'df', 'None')}")
return []
# Группируем данные по ОГ (как в оригинале)
grouped_data = {}
for item in data_source:
og_id = item.get('id')
if not og_id:
continue
# Проверяем фильтры
if og_ids is not None and og_id not in og_ids:
continue
if repair_types is not None and item.get('type') not in repair_types:
continue
# Фильтрация по плановым/фактическим данным
filtered_item = item.copy()
if not include_planned:
filtered_item.pop('plan', None)
if not include_factual:
filtered_item.pop('fact', None)
# Убираем поле 'id' из записи, так как оно уже в ключе
filtered_item.pop('id', None)
# Добавляем в группу по ОГ
if og_id not in grouped_data:
grouped_data[og_id] = []
grouped_data[og_id].append(filtered_item)
total_records = sum(len(v) for v in grouped_data.values())
print(f"🔍 DEBUG: Отфильтровано {total_records} записей из {len(data_source)}")
print(f"🔍 DEBUG: Группировано по {len(grouped_data)} ОГ: {list(grouped_data.keys())}")
return grouped_data
def _df_to_data_dict(self):
"""Преобразование DataFrame обратно в словарь данных"""
if not hasattr(self, 'df') or self.df is None or self.df.empty:
return []
# Если df содержит данные в формате списка записей
if 'data' in self.df.columns:
# Извлекаем данные из колонки 'data'
all_data = []
for _, row in self.df.iterrows():
data = row.get('data')
if data and isinstance(data, list):
all_data.extend(data)
return all_data
return []
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
print(f"🔍 DEBUG: SvodkaRepairCAParser.parse вызван с файлом: {file_path}")
# Определяем, это ZIP архив или одиночный файл
if file_path.lower().endswith('.zip'):
# Обрабатываем ZIP архив
self.data_dict = self._parse_zip_archive(file_path, params)
else:
# Обрабатываем одиночный файл
self.data_dict = self._parse_single_file(file_path, params)
# Преобразуем словарь в DataFrame для совместимости с services.py
if self.data_dict:
# Создаем DataFrame с информацией о загруженных данных
data_rows = []
for i, item in enumerate(self.data_dict):
data_rows.append({
'index': i,
'data': [item], # Обертываем в список для совместимости
'records_count': 1
})
if data_rows:
df = pd.DataFrame(data_rows)
self.df = df
print(f"🔍 DEBUG: Создан DataFrame с {len(data_rows)} записями")
return df
# Если данных нет, возвращаем пустой DataFrame
self.df = pd.DataFrame()
print(f"🔍 DEBUG: Возвращаем пустой DataFrame")
return self.df
def _parse_zip_archive(self, file_path: str, params: dict) -> List[Dict]:
"""Парсинг ZIP архива с файлами ремонта СА"""
print(f"🔍 DEBUG: Парсинг ZIP архива: {file_path}")
all_data = []
temp_dir = None
try:
# Создаем временную директорию
temp_dir = tempfile.mkdtemp()
print(f"📦 Архив разархивирован в: {temp_dir}")
# Разархивируем файл
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# Ищем Excel файлы в архиве
excel_files = []
for root, dirs, files in os.walk(temp_dir):
for file in files:
if file.lower().endswith(('.xlsx', '.xlsm', '.xls')):
excel_files.append(os.path.join(root, file))
print(f"📊 Найдено Excel файлов: {len(excel_files)}")
# Обрабатываем каждый найденный файл
for excel_file in excel_files:
print(f"📊 Обработка файла: {excel_file}")
file_data = self._parse_single_file(excel_file, params)
if file_data:
all_data.extend(file_data)
print(f"🎯 Всего обработано записей: {len(all_data)}")
return all_data
except Exception as e:
print(f"❌ Ошибка при обработке ZIP архива: {e}")
return []
finally:
# Удаляем временную директорию
if temp_dir:
shutil.rmtree(temp_dir, ignore_errors=True)
print(f"🗑️ Временная директория удалена: {temp_dir}")
def _parse_single_file(self, file_path: str, params: dict) -> List[Dict]:
"""Парсинг одиночного Excel файла"""
print(f"🔍 DEBUG: Парсинг файла: {file_path}")
try:
# Получаем параметры
sheet_name = params.get('sheet_name', 0) # По умолчанию первый лист
header_num = params.get('header_num', None)
# Автоопределение header_num, если не передан
if header_num is None:
header_num = find_header_row(file_path, sheet_name, search_value="ОГ")
if header_num is None:
print(f"Не найден заголовок в файле {file_path}")
return []
print(f"🔍 DEBUG: Заголовок найден в строке {header_num}")
# Читаем Excel файл
df = pd.read_excel(
file_path,
sheet_name=sheet_name,
header=header_num,
usecols=None,
index_col=None
)
if df.empty:
print(f"❌ Файл {file_path} пуст")
return []
if "ОГ" not in df.columns:
print(f"⚠️ Предупреждение: Колонка 'ОГ' не найдена в файле {file_path}")
return []
# Обрабатываем данные
return self._process_repair_data(df)
except Exception as e:
print(f"❌ Ошибка при парсинге файла {file_path}: {e}")
return []
def _process_repair_data(self, df: pd.DataFrame) -> List[Dict]:
"""Обработка данных о ремонте"""
print(f"🔍 DEBUG: Обработка данных с {len(df)} строками")
# Шаг 1: Нормализация ОГ
def safe_replace(val):
if pd.notna(val) and isinstance(val, str) and val.strip():
cleaned_val = val.strip()
result = get_og_by_name(cleaned_val)
if result and pd.notna(result) and result != "" and result != "UNKNOWN":
return result
return val
df["ОГ"] = df["ОГ"].apply(safe_replace)
# Шаг 2: Приведение к NA и forward fill
og_series = df["ОГ"].map(
lambda x: pd.NA if (isinstance(x, str) and x.strip() == "") or pd.isna(x) else x
)
df["ОГ"] = og_series.ffill()
# Шаг 3: Фильтрация по валидным ОГ
valid_og_values = set(SINGLE_OGS)
mask_og = df["ОГ"].notna() & df["ОГ"].isin(valid_og_values)
df = df[mask_og].copy()
if df.empty:
print(f"❌ Нет данных после фильтрации по ОГ")
return []
# Шаг 4: Удаление строк без "Вид простоя"
if "Вид простоя" in df.columns:
downtime_clean = df["Вид простоя"].astype(str).str.strip()
mask_downtime = (downtime_clean != "") & (downtime_clean != "nan")
df = df[mask_downtime].copy()
else:
print("⚠️ Предупреждение: Колонка 'Вид простоя' не найдена.")
return []
# Шаг 5: Удаление ненужных колонок
cols_to_drop = []
for col in df.columns:
if col.strip().lower() in ["п/п", "пп", "п.п.", ""]:
cols_to_drop.append(col)
elif "НАЛИЧИЕ ПОДРЯДЧИКА" in col.upper() and "ОСНОВНЫЕ РАБОТЫ" in col.upper():
cols_to_drop.append(col)
df.drop(columns=list(set(cols_to_drop)), inplace=True, errors='ignore')
# Шаг 6: Переименование первых 8 колонок по порядку
if df.shape[1] < 8:
print(f"⚠️ Внимание: В DataFrame только {df.shape[1]} колонок, требуется минимум 8.")
return []
new_names = ["id", "name", "type", "start_date", "end_date", "plan", "fact", "downtime"]
# Сохраняем оставшиеся колонки (если больше 8)
remaining_cols = df.columns[8:].tolist() # Все, что после 8-й
renamed_cols = new_names + remaining_cols
df.columns = renamed_cols
# меняем прочерки на null
df = df.replace("-", None)
# Сброс индекса
df.reset_index(drop=True, inplace=True)
# Шаг 7: Преобразование в список словарей
result_data = []
for _, row in df.iterrows():
try:
# Извлекаем основные поля (теперь с правильными именами)
og_id = row.get('id')
name = row.get('name', '')
repair_type = row.get('type', '')
# Обрабатываем даты
start_date = self._parse_date(row.get('start_date'))
end_date = self._parse_date(row.get('end_date'))
# Обрабатываем числовые значения
plan = self._parse_numeric(row.get('plan'))
fact = self._parse_numeric(row.get('fact'))
downtime = self._parse_downtime(row.get('downtime'))
# Создаем запись
record = {
"id": og_id,
"name": str(name) if pd.notna(name) else "",
"type": str(repair_type) if pd.notna(repair_type) else "",
"start_date": start_date,
"end_date": end_date,
"plan": plan,
"fact": fact,
"downtime": downtime
}
result_data.append(record)
except Exception as e:
print(f"⚠️ Ошибка при обработке строки: {e}")
continue
print(f"✅ Обработано {len(result_data)} записей")
return result_data
def _parse_date(self, value) -> Optional[str]:
"""Парсинг даты"""
if pd.isna(value) or value is None:
return None
try:
if isinstance(value, str):
# Пытаемся преобразовать строку в дату
date_obj = pd.to_datetime(value, errors='coerce')
if pd.notna(date_obj):
return date_obj.strftime('%Y-%m-%d %H:%M:%S')
elif hasattr(value, 'strftime'):
# Это уже объект даты
return value.strftime('%Y-%m-%d %H:%M:%S')
return None
except Exception:
return None
def _parse_numeric(self, value) -> Optional[float]:
"""Парсинг числового значения"""
if pd.isna(value) or value is None:
return None
try:
if isinstance(value, (int, float)):
return float(value)
elif isinstance(value, str):
# Заменяем запятую на точку для русских чисел
cleaned = value.replace(',', '.').strip()
return float(cleaned) if cleaned else None
return None
except (ValueError, TypeError):
return None
def _parse_downtime(self, value) -> Optional[str]:
"""Парсинг данных о простое"""
if pd.isna(value) or value is None:
return None
return str(value).strip() if str(value).strip() else None

View File

@@ -3,6 +3,7 @@ from functools import lru_cache
import json import json
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import os
OG_IDS = { OG_IDS = {
"Комсомольский НПЗ": "KNPZ", "Комсомольский НПЗ": "KNPZ",
@@ -22,8 +23,37 @@ OG_IDS = {
"Красноленинский НПЗ": "KLNPZ", "Красноленинский НПЗ": "KLNPZ",
"Пурнефтепереработка": "PurNP", "Пурнефтепереработка": "PurNP",
"ЯНОС": "YANOS", "ЯНОС": "YANOS",
"Уфанефтехим": "UNH",
"РНПК": "RNPK",
"КмсНПЗ": "KNPZ",
"АНХК": "ANHK",
"НК НПЗ": "NovKuybNPZ",
"КНПЗ": "KuybNPZ",
"СНПЗ": "CyzNPZ",
"Нижневаторское НПО": "NVNPO",
"ПурНП": "PurNP",
} }
SINGLE_OGS = [
"KNPZ",
"ANHK",
"AchNPZ",
"BASH",
"UNPZ",
"UNH",
"NOV",
"NovKuybNPZ",
"KuybNPZ",
"CyzNPZ",
"TuapsNPZ",
"SNPZ",
"RNPK",
"NVNPO",
"KLNPZ",
"PurNP",
"YANOS",
]
SNPZ_IDS = { SNPZ_IDS = {
"Висбрекинг": "SNPZ.VISB", "Висбрекинг": "SNPZ.VISB",
"Изомеризация": "SNPZ.IZOM", "Изомеризация": "SNPZ.IZOM",
@@ -40,7 +70,18 @@ SNPZ_IDS = {
def replace_id_in_path(file_path, new_id): def replace_id_in_path(file_path, new_id):
return file_path.replace('ID', str(new_id)) # Заменяем 'ID' на новое значение
modified_path = file_path.replace('ID', str(new_id)) + '.xlsx'
# Проверяем, существует ли файл
if not os.path.exists(modified_path):
# Меняем расширение на .xlsm
directory, filename = os.path.split(modified_path)
name, ext = os.path.splitext(filename)
new_filename = name + '.xlsm'
modified_path = os.path.join(directory, new_filename)
return modified_path
def get_table_name(exel): def get_table_name(exel):
@@ -109,6 +150,25 @@ def get_id_by_name(name, dictionary):
return best_match return best_match
def find_header_row(file, sheet, search_value="Итого", max_rows=50):
''' Определения индекса заголовка в exel по ключевому слову '''
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def data_to_json(data, indent=2, ensure_ascii=False): def data_to_json(data, indent=2, ensure_ascii=False):
""" """
Полностью безопасная сериализация данных в JSON. Полностью безопасная сериализация данных в JSON.
@@ -153,11 +213,18 @@ def data_to_json(data, indent=2, ensure_ascii=False):
# --- рекурсия по dict и list --- # --- рекурсия по dict и list ---
elif isinstance(obj, dict): elif isinstance(obj, dict):
return { # Обрабатываем только значения, ключи оставляем как строки
key: convert_obj(value) converted = {}
for key, value in obj.items() for k, v in obj.items():
if not is_nan_like(key) # фильтруем NaN в ключах (недопустимы в JSON) if is_nan_like(k):
} continue # ключи не могут быть null в JSON
# Превращаем ключ в строку, но не пытаемся интерпретировать как число
key_str = str(k)
converted[key_str] = convert_obj(v) # только значение проходит через convert_obj
# Если все значения 0.0 — считаем, что данных нет, т.к. возможно ожидается массив.
if converted and all(v == 0.0 for v in converted.values()):
return None
return converted
elif isinstance(obj, list): elif isinstance(obj, list):
return [convert_obj(item) for item in obj] return [convert_obj(item) for item in obj]
@@ -175,7 +242,6 @@ def data_to_json(data, indent=2, ensure_ascii=False):
try: try:
cleaned_data = convert_obj(data) cleaned_data = convert_obj(data)
cleaned_data_str = json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii) return json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii)
return cleaned_data
except Exception as e: except Exception as e:
raise ValueError(f"Не удалось сериализовать данные в JSON: {e}") raise ValueError(f"Не удалось сериализовать данные в JSON: {e}")

View File

@@ -6,7 +6,7 @@ from fastapi import FastAPI, File, UploadFile, HTTPException, status
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from adapters.storage import MinIOStorageAdapter from adapters.storage import MinIOStorageAdapter
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser, SvodkaRepairCAParser, StatusesRepairCAParser
from core.models import UploadRequest, DataRequest from core.models import UploadRequest, DataRequest
from core.services import ReportService, PARSERS from core.services import ReportService, PARSERS
@@ -18,6 +18,8 @@ from app.schemas import (
SvodkaCARequest, SvodkaCARequest,
MonitoringFuelMonthRequest, MonitoringFuelTotalRequest MonitoringFuelMonthRequest, MonitoringFuelTotalRequest
) )
from app.schemas.svodka_repair_ca import SvodkaRepairCARequest
from app.schemas.statuses_repair_ca import StatusesRepairCARequest
# Парсеры # Парсеры
@@ -25,6 +27,8 @@ PARSERS.update({
'svodka_pm': SvodkaPMParser, 'svodka_pm': SvodkaPMParser,
'svodka_ca': SvodkaCAParser, 'svodka_ca': SvodkaCAParser,
'monitoring_fuel': MonitoringFuelParser, 'monitoring_fuel': MonitoringFuelParser,
'svodka_repair_ca': SvodkaRepairCAParser,
'statuses_repair_ca': StatusesRepairCAParser,
# 'svodka_plan_sarnpz': SvodkaPlanSarnpzParser, # 'svodka_plan_sarnpz': SvodkaPlanSarnpzParser,
}) })
@@ -80,22 +84,69 @@ async def root():
description="Возвращает список идентификаторов всех доступных парсеров", description="Возвращает список идентификаторов всех доступных парсеров",
response_model=Dict[str, List[str]], response_model=Dict[str, List[str]],
responses={ responses={
200: { 200: {
"content": { "content": {
"application/json": { "application/json": {
"example": { "example": {
"parsers": ["monitoring_fuel", "svodka_ca", "svodka_pm"] "parsers": ["monitoring_fuel", "svodka_ca", "svodka_pm"]
} }
} }
} }
} }
},) },)
async def get_available_parsers(): async def get_available_parsers():
"""Получение списка доступных парсеров""" """Получение списка доступных парсеров"""
parsers = list(PARSERS.keys()) parsers = list(PARSERS.keys())
return {"parsers": parsers} return {"parsers": parsers}
@app.get("/parsers/{parser_name}/available_ogs", tags=["Общее"],
summary="Доступные ОГ для парсера",
description="Возвращает список доступных ОГ для указанного парсера",
responses={
200: {
"content": {
"application/json": {
"example": {
"parser": "svodka_repair_ca",
"available_ogs": ["KNPZ", "ANHK", "SNPZ", "BASH"]
}
}
}
}
},)
async def get_available_ogs(parser_name: str):
"""Получение списка доступных ОГ для парсера"""
if parser_name not in PARSERS:
raise HTTPException(status_code=404, detail=f"Парсер '{parser_name}' не найден")
parser_class = PARSERS[parser_name]
# Для svodka_repair_ca возвращаем ОГ из загруженных данных
if parser_name == "svodka_repair_ca":
try:
# Создаем экземпляр сервиса и загружаем данные из MinIO
report_service = get_report_service()
from core.models import DataRequest
data_request = DataRequest(report_type=parser_name, get_params={})
loaded_data = report_service.get_data(data_request)
# Если данные загружены, извлекаем ОГ из них
if loaded_data is not None and hasattr(loaded_data, 'data') and loaded_data.data is not None:
# Для svodka_repair_ca данные возвращаются в формате словаря по ОГ
data_value = loaded_data.data.get('value')
if isinstance(data_value, dict):
available_ogs = list(data_value.keys())
return {"parser": parser_name, "available_ogs": available_ogs}
except Exception as e:
print(f"⚠️ Ошибка при получении ОГ: {e}")
import traceback
traceback.print_exc()
# Для других парсеров или если нет данных возвращаем статический список из pconfig
from adapters.pconfig import SINGLE_OGS
return {"parser": parser_name, "available_ogs": SINGLE_OGS}
@app.get("/parsers/{parser_name}/getters", tags=["Общее"], @app.get("/parsers/{parser_name}/getters", tags=["Общее"],
summary="Информация о геттерах парсера", summary="Информация о геттерах парсера",
description="Возвращает информацию о доступных геттерах для указанного парсера", description="Возвращает информацию о доступных геттерах для указанного парсера",
@@ -323,7 +374,7 @@ async def get_svodka_pm_single_og(
try: try:
# Создаем запрос # Создаем запрос
request_dict = request_data.model_dump() request_dict = request_data.model_dump()
request_dict['mode'] = 'single' request_dict['mode'] = 'single_og'
request = DataRequest( request = DataRequest(
report_type='svodka_pm', report_type='svodka_pm',
get_params=request_dict get_params=request_dict
@@ -377,7 +428,7 @@ async def get_svodka_pm_total_ogs(
try: try:
# Создаем запрос # Создаем запрос
request_dict = request_data.model_dump() request_dict = request_data.model_dump()
request_dict['mode'] = 'total' request_dict['mode'] = 'total_ogs'
request = DataRequest( request = DataRequest(
report_type='svodka_pm', report_type='svodka_pm',
get_params=request_dict get_params=request_dict
@@ -556,6 +607,246 @@ async def get_svodka_ca_data(
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}") raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/svodka_repair_ca/upload", tags=[SvodkaRepairCAParser.name],
summary="Загрузка файла отчета сводки ремонта СА",
response_model=UploadResponse,
responses={
400: {"model": UploadErrorResponse, "description": "Неверный формат файла"},
500: {"model": UploadErrorResponse, "description": "Внутренняя ошибка сервера"}
},)
async def upload_svodka_repair_ca(
file: UploadFile = File(..., description="Excel файл или ZIP архив сводки ремонта СА (.xlsx, .xlsm, .xls, .zip)")
):
"""
Загрузка и обработка Excel файла или ZIP архива отчета сводки ремонта СА
**Поддерживаемые форматы:**
- Excel (.xlsx, .xlsm, .xls)
- ZIP архив (.zip)
"""
report_service = get_report_service()
try:
# Проверяем тип файла
if not file.filename.lower().endswith(('.xlsx', '.xlsm', '.xls', '.zip')):
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=UploadErrorResponse(
message="Поддерживаются только Excel файлы (.xlsx, .xlsm, .xls) или ZIP архивы (.zip)",
error_code="INVALID_FILE_TYPE",
details={
"expected_formats": [".xlsx", ".xlsm", ".xls", ".zip"],
"received_format": file.filename.split('.')[-1] if '.' in file.filename else "unknown"
}
).model_dump()
)
# Читаем содержимое файла
file_content = await file.read()
# Создаем запрос
request = UploadRequest(
report_type='svodka_repair_ca',
file_content=file_content,
file_name=file.filename
)
# Загружаем отчет
result = report_service.upload_report(request)
if result.success:
return UploadResponse(
success=True,
message=result.message,
object_id=result.object_id
)
else:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=UploadErrorResponse(
message=result.message,
error_code="ERR_UPLOAD"
).model_dump(),
)
except HTTPException:
raise
except Exception as e:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=UploadErrorResponse(
message=f"Внутренняя ошибка сервера: {str(e)}",
error_code="INTERNAL_SERVER_ERROR"
).model_dump()
)
@app.post("/svodka_repair_ca/get_data", tags=[SvodkaRepairCAParser.name],
summary="Получение данных из отчета сводки ремонта СА")
async def get_svodka_repair_ca_data(
request_data: SvodkaRepairCARequest
):
"""
Получение данных из отчета сводки ремонта СА
### Структура параметров:
- `og_ids`: **Массив ID ОГ** для фильтрации (опциональный)
- `repair_types`: **Массив типов ремонта** - `КР`, `КП`, `ТР` (опциональный)
- `include_planned`: **Включать плановые данные** (по умолчанию true)
- `include_factual`: **Включать фактические данные** (по умолчанию true)
### Пример тела запроса:
```json
{
"og_ids": ["SNPZ", "KNPZ"],
"repair_types": ["КР", "КП"],
"include_planned": true,
"include_factual": true
}
```
"""
report_service = get_report_service()
try:
# Создаем запрос
request_dict = request_data.model_dump()
request = DataRequest(
report_type='svodka_repair_ca',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/statuses_repair_ca/upload", tags=[StatusesRepairCAParser.name],
summary="Загрузка отчета статусов ремонта СА")
async def upload_statuses_repair_ca(
file: UploadFile = File(...)
):
"""
Загрузка отчета статусов ремонта СА
### Поддерживаемые форматы:
- **Excel файлы**: `.xlsx`, `.xlsm`, `.xls`
- **ZIP архивы**: `.zip` (содержащие Excel файлы)
### Пример использования:
```bash
curl -X POST "http://localhost:8000/statuses_repair_ca/upload" \
-H "accept: application/json" \
-H "Content-Type: multipart/form-data" \
-F "file=@statuses_repair_ca.xlsx"
```
"""
report_service = get_report_service()
try:
# Проверяем тип файла
if not file.filename.endswith(('.xlsx', '.xlsm', '.xls', '.zip')):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Поддерживаются только Excel файлы (.xlsx, .xlsm, .xls) или архивы (.zip)"
)
# Читаем содержимое файла
file_content = await file.read()
# Создаем запрос на загрузку
upload_request = UploadRequest(
report_type='statuses_repair_ca',
file_content=file_content,
file_name=file.filename
)
# Загружаем отчет
result = report_service.upload_report(upload_request)
if result.success:
return UploadResponse(
success=True,
message="Отчет успешно загружен и обработан",
report_id=result.object_id,
filename=file.filename
).model_dump()
else:
return UploadErrorResponse(
success=False,
message=result.message,
error_code="ERR_UPLOAD",
details=None
).model_dump()
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/statuses_repair_ca/get_data", tags=[StatusesRepairCAParser.name],
summary="Получение данных из отчета статусов ремонта СА")
async def get_statuses_repair_ca_data(
request_data: StatusesRepairCARequest
):
"""
Получение данных из отчета статусов ремонта СА
### Структура параметров:
- `ids`: **Массив ID ОГ** для фильтрации (опциональный)
- `keys`: **Массив ключей** для извлечения данных (опциональный)
### Пример тела запроса:
```json
{
"ids": ["SNPZ", "KNPZ", "ANHK"],
"keys": [
["Дата начала ремонта"],
["Готовность к КР", "Факт"],
["Заключение договоров на СМР", "Договор", "%"]
]
}
```
"""
report_service = get_report_service()
try:
# Создаем запрос
request_dict = request_data.model_dump()
request = DataRequest(
report_type='statuses_repair_ca',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
# @app.post("/monitoring_fuel/upload", tags=[MonitoringFuelParser.name]) # @app.post("/monitoring_fuel/upload", tags=[MonitoringFuelParser.name])
# async def upload_monitoring_fuel( # async def upload_monitoring_fuel(
# file: UploadFile = File(...), # file: UploadFile = File(...),
@@ -804,7 +1095,7 @@ async def get_monitoring_fuel_total_by_columns(
try: try:
# Создаем запрос # Создаем запрос
request_dict = request_data.model_dump() request_dict = request_data.model_dump()
request_dict['mode'] = 'total' request_dict['mode'] = 'total_by_columns'
request = DataRequest( request = DataRequest(
report_type='monitoring_fuel', report_type='monitoring_fuel',
get_params=request_dict get_params=request_dict
@@ -849,7 +1140,7 @@ async def get_monitoring_fuel_month_by_code(
try: try:
# Создаем запрос # Создаем запрос
request_dict = request_data.model_dump() request_dict = request_data.model_dump()
request_dict['mode'] = 'month' request_dict['mode'] = 'month_by_code'
request = DataRequest( request = DataRequest(
report_type='monitoring_fuel', report_type='monitoring_fuel',
get_params=request_dict get_params=request_dict

View File

@@ -0,0 +1,34 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Union
from enum import Enum
class StatusesRepairCARequest(BaseModel):
ids: Optional[List[str]] = Field(
None,
description="Массив ID ОГ для фильтрации (например, ['SNPZ', 'KNPZ'])",
example=["SNPZ", "KNPZ", "ANHK"]
)
keys: Optional[List[List[str]]] = Field(
None,
description="Массив ключей для извлечения данных (например, [['Дата начала ремонта'], ['Готовность к КР', 'Факт']])",
example=[
["Дата начала ремонта"],
["Отставание / опережение подготовки к КР", "Отставание / опережение"],
["Отставание / опережение подготовки к КР", "Динамика за прошедшую неделю"],
["Готовность к КР", "Факт"],
["Заключение договоров на СМР", "Договор", "%"],
["Поставка МТР", "На складе, позиций", "%"]
]
)
class Config:
json_schema_extra = {
"example": {
"ids": ["SNPZ", "KNPZ", "ANHK"],
"keys": [
["Дата начала ремонта"],
["Готовность к КР", "Факт"],
["Заключение договоров на СМР", "Договор", "%"]
]
}
}

View File

@@ -25,7 +25,7 @@ class OGID(str, Enum):
class SvodkaPMSingleOGRequest(BaseModel): class SvodkaPMSingleOGRequest(BaseModel):
id: OGID = Field( id: str = Field(
..., ...,
description="Идентификатор МА для запрашиваемого ОГ", description="Идентификатор МА для запрашиваемого ОГ",
example="SNPZ" example="SNPZ"

View File

@@ -0,0 +1,46 @@
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum
class RepairType(str, Enum):
"""Типы ремонтных работ"""
KR = "КР" # Капитальный ремонт
KP = "КП" # Капитальный ремонт
TR = "ТР" # Текущий ремонт
class SvodkaRepairCARequest(BaseModel):
"""Запрос на получение данных сводки ремонта СА"""
og_ids: Optional[List[str]] = Field(
default=None,
description="Список ID ОГ для фильтрации. Если не указан, возвращаются данные по всем ОГ",
example=["SNPZ", "KNPZ", "BASH"]
)
repair_types: Optional[List[RepairType]] = Field(
default=None,
description="Список типов ремонта для фильтрации. Если не указан, возвращаются все типы",
example=[RepairType.KR, RepairType.KP]
)
include_planned: bool = Field(
default=True,
description="Включать ли плановые данные"
)
include_factual: bool = Field(
default=True,
description="Включать ли фактические данные"
)
class Config:
json_schema_extra = {
"example": {
"og_ids": ["SNPZ", "KNPZ"],
"repair_types": ["КР", "КП"],
"include_planned": True,
"include_factual": True
}
}

View File

@@ -43,7 +43,7 @@ class ReportService:
try: try:
# Парсим файл # Парсим файл
parse_params = request.parse_params or {} parse_params = request.parse_params or {}
df = parser.parse(temp_file_path, parse_params) parse_result = parser.parse(temp_file_path, parse_params)
# Генерируем object_id # Генерируем object_id
object_id = f"nin_excel_data_{request.report_type}" object_id = f"nin_excel_data_{request.report_type}"
@@ -54,7 +54,7 @@ class ReportService:
print(f"Старый объект удален: {object_id}") print(f"Старый объект удален: {object_id}")
# Сохраняем в хранилище # Сохраняем в хранилище
if self.storage.save_dataframe(df, object_id): if self.storage.save_dataframe(parse_result, object_id):
return UploadResult( return UploadResult(
success=True, success=True,
message="Отчет успешно загружен", message="Отчет успешно загружен",
@@ -89,9 +89,9 @@ class ReportService:
message=f"Отчет типа '{request.report_type}' не найден. Возможно, MinIO недоступен или отчет не был загружен." message=f"Отчет типа '{request.report_type}' не найден. Возможно, MinIO недоступен или отчет не был загружен."
) )
# Загружаем DataFrame из хранилища # Загружаем данные из хранилища
df = self.storage.load_dataframe(object_id) loaded_data = self.storage.load_dataframe(object_id)
if df is None: if loaded_data is None:
return DataResult( return DataResult(
success=False, success=False,
message="Ошибка при загрузке данных из хранилища. Возможно, MinIO недоступен." message="Ошибка при загрузке данных из хранилища. Возможно, MinIO недоступен."
@@ -100,25 +100,90 @@ class ReportService:
# Получаем парсер # Получаем парсер
parser = get_parser(request.report_type) parser = get_parser(request.report_type)
# Устанавливаем DataFrame в парсер для использования в геттерах # Устанавливаем данные в парсер для использования в геттерах
parser.df = df parser.df = loaded_data
print(f"🔍 DEBUG: ReportService.get_data - установлены данные в парсер {request.report_type}")
# Проверяем тип загруженных данных
if hasattr(loaded_data, 'shape'):
# Это DataFrame
print(f"🔍 DEBUG: DataFrame shape: {loaded_data.shape}")
print(f"🔍 DEBUG: DataFrame columns: {list(loaded_data.columns) if not loaded_data.empty else 'Empty'}")
elif isinstance(loaded_data, dict):
# Это словарь (для парсера ПМ)
print(f"🔍 DEBUG: Словарь с ключами: {list(loaded_data.keys())}")
else:
print(f"🔍 DEBUG: Неизвестный тип данных: {type(loaded_data)}")
# Получаем параметры запроса # Получаем параметры запроса
get_params = request.get_params or {} get_params = request.get_params or {}
# Определяем имя геттера (по умолчанию используем первый доступный) # Для svodka_ca определяем режим из данных или используем 'fact' по умолчанию
getter_name = get_params.pop("getter", None) if request.report_type == 'svodka_ca':
if not getter_name: # Извлекаем режим из DataFrame или используем 'fact' по умолчанию
# Если геттер не указан, берем первый доступный if hasattr(parser, 'df') and parser.df is not None and not parser.df.empty:
available_getters = list(parser.getters.keys()) modes_in_df = parser.df['mode'].unique() if 'mode' in parser.df.columns else ['fact']
if available_getters: # Используем первый найденный режим или 'fact' по умолчанию
getter_name = available_getters[0] default_mode = modes_in_df[0] if len(modes_in_df) > 0 else 'fact'
print(f"⚠️ Геттер не указан, используем первый доступный: {getter_name}")
else: else:
return DataResult( default_mode = 'fact'
success=False,
message="Парсер не имеет доступных геттеров" # Устанавливаем режим в параметры, если он не указан
) if 'mode' not in get_params:
get_params['mode'] = default_mode
# Определяем имя геттера
if request.report_type == 'svodka_ca':
# Для svodka_ca используем геттер get_ca_data
getter_name = 'get_ca_data'
elif request.report_type == 'svodka_repair_ca':
# Для svodka_repair_ca используем геттер get_repair_data
getter_name = 'get_repair_data'
elif request.report_type == 'statuses_repair_ca':
# Для statuses_repair_ca используем геттер get_repair_statuses
getter_name = 'get_repair_statuses'
elif request.report_type == 'monitoring_fuel':
# Для monitoring_fuel определяем геттер из параметра mode
getter_name = get_params.pop("mode", None)
if not getter_name:
# Если режим не указан, берем первый доступный
available_getters = list(parser.getters.keys())
if available_getters:
getter_name = available_getters[0]
print(f"⚠️ Режим не указан, используем первый доступный: {getter_name}")
else:
return DataResult(
success=False,
message="Парсер не имеет доступных геттеров"
)
elif request.report_type == 'svodka_pm':
# Для svodka_pm определяем геттер из параметра mode
getter_name = get_params.pop("mode", None)
if not getter_name:
# Если режим не указан, берем первый доступный
available_getters = list(parser.getters.keys())
if available_getters:
getter_name = available_getters[0]
print(f"⚠️ Режим не указан, используем первый доступный: {getter_name}")
else:
return DataResult(
success=False,
message="Парсер не имеет доступных геттеров"
)
else:
# Для других парсеров определяем из параметра mode
getter_name = get_params.pop("mode", None)
if not getter_name:
# Если режим не указан, берем первый доступный
available_getters = list(parser.getters.keys())
if available_getters:
getter_name = available_getters[0]
print(f"⚠️ Режим не указан, используем первый доступный: {getter_name}")
else:
return DataResult(
success=False,
message="Парсер не имеет доступных геттеров"
)
# Получаем значение через указанный геттер # Получаем значение через указанный геттер
try: try:

20
python_parser/test_app.py Normal file
View File

@@ -0,0 +1,20 @@
#!/usr/bin/env python3
"""
Простой тест для проверки работы FastAPI
"""
from fastapi import FastAPI
app = FastAPI(title="Test API")
@app.get("/")
async def root():
return {"message": "Test API is working"}
@app.get("/health")
async def health():
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
print("Starting test server...")
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@@ -4,7 +4,7 @@ import json
import pandas as pd import pandas as pd
import io import io
import zipfile import zipfile
from typing import Dict, Any from typing import Dict, Any, List
import os import os
# Конфигурация страницы # Конфигурация страницы
@@ -50,7 +50,12 @@ def get_server_info():
def upload_file_to_api(endpoint: str, file_data: bytes, filename: str): def upload_file_to_api(endpoint: str, file_data: bytes, filename: str):
"""Загрузка файла на API""" """Загрузка файла на API"""
try: try:
files = {"zip_file": (filename, file_data, "application/zip")} # Определяем правильное имя поля в зависимости от эндпоинта
if "zip" in endpoint:
files = {"zip_file": (filename, file_data, "application/zip")}
else:
files = {"file": (filename, file_data, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")}
response = requests.post(f"{API_BASE_URL}{endpoint}", files=files) response = requests.post(f"{API_BASE_URL}{endpoint}", files=files)
return response.json(), response.status_code return response.json(), response.status_code
except Exception as e: except Exception as e:
@@ -64,6 +69,20 @@ def make_api_request(endpoint: str, data: Dict[str, Any]):
except Exception as e: except Exception as e:
return {"error": str(e)}, 500 return {"error": str(e)}, 500
def get_available_ogs(parser_name: str) -> List[str]:
"""Получение доступных ОГ для парсера"""
try:
response = requests.get(f"{API_BASE_URL}/parsers/{parser_name}/available_ogs")
if response.status_code == 200:
data = response.json()
return data.get("available_ogs", [])
else:
print(f"⚠️ Ошибка получения ОГ: {response.status_code}")
return []
except Exception as e:
print(f"⚠️ Ошибка при запросе ОГ: {e}")
return []
def main(): def main():
st.title("🚀 NIN Excel Parsers API - Демонстрация") st.title("🚀 NIN Excel Parsers API - Демонстрация")
st.markdown("---") st.markdown("---")
@@ -96,10 +115,12 @@ def main():
st.write(f"{parser}") st.write(f"{parser}")
# Основные вкладки - по одной на каждый парсер # Основные вкладки - по одной на каждый парсер
tab1, tab2, tab3 = st.tabs([ tab1, tab2, tab3, tab4, tab5 = st.tabs([
"📊 Сводки ПМ", "📊 Сводки ПМ",
"🏭 Сводки СА", "🏭 Сводки СА",
"⛽ Мониторинг топлива" "⛽ Мониторинг топлива",
"🔧 Ремонт СА",
"📋 Статусы ремонта СА"
]) ])
# Вкладка 1: Сводки ПМ - полный функционал # Вкладка 1: Сводки ПМ - полный функционал
@@ -371,6 +392,247 @@ def main():
else: else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}") st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
# Вкладка 4: Ремонт СА
with tab4:
st.header("🔧 Ремонт СА - Управление ремонтными работами")
# Секция загрузки файлов
st.subheader("📤 Загрузка файлов")
uploaded_file = st.file_uploader(
"Выберите Excel файл или ZIP архив с данными о ремонте СА",
type=['xlsx', 'xlsm', 'xls', 'zip'],
key="repair_ca_upload"
)
if uploaded_file is not None:
if st.button("📤 Загрузить файл", key="repair_ca_upload_btn"):
with st.spinner("Загружаю файл..."):
file_data = uploaded_file.read()
result, status = upload_file_to_api("/svodka_repair_ca/upload", file_data, uploaded_file.name)
if status == 200:
st.success("✅ Файл успешно загружен")
st.json(result)
else:
st.error(f"❌ Ошибка загрузки: {result.get('message', 'Неизвестная ошибка')}")
st.markdown("---")
# Секция получения данных
st.subheader("🔍 Получение данных")
col1, col2 = st.columns(2)
with col1:
st.subheader("Фильтры")
# Получаем доступные ОГ динамически
available_ogs = get_available_ogs("svodka_repair_ca")
# Фильтр по ОГ
og_ids = st.multiselect(
"Выберите ОГ (оставьте пустым для всех)",
available_ogs if available_ogs else ["KNPZ", "ANHK", "SNPZ", "BASH", "UNH", "NOV"], # fallback
key="repair_ca_og_ids"
)
# Фильтр по типам ремонта
repair_types = st.multiselect(
"Выберите типы ремонта (оставьте пустым для всех)",
["КР", "КП", "ТР"],
key="repair_ca_types"
)
# Включение плановых/фактических данных
include_planned = st.checkbox("Включать плановые данные", value=True, key="repair_ca_planned")
include_factual = st.checkbox("Включать фактические данные", value=True, key="repair_ca_factual")
with col2:
st.subheader("Действия")
if st.button("🔍 Получить данные о ремонте", key="repair_ca_get_btn"):
with st.spinner("Получаю данные..."):
data = {
"include_planned": include_planned,
"include_factual": include_factual
}
# Добавляем фильтры только если они выбраны
if og_ids:
data["og_ids"] = og_ids
if repair_types:
data["repair_types"] = repair_types
result, status = make_api_request("/svodka_repair_ca/get_data", data)
if status == 200:
st.success("✅ Данные получены")
# Отображаем данные в виде таблицы, если возможно
if result.get("data") and isinstance(result["data"], list):
df_data = []
for item in result["data"]:
df_data.append({
"ID ОГ": item.get("id", ""),
"Наименование": item.get("name", ""),
"Тип ремонта": item.get("type", ""),
"Дата начала": item.get("start_date", ""),
"Дата окончания": item.get("end_date", ""),
"План": item.get("plan", ""),
"Факт": item.get("fact", ""),
"Простой": item.get("downtime", "")
})
if df_data:
df = pd.DataFrame(df_data)
st.dataframe(df, use_container_width=True)
else:
st.info("📋 Нет данных для отображения")
else:
st.json(result)
else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
# Вкладка 5: Статусы ремонта СА
with tab5:
st.header("📋 Статусы ремонта СА")
# Секция загрузки файлов
st.subheader("📤 Загрузка файлов")
uploaded_file = st.file_uploader(
"Выберите файл статусов ремонта СА",
type=['xlsx', 'xlsm', 'xls', 'zip'],
key="statuses_repair_ca_upload"
)
if uploaded_file is not None:
if st.button("📤 Загрузить файл", key="statuses_repair_ca_upload_btn"):
with st.spinner("Загружаем файл..."):
file_data = uploaded_file.read()
result, status_code = upload_file_to_api("/statuses_repair_ca/upload", file_data, uploaded_file.name)
if status_code == 200:
st.success("✅ Файл успешно загружен!")
st.json(result)
else:
st.error(f"❌ Ошибка загрузки: {result}")
# Секция получения данных
st.subheader("📊 Получение данных")
# Получаем доступные ОГ динамически
available_ogs = get_available_ogs("statuses_repair_ca")
# Фильтр по ОГ
og_ids = st.multiselect(
"Выберите ОГ (оставьте пустым для всех)",
available_ogs if available_ogs else ["KNPZ", "ANHK", "SNPZ", "BASH", "UNH", "NOV"], # fallback
key="statuses_repair_ca_og_ids"
)
# Предустановленные ключи для извлечения
st.subheader("🔑 Ключи для извлечения данных")
# Основные ключи
include_basic_keys = st.checkbox("Основные данные", value=True, key="statuses_basic_keys")
include_readiness_keys = st.checkbox("Готовность к КР", value=True, key="statuses_readiness_keys")
include_contract_keys = st.checkbox("Заключение договоров", value=True, key="statuses_contract_keys")
include_supply_keys = st.checkbox("Поставка МТР", value=True, key="statuses_supply_keys")
# Формируем ключи на основе выбора
keys = []
if include_basic_keys:
keys.append(["Дата начала ремонта"])
keys.append(["Отставание / опережение подготовки к КР", "Отставание / опережение"])
keys.append(["Отставание / опережение подготовки к КР", "Динамика за прошедшую неделю"])
if include_readiness_keys:
keys.append(["Готовность к КР", "Факт"])
if include_contract_keys:
keys.append(["Заключение договоров на СМР", "Договор", "%"])
if include_supply_keys:
keys.append(["Поставка МТР", "На складе, позиций", "%"])
# Кнопка получения данных
if st.button("📊 Получить данные", key="statuses_repair_ca_get_data_btn"):
if not keys:
st.warning("⚠️ Выберите хотя бы одну группу ключей для извлечения")
else:
with st.spinner("Получаем данные..."):
request_data = {
"ids": og_ids if og_ids else None,
"keys": keys
}
result, status_code = make_api_request("/statuses_repair_ca/get_data", request_data)
if status_code == 200 and result.get("success"):
st.success("✅ Данные успешно получены!")
data = result.get("data", {}).get("value", [])
if data:
# Отображаем данные в виде таблицы
if isinstance(data, list) and len(data) > 0:
# Преобразуем в DataFrame для лучшего отображения
df_data = []
for item in data:
row = {
"ID": item.get("id", ""),
"Название": item.get("name", ""),
}
# Добавляем основные поля
if "Дата начала ремонта" in item:
row["Дата начала ремонта"] = item["Дата начала ремонта"]
# Добавляем готовность к КР
if "Готовность к КР" in item:
readiness = item["Готовность к КР"]
if isinstance(readiness, dict) and "Факт" in readiness:
row["Готовность к КР (Факт)"] = readiness["Факт"]
# Добавляем отставание/опережение
if "Отставание / опережение подготовки к КР" in item:
delay = item["Отставание / опережение подготовки к КР"]
if isinstance(delay, dict):
if "Отставание / опережение" in delay:
row["Отставание/опережение"] = delay["Отставание / опережение"]
if "Динамика за прошедшую неделю" in delay:
row["Динамика за неделю"] = delay["Динамика за прошедшую неделю"]
# Добавляем договоры
if "Заключение договоров на СМР" in item:
contracts = item["Заключение договоров на СМР"]
if isinstance(contracts, dict) and "Договор" in contracts:
contract = contracts["Договор"]
if isinstance(contract, dict) and "%" in contract:
row["Договоры (%)"] = contract["%"]
# Добавляем поставки МТР
if "Поставка МТР" in item:
supply = item["Поставка МТР"]
if isinstance(supply, dict) and "На складе, позиций" in supply:
warehouse = supply["На складе, позиций"]
if isinstance(warehouse, dict) and "%" in warehouse:
row["МТР на складе (%)"] = warehouse["%"]
df_data.append(row)
if df_data:
df = pd.DataFrame(df_data)
st.dataframe(df, use_container_width=True)
else:
st.info("📋 Нет данных для отображения")
else:
st.json(result)
else:
st.info("📋 Нет данных для отображения")
else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
# Футер # Футер
st.markdown("---") st.markdown("---")
st.markdown("### 📚 Документация API") st.markdown("### 📚 Документация API")
@@ -385,6 +647,8 @@ def main():
- 📊 Парсинг сводок ПМ (план и факт) - 📊 Парсинг сводок ПМ (план и факт)
- 🏭 Парсинг сводок СА - 🏭 Парсинг сводок СА
- ⛽ Мониторинг топлива - ⛽ Мониторинг топлива
- 🔧 Управление ремонтными работами СА
- 📋 Мониторинг статусов ремонта СА
**Технологии:** **Технологии:**
- FastAPI - FastAPI