2 Commits

8 changed files with 1379 additions and 16 deletions

View File

@@ -1,9 +1,13 @@
from .monitoring_fuel import MonitoringFuelParser
from .svodka_ca import SvodkaCAParser
from .svodka_pm import SvodkaPMParser
from .svodka_repair_ca import SvodkaRepairCAParser
from .statuses_repair_ca import StatusesRepairCAParser
__all__ = [
'MonitoringFuelParser',
'SvodkaCAParser',
'SvodkaPMParser'
'SvodkaPMParser',
'SvodkaRepairCAParser',
'StatusesRepairCAParser'
]

View File

@@ -0,0 +1,341 @@
import pandas as pd
import os
import tempfile
import zipfile
from typing import Dict, Any, List, Tuple, Optional
from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.statuses_repair_ca import StatusesRepairCARequest
from adapters.pconfig import find_header_row, get_og_by_name, data_to_json
class StatusesRepairCAParser(ParserPort):
"""Парсер для статусов ремонта СА"""
name = "Статусы ремонта СА"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
register_getter_from_schema(
parser_instance=self,
getter_name="get_repair_statuses",
method=self._get_repair_statuses_wrapper,
schema_class=StatusesRepairCARequest,
description="Получение статусов ремонта по ОГ и ключам"
)
def parse(self, file_path: str, params: dict) -> Dict[str, Any]:
"""Парсинг файла статусов ремонта СА"""
print(f"🔍 DEBUG: StatusesRepairCAParser.parse вызван с файлом: {file_path}")
try:
# Определяем тип файла
if file_path.endswith('.zip'):
return self._parse_zip_file(file_path)
elif file_path.endswith(('.xlsx', '.xls')):
return self._parse_excel_file(file_path)
else:
raise ValueError(f"Неподдерживаемый формат файла: {file_path}")
except Exception as e:
print(f"❌ Ошибка при парсинге файла {file_path}: {e}")
raise
def _parse_zip_file(self, zip_path: str) -> Dict[str, Any]:
"""Парсинг ZIP архива"""
with tempfile.TemporaryDirectory() as temp_dir:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# Ищем Excel файл в архиве
excel_files = []
for root, dirs, files in os.walk(temp_dir):
for file in files:
if file.endswith(('.xlsx', '.xls')):
excel_files.append(os.path.join(root, file))
if not excel_files:
raise ValueError("В архиве не найдено Excel файлов")
# Берем первый найденный Excel файл
excel_file = excel_files[0]
print(f"🔍 DEBUG: Найден Excel файл в архиве: {excel_file}")
return self._parse_excel_file(excel_file)
def _parse_excel_file(self, file_path: str) -> Dict[str, Any]:
"""Парсинг Excel файла"""
print(f"🔍 DEBUG: Парсинг Excel файла: {file_path}")
# Парсим данные
df_statuses = self._parse_statuses_repair_ca(file_path, 0)
if df_statuses.empty:
print("⚠️ Нет данных после парсинга")
return {"data": [], "records_count": 0}
# Преобразуем в список словарей для хранения
data_list = self._data_to_structured_json(df_statuses)
result = {
"data": data_list,
"records_count": len(data_list)
}
# Устанавливаем данные в парсер для использования в геттерах
self.data_dict = result
print(f"✅ Парсинг завершен. Получено {len(data_list)} записей")
return result
def _parse_statuses_repair_ca(self, file: str, sheet: int, header_num: Optional[int] = None) -> pd.DataFrame:
"""Парсинг отчетов статусов ремонта"""
# === ШАГ 1: Создание MultiIndex ===
columns_level_1 = [
'id',
'ОГ',
'Дата начала ремонта',
'Готовность к КР',
'Отставание / опережение подготовки к КР',
'Заключение договоров на СМР',
'Поставка МТР'
]
sub_columns_cmp = {
'ДВ': ['всего', 'плановая дата', 'факт', '%'],
'Сметы': ['всего', 'плановая дата', 'факт', '%'],
'Формирование лотов': ['всего', 'плановая дата', 'факт', '%'],
'Договор': ['всего', 'плановая дата', 'факт', '%']
}
sub_columns_mtp = {
'Выполнение плана на текущую дату': ['инициирования закупок', 'заключения договоров', 'поставки'],
'На складе, позиций': ['всего', 'поставлено', '%', 'динамика за прошедшую неделю, поз.']
}
# Формируем MultiIndex — ВСЕ кортежи длиной 3
cols = []
for col1 in columns_level_1:
if col1 == 'id':
cols.append((col1, '', ''))
elif col1 == 'ОГ':
cols.append((col1, '', ''))
elif col1 == 'Дата начала ремонта':
cols.append((col1, '', ''))
elif col1 == 'Готовность к КР':
cols.extend([(col1, 'План', ''), (col1, 'Факт', '')])
elif col1 == 'Отставание / опережение подготовки к КР':
cols.extend([
(col1, 'Отставание / опережение', ''),
(col1, 'Динамика за прошедшую неделю', '')
])
elif col1 == 'Заключение договоров на СМР':
for subcol, sub_sub_cols in sub_columns_cmp.items():
for ssc in sub_sub_cols:
cols.append((col1, subcol, ssc))
elif col1 == 'Поставка МТР':
for subcol, sub_sub_cols in sub_columns_mtp.items():
for ssc in sub_sub_cols:
cols.append((col1, subcol, ssc))
else:
cols.append((col1, '', ''))
# Создаем MultiIndex
multi_index = pd.MultiIndex.from_tuples(cols, names=['Level1', 'Level2', 'Level3'])
# === ШАГ 2: Читаем данные из Excel ===
if header_num is None:
header_num = find_header_row(file, sheet, search_value="ОГ")
df_data = pd.read_excel(
file,
skiprows=header_num + 3,
header=None,
index_col=0,
engine='openpyxl'
)
# Убираем строки с пустыми данными
df_data.dropna(how='all', inplace=True)
# Применяем функцию get_og_by_name для 'id'
df_data['id'] = df_data.iloc[:, 0].copy()
df_data['id'] = df_data['id'].apply(get_og_by_name)
# Перемещаем 'id' на первое место
cols = ['id'] + [col for col in df_data.columns if col != 'id']
df_data = df_data[cols]
# Удаляем строки с пустым id
df_data = df_data.dropna(subset=['id'])
df_data = df_data[df_data['id'].astype(str).str.strip() != '']
# Сбрасываем индекс
df_data = df_data.reset_index(drop=True)
# Выбираем 4-ю колонку (индекс 3) для фильтрации
col_index = 3
numeric_series = pd.to_numeric(df_data.iloc[:, col_index], errors='coerce')
# Фильтруем: оставляем только строки, где значение — число
mask = pd.notna(numeric_series)
df_data = df_data[mask].copy()
# === ШАГ 3: Применяем MultiIndex к данным ===
df_data.columns = multi_index
return df_data
def _data_to_structured_json(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
"""Преобразование DataFrame с MultiIndex в структурированный JSON"""
if df.empty:
return []
result_list = []
for idx, row in df.iterrows():
result = {}
for col in df.columns:
value = row[col]
# Пропускаем NaN
if pd.isna(value):
value = None
# Распаковываем уровни
level1, level2, level3 = col
# Убираем пустые/неинформативные значения
level1 = str(level1).strip() if level1 else ""
level2 = str(level2).strip() if level2 else None
level3 = str(level3).strip() if level3 else None
# Обработка id и ОГ — выносим на верх
if level1 == "id":
result["id"] = value
elif level1 == "ОГ":
result["name"] = value
else:
# Группируем по Level1
if level1 not in result:
result[level1] = {}
# Вложенные уровни
if level2 and level3:
if level2 not in result[level1]:
result[level1][level2] = {}
result[level1][level2][level3] = value
elif level2:
result[level1][level2] = value
else:
result[level1] = value
result_list.append(result)
return result_list
def _get_repair_statuses_wrapper(self, params: dict):
"""Обертка для получения статусов ремонта"""
print(f"🔍 DEBUG: _get_repair_statuses_wrapper вызван с параметрами: {params}")
# Валидация параметров
validated_params = validate_params_with_schema(params, StatusesRepairCARequest)
ids = validated_params.get('ids')
keys = validated_params.get('keys')
print(f"🔍 DEBUG: Запрошенные ОГ: {ids}")
print(f"🔍 DEBUG: Запрошенные ключи: {keys}")
# Получаем данные из парсера
if hasattr(self, 'df') and self.df is not None:
# Данные загружены из MinIO
if isinstance(self.df, dict):
# Это словарь (как в других парсерах)
data_source = self.df.get('data', [])
elif hasattr(self.df, 'columns') and 'data' in self.df.columns:
# Это DataFrame
data_source = []
for _, row in self.df.iterrows():
if row['data']:
data_source.extend(row['data'])
else:
data_source = []
elif hasattr(self, 'data_dict') and self.data_dict:
# Данные из локального парсинга
data_source = self.data_dict.get('data', [])
else:
print("⚠️ Нет данных в парсере")
return []
print(f"🔍 DEBUG: Используем данные с {len(data_source)} записями")
# Фильтруем данные
filtered_data = self._filter_statuses_data(data_source, ids, keys)
print(f"🔍 DEBUG: Отфильтровано {len(filtered_data)} записей")
return filtered_data
def _filter_statuses_data(self, data_source: List[Dict], ids: Optional[List[str]], keys: Optional[List[List[str]]]) -> List[Dict]:
"""Фильтрация данных по ОГ и ключам"""
if not data_source:
return []
# Если не указаны фильтры, возвращаем все данные
if not ids and not keys:
return data_source
filtered_data = []
for item in data_source:
# Фильтр по ОГ
if ids is not None:
item_id = item.get('id')
if item_id not in ids:
continue
# Если указаны ключи, извлекаем только нужные поля
if keys is not None:
filtered_item = self._extract_keys_from_item(item, keys)
if filtered_item:
filtered_data.append(filtered_item)
else:
filtered_data.append(item)
return filtered_data
def _extract_keys_from_item(self, item: Dict[str, Any], keys: List[List[str]]) -> Dict[str, Any]:
"""Извлечение указанных ключей из элемента"""
result = {}
# Всегда добавляем id и name
if 'id' in item:
result['id'] = item['id']
if 'name' in item:
result['name'] = item['name']
# Извлекаем указанные ключи
for key_path in keys:
if not key_path:
continue
value = item
for key in key_path:
if isinstance(value, dict) and key in value:
value = value[key]
else:
value = None
break
if value is not None:
# Строим вложенную структуру
current = result
for i, key in enumerate(key_path):
if i == len(key_path) - 1:
current[key] = value
else:
if key not in current:
current[key] = {}
current = current[key]
return result

View File

@@ -0,0 +1,377 @@
import pandas as pd
import numpy as np
import os
import tempfile
import shutil
import zipfile
from typing import Dict, List, Optional, Any
from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.svodka_repair_ca import SvodkaRepairCARequest
from adapters.pconfig import SINGLE_OGS, find_header_row, get_og_by_name
class SvodkaRepairCAParser(ParserPort):
"""Парсер для сводок ремонта СА"""
name = "Сводки ремонта СА"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
register_getter_from_schema(
parser_instance=self,
getter_name="get_repair_data",
method=self._get_repair_data_wrapper,
schema_class=SvodkaRepairCARequest,
description="Получение данных о ремонтных работах"
)
def _get_repair_data_wrapper(self, params: dict):
"""Получение данных о ремонтных работах"""
print(f"🔍 DEBUG: _get_repair_data_wrapper вызван с параметрами: {params}")
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaRepairCARequest)
og_ids = validated_params.get("og_ids")
repair_types = validated_params.get("repair_types")
include_planned = validated_params.get("include_planned", True)
include_factual = validated_params.get("include_factual", True)
print(f"🔍 DEBUG: Запрошенные ОГ: {og_ids}")
print(f"🔍 DEBUG: Запрошенные типы ремонта: {repair_types}")
print(f"🔍 DEBUG: Включать плановые: {include_planned}, фактические: {include_factual}")
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
if hasattr(self, 'data_dict') and self.data_dict is not None:
# Данные из парсинга
data_source = self.data_dict
print(f"🔍 DEBUG: Используем data_dict с {len(data_source)} записями")
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из загрузки - преобразуем DataFrame обратно в словарь
data_source = self._df_to_data_dict()
print(f"🔍 DEBUG: Используем df, преобразованный в data_dict с {len(data_source)} записями")
else:
print(f"🔍 DEBUG: Нет данных! data_dict={getattr(self, 'data_dict', 'None')}, df={getattr(self, 'df', 'None')}")
return []
# Группируем данные по ОГ (как в оригинале)
grouped_data = {}
for item in data_source:
og_id = item.get('id')
if not og_id:
continue
# Проверяем фильтры
if og_ids is not None and og_id not in og_ids:
continue
if repair_types is not None and item.get('type') not in repair_types:
continue
# Фильтрация по плановым/фактическим данным
filtered_item = item.copy()
if not include_planned:
filtered_item.pop('plan', None)
if not include_factual:
filtered_item.pop('fact', None)
# Убираем поле 'id' из записи, так как оно уже в ключе
filtered_item.pop('id', None)
# Добавляем в группу по ОГ
if og_id not in grouped_data:
grouped_data[og_id] = []
grouped_data[og_id].append(filtered_item)
total_records = sum(len(v) for v in grouped_data.values())
print(f"🔍 DEBUG: Отфильтровано {total_records} записей из {len(data_source)}")
print(f"🔍 DEBUG: Группировано по {len(grouped_data)} ОГ: {list(grouped_data.keys())}")
return grouped_data
def _df_to_data_dict(self):
"""Преобразование DataFrame обратно в словарь данных"""
if not hasattr(self, 'df') or self.df is None or self.df.empty:
return []
# Если df содержит данные в формате списка записей
if 'data' in self.df.columns:
# Извлекаем данные из колонки 'data'
all_data = []
for _, row in self.df.iterrows():
data = row.get('data')
if data and isinstance(data, list):
all_data.extend(data)
return all_data
return []
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
print(f"🔍 DEBUG: SvodkaRepairCAParser.parse вызван с файлом: {file_path}")
# Определяем, это ZIP архив или одиночный файл
if file_path.lower().endswith('.zip'):
# Обрабатываем ZIP архив
self.data_dict = self._parse_zip_archive(file_path, params)
else:
# Обрабатываем одиночный файл
self.data_dict = self._parse_single_file(file_path, params)
# Преобразуем словарь в DataFrame для совместимости с services.py
if self.data_dict:
# Создаем DataFrame с информацией о загруженных данных
data_rows = []
for i, item in enumerate(self.data_dict):
data_rows.append({
'index': i,
'data': [item], # Обертываем в список для совместимости
'records_count': 1
})
if data_rows:
df = pd.DataFrame(data_rows)
self.df = df
print(f"🔍 DEBUG: Создан DataFrame с {len(data_rows)} записями")
return df
# Если данных нет, возвращаем пустой DataFrame
self.df = pd.DataFrame()
print(f"🔍 DEBUG: Возвращаем пустой DataFrame")
return self.df
def _parse_zip_archive(self, file_path: str, params: dict) -> List[Dict]:
"""Парсинг ZIP архива с файлами ремонта СА"""
print(f"🔍 DEBUG: Парсинг ZIP архива: {file_path}")
all_data = []
temp_dir = None
try:
# Создаем временную директорию
temp_dir = tempfile.mkdtemp()
print(f"📦 Архив разархивирован в: {temp_dir}")
# Разархивируем файл
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# Ищем Excel файлы в архиве
excel_files = []
for root, dirs, files in os.walk(temp_dir):
for file in files:
if file.lower().endswith(('.xlsx', '.xlsm', '.xls')):
excel_files.append(os.path.join(root, file))
print(f"📊 Найдено Excel файлов: {len(excel_files)}")
# Обрабатываем каждый найденный файл
for excel_file in excel_files:
print(f"📊 Обработка файла: {excel_file}")
file_data = self._parse_single_file(excel_file, params)
if file_data:
all_data.extend(file_data)
print(f"🎯 Всего обработано записей: {len(all_data)}")
return all_data
except Exception as e:
print(f"❌ Ошибка при обработке ZIP архива: {e}")
return []
finally:
# Удаляем временную директорию
if temp_dir:
shutil.rmtree(temp_dir, ignore_errors=True)
print(f"🗑️ Временная директория удалена: {temp_dir}")
def _parse_single_file(self, file_path: str, params: dict) -> List[Dict]:
"""Парсинг одиночного Excel файла"""
print(f"🔍 DEBUG: Парсинг файла: {file_path}")
try:
# Получаем параметры
sheet_name = params.get('sheet_name', 0) # По умолчанию первый лист
header_num = params.get('header_num', None)
# Автоопределение header_num, если не передан
if header_num is None:
header_num = find_header_row(file_path, sheet_name, search_value="ОГ")
if header_num is None:
print(f"Не найден заголовок в файле {file_path}")
return []
print(f"🔍 DEBUG: Заголовок найден в строке {header_num}")
# Читаем Excel файл
df = pd.read_excel(
file_path,
sheet_name=sheet_name,
header=header_num,
usecols=None,
index_col=None
)
if df.empty:
print(f"❌ Файл {file_path} пуст")
return []
if "ОГ" not in df.columns:
print(f"⚠️ Предупреждение: Колонка 'ОГ' не найдена в файле {file_path}")
return []
# Обрабатываем данные
return self._process_repair_data(df)
except Exception as e:
print(f"❌ Ошибка при парсинге файла {file_path}: {e}")
return []
def _process_repair_data(self, df: pd.DataFrame) -> List[Dict]:
"""Обработка данных о ремонте"""
print(f"🔍 DEBUG: Обработка данных с {len(df)} строками")
# Шаг 1: Нормализация ОГ
def safe_replace(val):
if pd.notna(val) and isinstance(val, str) and val.strip():
cleaned_val = val.strip()
result = get_og_by_name(cleaned_val)
if result and pd.notna(result) and result != "" and result != "UNKNOWN":
return result
return val
df["ОГ"] = df["ОГ"].apply(safe_replace)
# Шаг 2: Приведение к NA и forward fill
og_series = df["ОГ"].map(
lambda x: pd.NA if (isinstance(x, str) and x.strip() == "") or pd.isna(x) else x
)
df["ОГ"] = og_series.ffill()
# Шаг 3: Фильтрация по валидным ОГ
valid_og_values = set(SINGLE_OGS)
mask_og = df["ОГ"].notna() & df["ОГ"].isin(valid_og_values)
df = df[mask_og].copy()
if df.empty:
print(f"❌ Нет данных после фильтрации по ОГ")
return []
# Шаг 4: Удаление строк без "Вид простоя"
if "Вид простоя" in df.columns:
downtime_clean = df["Вид простоя"].astype(str).str.strip()
mask_downtime = (downtime_clean != "") & (downtime_clean != "nan")
df = df[mask_downtime].copy()
else:
print("⚠️ Предупреждение: Колонка 'Вид простоя' не найдена.")
return []
# Шаг 5: Удаление ненужных колонок
cols_to_drop = []
for col in df.columns:
if col.strip().lower() in ["п/п", "пп", "п.п.", ""]:
cols_to_drop.append(col)
elif "НАЛИЧИЕ ПОДРЯДЧИКА" in col.upper() and "ОСНОВНЫЕ РАБОТЫ" in col.upper():
cols_to_drop.append(col)
df.drop(columns=list(set(cols_to_drop)), inplace=True, errors='ignore')
# Шаг 6: Переименование первых 8 колонок по порядку
if df.shape[1] < 8:
print(f"⚠️ Внимание: В DataFrame только {df.shape[1]} колонок, требуется минимум 8.")
return []
new_names = ["id", "name", "type", "start_date", "end_date", "plan", "fact", "downtime"]
# Сохраняем оставшиеся колонки (если больше 8)
remaining_cols = df.columns[8:].tolist() # Все, что после 8-й
renamed_cols = new_names + remaining_cols
df.columns = renamed_cols
# меняем прочерки на null
df = df.replace("-", None)
# Сброс индекса
df.reset_index(drop=True, inplace=True)
# Шаг 7: Преобразование в список словарей
result_data = []
for _, row in df.iterrows():
try:
# Извлекаем основные поля (теперь с правильными именами)
og_id = row.get('id')
name = row.get('name', '')
repair_type = row.get('type', '')
# Обрабатываем даты
start_date = self._parse_date(row.get('start_date'))
end_date = self._parse_date(row.get('end_date'))
# Обрабатываем числовые значения
plan = self._parse_numeric(row.get('plan'))
fact = self._parse_numeric(row.get('fact'))
downtime = self._parse_downtime(row.get('downtime'))
# Создаем запись
record = {
"id": og_id,
"name": str(name) if pd.notna(name) else "",
"type": str(repair_type) if pd.notna(repair_type) else "",
"start_date": start_date,
"end_date": end_date,
"plan": plan,
"fact": fact,
"downtime": downtime
}
result_data.append(record)
except Exception as e:
print(f"⚠️ Ошибка при обработке строки: {e}")
continue
print(f"✅ Обработано {len(result_data)} записей")
return result_data
def _parse_date(self, value) -> Optional[str]:
"""Парсинг даты"""
if pd.isna(value) or value is None:
return None
try:
if isinstance(value, str):
# Пытаемся преобразовать строку в дату
date_obj = pd.to_datetime(value, errors='coerce')
if pd.notna(date_obj):
return date_obj.strftime('%Y-%m-%d %H:%M:%S')
elif hasattr(value, 'strftime'):
# Это уже объект даты
return value.strftime('%Y-%m-%d %H:%M:%S')
return None
except Exception:
return None
def _parse_numeric(self, value) -> Optional[float]:
"""Парсинг числового значения"""
if pd.isna(value) or value is None:
return None
try:
if isinstance(value, (int, float)):
return float(value)
elif isinstance(value, str):
# Заменяем запятую на точку для русских чисел
cleaned = value.replace(',', '.').strip()
return float(cleaned) if cleaned else None
return None
except (ValueError, TypeError):
return None
def _parse_downtime(self, value) -> Optional[str]:
"""Парсинг данных о простое"""
if pd.isna(value) or value is None:
return None
return str(value).strip() if str(value).strip() else None

View File

@@ -6,7 +6,7 @@ from fastapi import FastAPI, File, UploadFile, HTTPException, status
from fastapi.responses import JSONResponse
from adapters.storage import MinIOStorageAdapter
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser, SvodkaRepairCAParser, StatusesRepairCAParser
from core.models import UploadRequest, DataRequest
from core.services import ReportService, PARSERS
@@ -18,6 +18,8 @@ from app.schemas import (
SvodkaCARequest,
MonitoringFuelMonthRequest, MonitoringFuelTotalRequest
)
from app.schemas.svodka_repair_ca import SvodkaRepairCARequest
from app.schemas.statuses_repair_ca import StatusesRepairCARequest
# Парсеры
@@ -25,6 +27,8 @@ PARSERS.update({
'svodka_pm': SvodkaPMParser,
'svodka_ca': SvodkaCAParser,
'monitoring_fuel': MonitoringFuelParser,
'svodka_repair_ca': SvodkaRepairCAParser,
'statuses_repair_ca': StatusesRepairCAParser,
# 'svodka_plan_sarnpz': SvodkaPlanSarnpzParser,
})
@@ -80,22 +84,69 @@ async def root():
description="Возвращает список идентификаторов всех доступных парсеров",
response_model=Dict[str, List[str]],
responses={
200: {
"content": {
"application/json": {
"example": {
"parsers": ["monitoring_fuel", "svodka_ca", "svodka_pm"]
}
}
}
}
},)
200: {
"content": {
"application/json": {
"example": {
"parsers": ["monitoring_fuel", "svodka_ca", "svodka_pm"]
}
}
}
}
},)
async def get_available_parsers():
"""Получение списка доступных парсеров"""
parsers = list(PARSERS.keys())
return {"parsers": parsers}
@app.get("/parsers/{parser_name}/available_ogs", tags=["Общее"],
summary="Доступные ОГ для парсера",
description="Возвращает список доступных ОГ для указанного парсера",
responses={
200: {
"content": {
"application/json": {
"example": {
"parser": "svodka_repair_ca",
"available_ogs": ["KNPZ", "ANHK", "SNPZ", "BASH"]
}
}
}
}
},)
async def get_available_ogs(parser_name: str):
"""Получение списка доступных ОГ для парсера"""
if parser_name not in PARSERS:
raise HTTPException(status_code=404, detail=f"Парсер '{parser_name}' не найден")
parser_class = PARSERS[parser_name]
# Для svodka_repair_ca возвращаем ОГ из загруженных данных
if parser_name == "svodka_repair_ca":
try:
# Создаем экземпляр сервиса и загружаем данные из MinIO
report_service = get_report_service()
from core.models import DataRequest
data_request = DataRequest(report_type=parser_name, get_params={})
loaded_data = report_service.get_data(data_request)
# Если данные загружены, извлекаем ОГ из них
if loaded_data is not None and hasattr(loaded_data, 'data') and loaded_data.data is not None:
# Для svodka_repair_ca данные возвращаются в формате словаря по ОГ
data_value = loaded_data.data.get('value')
if isinstance(data_value, dict):
available_ogs = list(data_value.keys())
return {"parser": parser_name, "available_ogs": available_ogs}
except Exception as e:
print(f"⚠️ Ошибка при получении ОГ: {e}")
import traceback
traceback.print_exc()
# Для других парсеров или если нет данных возвращаем статический список из pconfig
from adapters.pconfig import SINGLE_OGS
return {"parser": parser_name, "available_ogs": SINGLE_OGS}
@app.get("/parsers/{parser_name}/getters", tags=["Общее"],
summary="Информация о геттерах парсера",
description="Возвращает информацию о доступных геттерах для указанного парсера",
@@ -556,6 +607,246 @@ async def get_svodka_ca_data(
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/svodka_repair_ca/upload", tags=[SvodkaRepairCAParser.name],
summary="Загрузка файла отчета сводки ремонта СА",
response_model=UploadResponse,
responses={
400: {"model": UploadErrorResponse, "description": "Неверный формат файла"},
500: {"model": UploadErrorResponse, "description": "Внутренняя ошибка сервера"}
},)
async def upload_svodka_repair_ca(
file: UploadFile = File(..., description="Excel файл или ZIP архив сводки ремонта СА (.xlsx, .xlsm, .xls, .zip)")
):
"""
Загрузка и обработка Excel файла или ZIP архива отчета сводки ремонта СА
**Поддерживаемые форматы:**
- Excel (.xlsx, .xlsm, .xls)
- ZIP архив (.zip)
"""
report_service = get_report_service()
try:
# Проверяем тип файла
if not file.filename.lower().endswith(('.xlsx', '.xlsm', '.xls', '.zip')):
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=UploadErrorResponse(
message="Поддерживаются только Excel файлы (.xlsx, .xlsm, .xls) или ZIP архивы (.zip)",
error_code="INVALID_FILE_TYPE",
details={
"expected_formats": [".xlsx", ".xlsm", ".xls", ".zip"],
"received_format": file.filename.split('.')[-1] if '.' in file.filename else "unknown"
}
).model_dump()
)
# Читаем содержимое файла
file_content = await file.read()
# Создаем запрос
request = UploadRequest(
report_type='svodka_repair_ca',
file_content=file_content,
file_name=file.filename
)
# Загружаем отчет
result = report_service.upload_report(request)
if result.success:
return UploadResponse(
success=True,
message=result.message,
object_id=result.object_id
)
else:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=UploadErrorResponse(
message=result.message,
error_code="ERR_UPLOAD"
).model_dump(),
)
except HTTPException:
raise
except Exception as e:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=UploadErrorResponse(
message=f"Внутренняя ошибка сервера: {str(e)}",
error_code="INTERNAL_SERVER_ERROR"
).model_dump()
)
@app.post("/svodka_repair_ca/get_data", tags=[SvodkaRepairCAParser.name],
summary="Получение данных из отчета сводки ремонта СА")
async def get_svodka_repair_ca_data(
request_data: SvodkaRepairCARequest
):
"""
Получение данных из отчета сводки ремонта СА
### Структура параметров:
- `og_ids`: **Массив ID ОГ** для фильтрации (опциональный)
- `repair_types`: **Массив типов ремонта** - `КР`, `КП`, `ТР` (опциональный)
- `include_planned`: **Включать плановые данные** (по умолчанию true)
- `include_factual`: **Включать фактические данные** (по умолчанию true)
### Пример тела запроса:
```json
{
"og_ids": ["SNPZ", "KNPZ"],
"repair_types": ["КР", "КП"],
"include_planned": true,
"include_factual": true
}
```
"""
report_service = get_report_service()
try:
# Создаем запрос
request_dict = request_data.model_dump()
request = DataRequest(
report_type='svodka_repair_ca',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/statuses_repair_ca/upload", tags=[StatusesRepairCAParser.name],
summary="Загрузка отчета статусов ремонта СА")
async def upload_statuses_repair_ca(
file: UploadFile = File(...)
):
"""
Загрузка отчета статусов ремонта СА
### Поддерживаемые форматы:
- **Excel файлы**: `.xlsx`, `.xlsm`, `.xls`
- **ZIP архивы**: `.zip` (содержащие Excel файлы)
### Пример использования:
```bash
curl -X POST "http://localhost:8000/statuses_repair_ca/upload" \
-H "accept: application/json" \
-H "Content-Type: multipart/form-data" \
-F "file=@statuses_repair_ca.xlsx"
```
"""
report_service = get_report_service()
try:
# Проверяем тип файла
if not file.filename.endswith(('.xlsx', '.xlsm', '.xls', '.zip')):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Поддерживаются только Excel файлы (.xlsx, .xlsm, .xls) или архивы (.zip)"
)
# Читаем содержимое файла
file_content = await file.read()
# Создаем запрос на загрузку
upload_request = UploadRequest(
report_type='statuses_repair_ca',
file_content=file_content,
file_name=file.filename
)
# Загружаем отчет
result = report_service.upload_report(upload_request)
if result.success:
return UploadResponse(
success=True,
message="Отчет успешно загружен и обработан",
report_id=result.object_id,
filename=file.filename
).model_dump()
else:
return UploadErrorResponse(
success=False,
message=result.message,
error_code="ERR_UPLOAD",
details=None
).model_dump()
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/statuses_repair_ca/get_data", tags=[StatusesRepairCAParser.name],
summary="Получение данных из отчета статусов ремонта СА")
async def get_statuses_repair_ca_data(
request_data: StatusesRepairCARequest
):
"""
Получение данных из отчета статусов ремонта СА
### Структура параметров:
- `ids`: **Массив ID ОГ** для фильтрации (опциональный)
- `keys`: **Массив ключей** для извлечения данных (опциональный)
### Пример тела запроса:
```json
{
"ids": ["SNPZ", "KNPZ", "ANHK"],
"keys": [
["Дата начала ремонта"],
["Готовность к КР", "Факт"],
["Заключение договоров на СМР", "Договор", "%"]
]
}
```
"""
report_service = get_report_service()
try:
# Создаем запрос
request_dict = request_data.model_dump()
request = DataRequest(
report_type='statuses_repair_ca',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
# @app.post("/monitoring_fuel/upload", tags=[MonitoringFuelParser.name])
# async def upload_monitoring_fuel(
# file: UploadFile = File(...),

View File

@@ -0,0 +1,34 @@
from pydantic import BaseModel, Field
from typing import List, Optional, Union
from enum import Enum
class StatusesRepairCARequest(BaseModel):
ids: Optional[List[str]] = Field(
None,
description="Массив ID ОГ для фильтрации (например, ['SNPZ', 'KNPZ'])",
example=["SNPZ", "KNPZ", "ANHK"]
)
keys: Optional[List[List[str]]] = Field(
None,
description="Массив ключей для извлечения данных (например, [['Дата начала ремонта'], ['Готовность к КР', 'Факт']])",
example=[
["Дата начала ремонта"],
["Отставание / опережение подготовки к КР", "Отставание / опережение"],
["Отставание / опережение подготовки к КР", "Динамика за прошедшую неделю"],
["Готовность к КР", "Факт"],
["Заключение договоров на СМР", "Договор", "%"],
["Поставка МТР", "На складе, позиций", "%"]
]
)
class Config:
json_schema_extra = {
"example": {
"ids": ["SNPZ", "KNPZ", "ANHK"],
"keys": [
["Дата начала ремонта"],
["Готовность к КР", "Факт"],
["Заключение договоров на СМР", "Договор", "%"]
]
}
}

View File

@@ -0,0 +1,46 @@
from pydantic import BaseModel, Field
from typing import List, Optional
from enum import Enum
class RepairType(str, Enum):
"""Типы ремонтных работ"""
KR = "КР" # Капитальный ремонт
KP = "КП" # Капитальный ремонт
TR = "ТР" # Текущий ремонт
class SvodkaRepairCARequest(BaseModel):
"""Запрос на получение данных сводки ремонта СА"""
og_ids: Optional[List[str]] = Field(
default=None,
description="Список ID ОГ для фильтрации. Если не указан, возвращаются данные по всем ОГ",
example=["SNPZ", "KNPZ", "BASH"]
)
repair_types: Optional[List[RepairType]] = Field(
default=None,
description="Список типов ремонта для фильтрации. Если не указан, возвращаются все типы",
example=[RepairType.KR, RepairType.KP]
)
include_planned: bool = Field(
default=True,
description="Включать ли плановые данные"
)
include_factual: bool = Field(
default=True,
description="Включать ли фактические данные"
)
class Config:
json_schema_extra = {
"example": {
"og_ids": ["SNPZ", "KNPZ"],
"repair_types": ["КР", "КП"],
"include_planned": True,
"include_factual": True
}
}

View File

@@ -136,6 +136,12 @@ class ReportService:
if request.report_type == 'svodka_ca':
# Для svodka_ca используем геттер get_ca_data
getter_name = 'get_ca_data'
elif request.report_type == 'svodka_repair_ca':
# Для svodka_repair_ca используем геттер get_repair_data
getter_name = 'get_repair_data'
elif request.report_type == 'statuses_repair_ca':
# Для statuses_repair_ca используем геттер get_repair_statuses
getter_name = 'get_repair_statuses'
elif request.report_type == 'monitoring_fuel':
# Для monitoring_fuel определяем геттер из параметра mode
getter_name = get_params.pop("mode", None)

View File

@@ -4,7 +4,7 @@ import json
import pandas as pd
import io
import zipfile
from typing import Dict, Any
from typing import Dict, Any, List
import os
# Конфигурация страницы
@@ -50,7 +50,12 @@ def get_server_info():
def upload_file_to_api(endpoint: str, file_data: bytes, filename: str):
"""Загрузка файла на API"""
try:
files = {"zip_file": (filename, file_data, "application/zip")}
# Определяем правильное имя поля в зависимости от эндпоинта
if "zip" in endpoint:
files = {"zip_file": (filename, file_data, "application/zip")}
else:
files = {"file": (filename, file_data, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")}
response = requests.post(f"{API_BASE_URL}{endpoint}", files=files)
return response.json(), response.status_code
except Exception as e:
@@ -64,6 +69,20 @@ def make_api_request(endpoint: str, data: Dict[str, Any]):
except Exception as e:
return {"error": str(e)}, 500
def get_available_ogs(parser_name: str) -> List[str]:
"""Получение доступных ОГ для парсера"""
try:
response = requests.get(f"{API_BASE_URL}/parsers/{parser_name}/available_ogs")
if response.status_code == 200:
data = response.json()
return data.get("available_ogs", [])
else:
print(f"⚠️ Ошибка получения ОГ: {response.status_code}")
return []
except Exception as e:
print(f"⚠️ Ошибка при запросе ОГ: {e}")
return []
def main():
st.title("🚀 NIN Excel Parsers API - Демонстрация")
st.markdown("---")
@@ -96,10 +115,12 @@ def main():
st.write(f"{parser}")
# Основные вкладки - по одной на каждый парсер
tab1, tab2, tab3 = st.tabs([
tab1, tab2, tab3, tab4, tab5 = st.tabs([
"📊 Сводки ПМ",
"🏭 Сводки СА",
"⛽ Мониторинг топлива"
"⛽ Мониторинг топлива",
"🔧 Ремонт СА",
"📋 Статусы ремонта СА"
])
# Вкладка 1: Сводки ПМ - полный функционал
@@ -371,6 +392,247 @@ def main():
else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
# Вкладка 4: Ремонт СА
with tab4:
st.header("🔧 Ремонт СА - Управление ремонтными работами")
# Секция загрузки файлов
st.subheader("📤 Загрузка файлов")
uploaded_file = st.file_uploader(
"Выберите Excel файл или ZIP архив с данными о ремонте СА",
type=['xlsx', 'xlsm', 'xls', 'zip'],
key="repair_ca_upload"
)
if uploaded_file is not None:
if st.button("📤 Загрузить файл", key="repair_ca_upload_btn"):
with st.spinner("Загружаю файл..."):
file_data = uploaded_file.read()
result, status = upload_file_to_api("/svodka_repair_ca/upload", file_data, uploaded_file.name)
if status == 200:
st.success("✅ Файл успешно загружен")
st.json(result)
else:
st.error(f"❌ Ошибка загрузки: {result.get('message', 'Неизвестная ошибка')}")
st.markdown("---")
# Секция получения данных
st.subheader("🔍 Получение данных")
col1, col2 = st.columns(2)
with col1:
st.subheader("Фильтры")
# Получаем доступные ОГ динамически
available_ogs = get_available_ogs("svodka_repair_ca")
# Фильтр по ОГ
og_ids = st.multiselect(
"Выберите ОГ (оставьте пустым для всех)",
available_ogs if available_ogs else ["KNPZ", "ANHK", "SNPZ", "BASH", "UNH", "NOV"], # fallback
key="repair_ca_og_ids"
)
# Фильтр по типам ремонта
repair_types = st.multiselect(
"Выберите типы ремонта (оставьте пустым для всех)",
["КР", "КП", "ТР"],
key="repair_ca_types"
)
# Включение плановых/фактических данных
include_planned = st.checkbox("Включать плановые данные", value=True, key="repair_ca_planned")
include_factual = st.checkbox("Включать фактические данные", value=True, key="repair_ca_factual")
with col2:
st.subheader("Действия")
if st.button("🔍 Получить данные о ремонте", key="repair_ca_get_btn"):
with st.spinner("Получаю данные..."):
data = {
"include_planned": include_planned,
"include_factual": include_factual
}
# Добавляем фильтры только если они выбраны
if og_ids:
data["og_ids"] = og_ids
if repair_types:
data["repair_types"] = repair_types
result, status = make_api_request("/svodka_repair_ca/get_data", data)
if status == 200:
st.success("✅ Данные получены")
# Отображаем данные в виде таблицы, если возможно
if result.get("data") and isinstance(result["data"], list):
df_data = []
for item in result["data"]:
df_data.append({
"ID ОГ": item.get("id", ""),
"Наименование": item.get("name", ""),
"Тип ремонта": item.get("type", ""),
"Дата начала": item.get("start_date", ""),
"Дата окончания": item.get("end_date", ""),
"План": item.get("plan", ""),
"Факт": item.get("fact", ""),
"Простой": item.get("downtime", "")
})
if df_data:
df = pd.DataFrame(df_data)
st.dataframe(df, use_container_width=True)
else:
st.info("📋 Нет данных для отображения")
else:
st.json(result)
else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
# Вкладка 5: Статусы ремонта СА
with tab5:
st.header("📋 Статусы ремонта СА")
# Секция загрузки файлов
st.subheader("📤 Загрузка файлов")
uploaded_file = st.file_uploader(
"Выберите файл статусов ремонта СА",
type=['xlsx', 'xlsm', 'xls', 'zip'],
key="statuses_repair_ca_upload"
)
if uploaded_file is not None:
if st.button("📤 Загрузить файл", key="statuses_repair_ca_upload_btn"):
with st.spinner("Загружаем файл..."):
file_data = uploaded_file.read()
result, status_code = upload_file_to_api("/statuses_repair_ca/upload", file_data, uploaded_file.name)
if status_code == 200:
st.success("✅ Файл успешно загружен!")
st.json(result)
else:
st.error(f"❌ Ошибка загрузки: {result}")
# Секция получения данных
st.subheader("📊 Получение данных")
# Получаем доступные ОГ динамически
available_ogs = get_available_ogs("statuses_repair_ca")
# Фильтр по ОГ
og_ids = st.multiselect(
"Выберите ОГ (оставьте пустым для всех)",
available_ogs if available_ogs else ["KNPZ", "ANHK", "SNPZ", "BASH", "UNH", "NOV"], # fallback
key="statuses_repair_ca_og_ids"
)
# Предустановленные ключи для извлечения
st.subheader("🔑 Ключи для извлечения данных")
# Основные ключи
include_basic_keys = st.checkbox("Основные данные", value=True, key="statuses_basic_keys")
include_readiness_keys = st.checkbox("Готовность к КР", value=True, key="statuses_readiness_keys")
include_contract_keys = st.checkbox("Заключение договоров", value=True, key="statuses_contract_keys")
include_supply_keys = st.checkbox("Поставка МТР", value=True, key="statuses_supply_keys")
# Формируем ключи на основе выбора
keys = []
if include_basic_keys:
keys.append(["Дата начала ремонта"])
keys.append(["Отставание / опережение подготовки к КР", "Отставание / опережение"])
keys.append(["Отставание / опережение подготовки к КР", "Динамика за прошедшую неделю"])
if include_readiness_keys:
keys.append(["Готовность к КР", "Факт"])
if include_contract_keys:
keys.append(["Заключение договоров на СМР", "Договор", "%"])
if include_supply_keys:
keys.append(["Поставка МТР", "На складе, позиций", "%"])
# Кнопка получения данных
if st.button("📊 Получить данные", key="statuses_repair_ca_get_data_btn"):
if not keys:
st.warning("⚠️ Выберите хотя бы одну группу ключей для извлечения")
else:
with st.spinner("Получаем данные..."):
request_data = {
"ids": og_ids if og_ids else None,
"keys": keys
}
result, status_code = make_api_request("/statuses_repair_ca/get_data", request_data)
if status_code == 200 and result.get("success"):
st.success("✅ Данные успешно получены!")
data = result.get("data", {}).get("value", [])
if data:
# Отображаем данные в виде таблицы
if isinstance(data, list) and len(data) > 0:
# Преобразуем в DataFrame для лучшего отображения
df_data = []
for item in data:
row = {
"ID": item.get("id", ""),
"Название": item.get("name", ""),
}
# Добавляем основные поля
if "Дата начала ремонта" in item:
row["Дата начала ремонта"] = item["Дата начала ремонта"]
# Добавляем готовность к КР
if "Готовность к КР" in item:
readiness = item["Готовность к КР"]
if isinstance(readiness, dict) and "Факт" in readiness:
row["Готовность к КР (Факт)"] = readiness["Факт"]
# Добавляем отставание/опережение
if "Отставание / опережение подготовки к КР" in item:
delay = item["Отставание / опережение подготовки к КР"]
if isinstance(delay, dict):
if "Отставание / опережение" in delay:
row["Отставание/опережение"] = delay["Отставание / опережение"]
if "Динамика за прошедшую неделю" in delay:
row["Динамика за неделю"] = delay["Динамика за прошедшую неделю"]
# Добавляем договоры
if "Заключение договоров на СМР" in item:
contracts = item["Заключение договоров на СМР"]
if isinstance(contracts, dict) and "Договор" in contracts:
contract = contracts["Договор"]
if isinstance(contract, dict) and "%" in contract:
row["Договоры (%)"] = contract["%"]
# Добавляем поставки МТР
if "Поставка МТР" in item:
supply = item["Поставка МТР"]
if isinstance(supply, dict) and "На складе, позиций" in supply:
warehouse = supply["На складе, позиций"]
if isinstance(warehouse, dict) and "%" in warehouse:
row["МТР на складе (%)"] = warehouse["%"]
df_data.append(row)
if df_data:
df = pd.DataFrame(df_data)
st.dataframe(df, use_container_width=True)
else:
st.info("📋 Нет данных для отображения")
else:
st.json(result)
else:
st.info("📋 Нет данных для отображения")
else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
# Футер
st.markdown("---")
st.markdown("### 📚 Документация API")
@@ -385,6 +647,8 @@ def main():
- 📊 Парсинг сводок ПМ (план и факт)
- 🏭 Парсинг сводок СА
- ⛽ Мониторинг топлива
- 🔧 Управление ремонтными работами СА
- 📋 Мониторинг статусов ремонта СА
**Технологии:**
- FastAPI