381 lines
17 KiB
Python
381 lines
17 KiB
Python
import pandas as pd
|
||
import numpy as np
|
||
import os
|
||
import tempfile
|
||
import shutil
|
||
import zipfile
|
||
import logging
|
||
from typing import Dict, List, Optional, Any
|
||
|
||
from core.ports import ParserPort
|
||
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
|
||
from app.schemas.svodka_repair_ca import SvodkaRepairCARequest
|
||
from adapters.pconfig import SINGLE_OGS, find_header_row, get_og_by_name
|
||
|
||
# Настройка логгера для модуля
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class SvodkaRepairCAParser(ParserPort):
|
||
"""Парсер для сводок ремонта СА"""
|
||
|
||
name = "Сводки ремонта СА"
|
||
|
||
def _register_default_getters(self):
|
||
"""Регистрация геттеров по умолчанию"""
|
||
register_getter_from_schema(
|
||
parser_instance=self,
|
||
getter_name="get_repair_data",
|
||
method=self._get_repair_data_wrapper,
|
||
schema_class=SvodkaRepairCARequest,
|
||
description="Получение данных о ремонтных работах"
|
||
)
|
||
|
||
def _get_repair_data_wrapper(self, params: dict):
|
||
"""Получение данных о ремонтных работах"""
|
||
logger.debug(f"🔍 _get_repair_data_wrapper вызван с параметрами: {params}")
|
||
|
||
# Валидируем параметры с помощью схемы Pydantic
|
||
validated_params = validate_params_with_schema(params, SvodkaRepairCARequest)
|
||
|
||
og_ids = validated_params.get("og_ids")
|
||
repair_types = validated_params.get("repair_types")
|
||
include_planned = validated_params.get("include_planned", True)
|
||
include_factual = validated_params.get("include_factual", True)
|
||
|
||
logger.debug(f"🔍 Запрошенные ОГ: {og_ids}")
|
||
logger.debug(f"🔍 Запрошенные типы ремонта: {repair_types}")
|
||
logger.debug(f"🔍 Включать плановые: {include_planned}, фактические: {include_factual}")
|
||
|
||
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
|
||
if hasattr(self, 'data_dict') and self.data_dict is not None:
|
||
# Данные из парсинга
|
||
data_source = self.data_dict
|
||
logger.debug(f"🔍 Используем data_dict с {len(data_source)} записями")
|
||
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
|
||
# Данные из загрузки - преобразуем DataFrame обратно в словарь
|
||
data_source = self._df_to_data_dict()
|
||
logger.debug(f"🔍 Используем df, преобразованный в data_dict с {len(data_source)} записями")
|
||
else:
|
||
logger.warning(f"🔍 Нет данных! data_dict={getattr(self, 'data_dict', 'None')}, df={getattr(self, 'df', 'None')}")
|
||
return []
|
||
|
||
# Группируем данные по ОГ (как в оригинале)
|
||
grouped_data = {}
|
||
|
||
for item in data_source:
|
||
og_id = item.get('id')
|
||
if not og_id:
|
||
continue
|
||
|
||
# Проверяем фильтры
|
||
if og_ids is not None and og_id not in og_ids:
|
||
continue
|
||
if repair_types is not None and item.get('type') not in repair_types:
|
||
continue
|
||
|
||
# Фильтрация по плановым/фактическим данным
|
||
filtered_item = item.copy()
|
||
if not include_planned:
|
||
filtered_item.pop('plan', None)
|
||
if not include_factual:
|
||
filtered_item.pop('fact', None)
|
||
|
||
# Убираем поле 'id' из записи, так как оно уже в ключе
|
||
filtered_item.pop('id', None)
|
||
|
||
# Добавляем в группу по ОГ
|
||
if og_id not in grouped_data:
|
||
grouped_data[og_id] = []
|
||
grouped_data[og_id].append(filtered_item)
|
||
|
||
total_records = sum(len(v) for v in grouped_data.values())
|
||
logger.debug(f"🔍 Отфильтровано {total_records} записей из {len(data_source)}")
|
||
logger.debug(f"🔍 Группировано по {len(grouped_data)} ОГ: {list(grouped_data.keys())}")
|
||
return grouped_data
|
||
|
||
def _df_to_data_dict(self):
|
||
"""Преобразование DataFrame обратно в словарь данных"""
|
||
if not hasattr(self, 'df') or self.df is None or self.df.empty:
|
||
return []
|
||
|
||
# Если df содержит данные в формате списка записей
|
||
if 'data' in self.df.columns:
|
||
# Извлекаем данные из колонки 'data'
|
||
all_data = []
|
||
for _, row in self.df.iterrows():
|
||
data = row.get('data')
|
||
if data and isinstance(data, list):
|
||
all_data.extend(data)
|
||
return all_data
|
||
|
||
return []
|
||
|
||
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
|
||
"""Парсинг файла и возврат DataFrame"""
|
||
logger.debug(f"🔍 SvodkaRepairCAParser.parse вызван с файлом: {file_path}")
|
||
|
||
# Определяем, это ZIP архив или одиночный файл
|
||
if file_path.lower().endswith('.zip'):
|
||
# Обрабатываем ZIP архив
|
||
self.data_dict = self._parse_zip_archive(file_path, params)
|
||
else:
|
||
# Обрабатываем одиночный файл
|
||
self.data_dict = self._parse_single_file(file_path, params)
|
||
|
||
# Преобразуем словарь в DataFrame для совместимости с services.py
|
||
if self.data_dict:
|
||
# Создаем DataFrame с информацией о загруженных данных
|
||
data_rows = []
|
||
for i, item in enumerate(self.data_dict):
|
||
data_rows.append({
|
||
'index': i,
|
||
'data': [item], # Обертываем в список для совместимости
|
||
'records_count': 1
|
||
})
|
||
|
||
if data_rows:
|
||
df = pd.DataFrame(data_rows)
|
||
self.df = df
|
||
logger.debug(f"🔍 Создан DataFrame с {len(data_rows)} записями")
|
||
return df
|
||
|
||
# Если данных нет, возвращаем пустой DataFrame
|
||
self.df = pd.DataFrame()
|
||
logger.debug(f"🔍 Возвращаем пустой DataFrame")
|
||
return self.df
|
||
|
||
def _parse_zip_archive(self, file_path: str, params: dict) -> List[Dict]:
|
||
"""Парсинг ZIP архива с файлами ремонта СА"""
|
||
logger.info(f"🔍 Парсинг ZIP архива: {file_path}")
|
||
|
||
all_data = []
|
||
temp_dir = None
|
||
|
||
try:
|
||
# Создаем временную директорию
|
||
temp_dir = tempfile.mkdtemp()
|
||
logger.debug(f"📦 Архив разархивирован в: {temp_dir}")
|
||
|
||
# Разархивируем файл
|
||
with zipfile.ZipFile(file_path, 'r') as zip_ref:
|
||
zip_ref.extractall(temp_dir)
|
||
|
||
# Ищем Excel файлы в архиве
|
||
excel_files = []
|
||
for root, dirs, files in os.walk(temp_dir):
|
||
for file in files:
|
||
if file.lower().endswith(('.xlsx', '.xlsm', '.xls')):
|
||
excel_files.append(os.path.join(root, file))
|
||
|
||
logger.info(f"📊 Найдено Excel файлов: {len(excel_files)}")
|
||
|
||
# Обрабатываем каждый найденный файл
|
||
for excel_file in excel_files:
|
||
logger.info(f"📊 Обработка файла: {excel_file}")
|
||
file_data = self._parse_single_file(excel_file, params)
|
||
if file_data:
|
||
all_data.extend(file_data)
|
||
|
||
logger.info(f"🎯 Всего обработано записей: {len(all_data)}")
|
||
return all_data
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при обработке ZIP архива: {e}")
|
||
return []
|
||
finally:
|
||
# Удаляем временную директорию
|
||
if temp_dir:
|
||
shutil.rmtree(temp_dir, ignore_errors=True)
|
||
logger.debug(f"🗑️ Временная директория удалена: {temp_dir}")
|
||
|
||
def _parse_single_file(self, file_path: str, params: dict) -> List[Dict]:
|
||
"""Парсинг одиночного Excel файла"""
|
||
logger.debug(f"🔍 Парсинг файла: {file_path}")
|
||
|
||
try:
|
||
# Получаем параметры
|
||
sheet_name = params.get('sheet_name', 0) # По умолчанию первый лист
|
||
header_num = params.get('header_num', None)
|
||
|
||
# Автоопределение header_num, если не передан
|
||
if header_num is None:
|
||
header_num = find_header_row(file_path, sheet_name, search_value="ОГ")
|
||
if header_num is None:
|
||
logger.error(f"❌ Не найден заголовок в файле {file_path}")
|
||
return []
|
||
|
||
logger.debug(f"🔍 Заголовок найден в строке {header_num}")
|
||
|
||
# Читаем Excel файл
|
||
df = pd.read_excel(
|
||
file_path,
|
||
sheet_name=sheet_name,
|
||
header=header_num,
|
||
usecols=None,
|
||
index_col=None
|
||
)
|
||
|
||
if df.empty:
|
||
logger.error(f"❌ Файл {file_path} пуст")
|
||
return []
|
||
|
||
if "ОГ" not in df.columns:
|
||
logger.warning(f"⚠️ Предупреждение: Колонка 'ОГ' не найдена в файле {file_path}")
|
||
return []
|
||
|
||
# Обрабатываем данные
|
||
return self._process_repair_data(df)
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при парсинге файла {file_path}: {e}")
|
||
return []
|
||
|
||
def _process_repair_data(self, df: pd.DataFrame) -> List[Dict]:
|
||
"""Обработка данных о ремонте"""
|
||
logger.debug(f"🔍 Обработка данных с {len(df)} строками")
|
||
|
||
# Шаг 1: Нормализация ОГ
|
||
def safe_replace(val):
|
||
if pd.notna(val) and isinstance(val, str) and val.strip():
|
||
cleaned_val = val.strip()
|
||
result = get_og_by_name(cleaned_val)
|
||
if result and pd.notna(result) and result != "" and result != "UNKNOWN":
|
||
return result
|
||
return val
|
||
|
||
df["ОГ"] = df["ОГ"].apply(safe_replace)
|
||
|
||
# Шаг 2: Приведение к NA и forward fill
|
||
og_series = df["ОГ"].map(
|
||
lambda x: pd.NA if (isinstance(x, str) and x.strip() == "") or pd.isna(x) else x
|
||
)
|
||
df["ОГ"] = og_series.ffill()
|
||
|
||
# Шаг 3: Фильтрация по валидным ОГ
|
||
valid_og_values = set(SINGLE_OGS)
|
||
mask_og = df["ОГ"].notna() & df["ОГ"].isin(valid_og_values)
|
||
df = df[mask_og].copy()
|
||
|
||
if df.empty:
|
||
logger.info(f"❌ Нет данных после фильтрации по ОГ")
|
||
return []
|
||
|
||
# Шаг 4: Удаление строк без "Вид простоя"
|
||
if "Вид простоя" in df.columns:
|
||
downtime_clean = df["Вид простоя"].astype(str).str.strip()
|
||
mask_downtime = (downtime_clean != "") & (downtime_clean != "nan")
|
||
df = df[mask_downtime].copy()
|
||
else:
|
||
logger.info("⚠️ Предупреждение: Колонка 'Вид простоя' не найдена.")
|
||
return []
|
||
|
||
# Шаг 5: Удаление ненужных колонок
|
||
cols_to_drop = []
|
||
for col in df.columns:
|
||
if col.strip().lower() in ["п/п", "пп", "п.п.", "№"]:
|
||
cols_to_drop.append(col)
|
||
elif "НАЛИЧИЕ ПОДРЯДЧИКА" in col.upper() and "ОСНОВНЫЕ РАБОТЫ" in col.upper():
|
||
cols_to_drop.append(col)
|
||
|
||
df.drop(columns=list(set(cols_to_drop)), inplace=True, errors='ignore')
|
||
|
||
# Шаг 6: Переименование первых 8 колонок по порядку
|
||
if df.shape[1] < 8:
|
||
logger.info(f"⚠️ Внимание: В DataFrame только {df.shape[1]} колонок, требуется минимум 8.")
|
||
return []
|
||
|
||
new_names = ["id", "name", "type", "start_date", "end_date", "plan", "fact", "downtime"]
|
||
|
||
# Сохраняем оставшиеся колонки (если больше 8)
|
||
remaining_cols = df.columns[8:].tolist() # Все, что после 8-й
|
||
renamed_cols = new_names + remaining_cols
|
||
df.columns = renamed_cols
|
||
|
||
# меняем прочерки на null
|
||
df = df.replace("-", None)
|
||
|
||
# Сброс индекса
|
||
df.reset_index(drop=True, inplace=True)
|
||
|
||
# Шаг 7: Преобразование в список словарей
|
||
result_data = []
|
||
|
||
for _, row in df.iterrows():
|
||
try:
|
||
# Извлекаем основные поля (теперь с правильными именами)
|
||
og_id = row.get('id')
|
||
name = row.get('name', '')
|
||
repair_type = row.get('type', '')
|
||
|
||
# Обрабатываем даты
|
||
start_date = self._parse_date(row.get('start_date'))
|
||
end_date = self._parse_date(row.get('end_date'))
|
||
|
||
# Обрабатываем числовые значения
|
||
plan = self._parse_numeric(row.get('plan'))
|
||
fact = self._parse_numeric(row.get('fact'))
|
||
downtime = self._parse_downtime(row.get('downtime'))
|
||
|
||
# Создаем запись
|
||
record = {
|
||
"id": og_id,
|
||
"name": str(name) if pd.notna(name) else "",
|
||
"type": str(repair_type) if pd.notna(repair_type) else "",
|
||
"start_date": start_date,
|
||
"end_date": end_date,
|
||
"plan": plan,
|
||
"fact": fact,
|
||
"downtime": downtime
|
||
}
|
||
|
||
result_data.append(record)
|
||
|
||
except Exception as e:
|
||
logger.info(f"⚠️ Ошибка при обработке строки: {e}")
|
||
continue
|
||
|
||
logger.info(f"✅ Обработано {len(result_data)} записей")
|
||
return result_data
|
||
|
||
def _parse_date(self, value) -> Optional[str]:
|
||
"""Парсинг даты"""
|
||
if pd.isna(value) or value is None:
|
||
return None
|
||
|
||
try:
|
||
if isinstance(value, str):
|
||
# Пытаемся преобразовать строку в дату
|
||
date_obj = pd.to_datetime(value, errors='coerce')
|
||
if pd.notna(date_obj):
|
||
return date_obj.strftime('%Y-%m-%d %H:%M:%S')
|
||
elif hasattr(value, 'strftime'):
|
||
# Это уже объект даты
|
||
return value.strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
return None
|
||
except Exception:
|
||
return None
|
||
|
||
def _parse_numeric(self, value) -> Optional[float]:
|
||
"""Парсинг числового значения"""
|
||
if pd.isna(value) or value is None:
|
||
return None
|
||
|
||
try:
|
||
if isinstance(value, (int, float)):
|
||
return float(value)
|
||
elif isinstance(value, str):
|
||
# Заменяем запятую на точку для русских чисел
|
||
cleaned = value.replace(',', '.').strip()
|
||
return float(cleaned) if cleaned else None
|
||
return None
|
||
except (ValueError, TypeError):
|
||
return None
|
||
|
||
def _parse_downtime(self, value) -> Optional[str]:
|
||
"""Парсинг данных о простое"""
|
||
if pd.isna(value) or value is None:
|
||
return None
|
||
|
||
return str(value).strip() if str(value).strip() else None |