285 lines
14 KiB
Python
285 lines
14 KiB
Python
import os
|
||
import tempfile
|
||
import zipfile
|
||
import pandas as pd
|
||
import logging
|
||
from typing import Dict, Any, List
|
||
from datetime import datetime
|
||
from core.ports import ParserPort
|
||
from adapters.pconfig import find_header_row, get_object_by_name, data_to_json
|
||
|
||
# Настройка логгера для модуля
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class OperSpravkaTechPosParser(ParserPort):
|
||
"""Парсер для операционных справок технологических позиций"""
|
||
|
||
name = "oper_spravka_tech_pos"
|
||
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.data_dict = {}
|
||
self.df = None
|
||
|
||
# Регистрируем геттер
|
||
self.register_getter('get_tech_pos', self._get_tech_pos_wrapper, required_params=['id'])
|
||
|
||
def parse(self, file_path: str, params: Dict[str, Any] = None) -> pd.DataFrame:
|
||
"""Парсит ZIP архив с файлами операционных справок технологических позиций"""
|
||
logger.debug(f"🔍 OperSpravkaTechPosParser.parse вызван с файлом: {file_path}")
|
||
|
||
if not file_path.endswith('.zip'):
|
||
raise ValueError("OperSpravkaTechPosParser поддерживает только ZIP архивы")
|
||
|
||
# Обрабатываем ZIP архив
|
||
result = self._parse_zip_archive(file_path)
|
||
|
||
# Конвертируем результат в DataFrame для совместимости с ReportService
|
||
if result:
|
||
data_list = []
|
||
for id, data in result.items():
|
||
if data is not None and not data.empty:
|
||
records = data.to_dict(orient='records')
|
||
data_list.append({
|
||
'id': id,
|
||
'data': records,
|
||
'records_count': len(records)
|
||
})
|
||
|
||
df = pd.DataFrame(data_list)
|
||
logger.debug(f"🔍 Создан DataFrame с {len(df)} записями")
|
||
return df
|
||
else:
|
||
logger.debug("🔍 Возвращаем пустой DataFrame")
|
||
return pd.DataFrame()
|
||
|
||
def _parse_zip_archive(self, zip_path: str) -> Dict[str, pd.DataFrame]:
|
||
"""Парсит ZIP архив с файлами операционных справок"""
|
||
logger.info(f"📦 Обработка ZIP архива: {zip_path}")
|
||
|
||
with tempfile.TemporaryDirectory() as temp_dir:
|
||
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
||
zip_ref.extractall(temp_dir)
|
||
|
||
# Ищем файлы операционных справок
|
||
tech_pos_files = []
|
||
for root, dirs, files in os.walk(temp_dir):
|
||
for file in files:
|
||
if (file.startswith('oper_spavka_tech_pos_') or
|
||
file.startswith('oper_spravka_tech_pos_')) and file.endswith(('.xlsx', '.xls', '.xlsm')):
|
||
tech_pos_files.append(os.path.join(root, file))
|
||
|
||
if not tech_pos_files:
|
||
raise ValueError("В архиве не найдены файлы операционных справок технологических позиций")
|
||
|
||
logger.info(f"📁 Найдено {len(tech_pos_files)} файлов операционных справок")
|
||
|
||
# Обрабатываем каждый файл
|
||
all_data = {}
|
||
for file_path in tech_pos_files:
|
||
logger.info(f"📁 Обработка файла: {file_path}")
|
||
|
||
# Извлекаем ID ОГ из имени файла
|
||
filename = os.path.basename(file_path)
|
||
og_id = self._extract_og_id_from_filename(filename)
|
||
logger.debug(f"🏭 ОГ ID: {og_id}")
|
||
|
||
# Парсим файл
|
||
file_data = self._parse_single_file(file_path)
|
||
if file_data:
|
||
all_data.update(file_data)
|
||
|
||
return all_data
|
||
|
||
def _extract_og_id_from_filename(self, filename: str) -> str:
|
||
"""Извлекает ID ОГ из имени файла"""
|
||
# Для файлов типа oper_spavka_tech_pos_SNPZ.xlsx
|
||
parts = filename.split('_')
|
||
if len(parts) >= 4:
|
||
og_id = parts[-1].split('.')[0] # Убираем расширение
|
||
return og_id
|
||
return "UNKNOWN"
|
||
|
||
def _parse_single_file(self, file_path: str) -> Dict[str, pd.DataFrame]:
|
||
"""Парсит один файл операционной справки"""
|
||
try:
|
||
# Находим актуальный лист
|
||
actual_sheet = self._find_actual_sheet_num(file_path)
|
||
logger.debug(f"📅 Актуальный лист: {actual_sheet}")
|
||
|
||
# Находим заголовок
|
||
header_row = self._find_header_row(file_path, actual_sheet)
|
||
logger.debug(f"📋 Заголовок найден в строке {header_row}")
|
||
|
||
# Парсим данные
|
||
df = self._parse_tech_pos_data(file_path, actual_sheet, header_row)
|
||
|
||
if df is not None and not df.empty:
|
||
# Извлекаем ID ОГ из имени файла
|
||
filename = os.path.basename(file_path)
|
||
og_id = self._extract_og_id_from_filename(filename)
|
||
return {og_id: df}
|
||
else:
|
||
logger.warning(f"⚠️ Нет данных в файле {file_path}")
|
||
return {}
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при обработке файла {file_path}: {e}")
|
||
return {}
|
||
|
||
def _find_actual_sheet_num(self, file_path: str) -> str:
|
||
"""Поиск номера актуального листа"""
|
||
current_day = datetime.now().day
|
||
current_month = datetime.now().month
|
||
|
||
actual_sheet = f"{current_day:02d}"
|
||
|
||
try:
|
||
# Читаем все листы от 1 до текущего дня
|
||
all_sheets = {}
|
||
for day in range(1, current_day + 1):
|
||
sheet_num = f"{day:02d}"
|
||
try:
|
||
df_temp = pd.read_excel(file_path, sheet_name=sheet_num, usecols=[1], nrows=2, header=None)
|
||
all_sheets[sheet_num] = df_temp
|
||
except:
|
||
continue
|
||
|
||
# Идем от текущего дня к 1
|
||
for day in range(current_day, 0, -1):
|
||
sheet_num = f"{day:02d}"
|
||
if sheet_num in all_sheets:
|
||
df_temp = all_sheets[sheet_num]
|
||
if df_temp.shape[0] > 1:
|
||
date_str = df_temp.iloc[1, 0] # B2
|
||
|
||
if pd.notna(date_str):
|
||
try:
|
||
date = pd.to_datetime(date_str)
|
||
# Проверяем совпадение месяца даты с текущим месяцем
|
||
if date.month == current_month:
|
||
actual_sheet = sheet_num
|
||
break
|
||
except:
|
||
continue
|
||
except Exception as e:
|
||
logger.warning(f"⚠️ Ошибка при поиске актуального листа: {e}")
|
||
|
||
return actual_sheet
|
||
|
||
def _find_header_row(self, file_path: str, sheet_name: str, search_value: str = "Загрузка основных процессов") -> int:
|
||
"""Определение индекса заголовка в Excel по ключевому слову"""
|
||
try:
|
||
# Читаем первый столбец
|
||
df_temp = pd.read_excel(file_path, sheet_name=sheet_name, usecols=[0])
|
||
|
||
# Ищем строку с искомым значением
|
||
for idx, row in df_temp.iterrows():
|
||
if row.astype(str).str.contains(search_value, case=False, regex=False).any():
|
||
logger.debug(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
|
||
return idx + 1 # возвращаем индекс строки (0-based), который будет использован как `header=`
|
||
|
||
raise ValueError(f"Не найдена строка с заголовком '{search_value}'.")
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при поиске заголовка: {e}")
|
||
return 0
|
||
|
||
def _parse_tech_pos_data(self, file_path: str, sheet_name: str, header_row: int) -> pd.DataFrame:
|
||
"""Парсинг данных технологических позиций"""
|
||
try:
|
||
valid_processes = ['Первичная переработка', 'Гидроочистка топлив', 'Риформирование', 'Изомеризация']
|
||
|
||
df_temp = pd.read_excel(
|
||
file_path,
|
||
sheet_name=sheet_name,
|
||
header=header_row + 1, # Исправлено: добавляем +1 как в оригинале
|
||
usecols=range(1, 5)
|
||
)
|
||
|
||
logger.debug(f"🔍 Прочитано {len(df_temp)} строк из Excel")
|
||
logger.debug(f"🔍 Колонки: {list(df_temp.columns)}")
|
||
|
||
# Фильтруем по валидным процессам
|
||
df_cleaned = df_temp[
|
||
df_temp['Процесс'].str.strip().isin(valid_processes) &
|
||
df_temp['Процесс'].notna()
|
||
].copy()
|
||
|
||
logger.debug(f"🔍 После фильтрации осталось {len(df_cleaned)} строк")
|
||
|
||
if df_cleaned.empty:
|
||
logger.warning("⚠️ Нет данных после фильтрации по процессам")
|
||
logger.debug(f"🔍 Доступные процессы в данных: {df_temp['Процесс'].unique()}")
|
||
return pd.DataFrame()
|
||
|
||
df_cleaned['Процесс'] = df_cleaned['Процесс'].astype(str).str.strip()
|
||
|
||
# Добавляем ID установки
|
||
if 'Установка' in df_cleaned.columns:
|
||
df_cleaned['id'] = df_cleaned['Установка'].apply(get_object_by_name)
|
||
logger.debug(f"🔍 Добавлены ID установок: {df_cleaned['id'].unique()}")
|
||
else:
|
||
logger.warning("⚠️ Колонка 'Установка' не найдена")
|
||
|
||
logger.info(f"✅ Получено {len(df_cleaned)} записей")
|
||
return df_cleaned
|
||
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при парсинге данных: {e}")
|
||
return pd.DataFrame()
|
||
|
||
def _get_tech_pos_wrapper(self, params: Dict[str, Any] = None) -> str:
|
||
"""Обертка для получения данных технологических позиций"""
|
||
logger.debug(f"🔍 _get_tech_pos_wrapper вызван с параметрами: {params}")
|
||
|
||
# Получаем ID ОГ из параметров
|
||
og_id = params.get('id') if params else None
|
||
if not og_id:
|
||
logger.error("❌ Не указан ID ОГ")
|
||
return "{}"
|
||
|
||
# Получаем данные
|
||
tech_pos_data = {}
|
||
if hasattr(self, 'df') and self.df is not None and not self.df.empty:
|
||
# Данные из MinIO
|
||
logger.debug(f"🔍 Ищем данные для ОГ '{og_id}' в DataFrame с {len(self.df)} записями")
|
||
available_ogs = self.df['id'].tolist()
|
||
logger.debug(f"🔍 Доступные ОГ в данных: {available_ogs}")
|
||
|
||
for _, row in self.df.iterrows():
|
||
if row['id'] == og_id:
|
||
tech_pos_data = row['data']
|
||
logger.info(f"✅ Найдены данные для ОГ '{og_id}': {len(tech_pos_data)} записей")
|
||
break
|
||
else:
|
||
logger.warning(f"❌ Данные для ОГ '{og_id}' не найдены")
|
||
elif hasattr(self, 'data_dict') and self.data_dict:
|
||
# Локальные данные
|
||
logger.debug(f"🔍 Ищем данные для ОГ '{og_id}' в data_dict")
|
||
available_ogs = list(self.data_dict.keys())
|
||
logger.debug(f"🔍 Доступные ОГ в data_dict: {available_ogs}")
|
||
|
||
if og_id in self.data_dict:
|
||
tech_pos_data = self.data_dict[og_id].to_dict(orient='records')
|
||
logger.info(f"✅ Найдены данные для ОГ '{og_id}': {len(tech_pos_data)} записей")
|
||
else:
|
||
logger.warning(f"❌ Данные для ОГ '{og_id}' не найдены в data_dict")
|
||
|
||
# Конвертируем в список записей
|
||
try:
|
||
if isinstance(tech_pos_data, pd.DataFrame):
|
||
# Если это DataFrame, конвертируем в список словарей
|
||
result_list = tech_pos_data.to_dict(orient='records')
|
||
logger.debug(f"🔍 Конвертировано в список: {len(result_list)} записей")
|
||
return result_list
|
||
elif isinstance(tech_pos_data, list):
|
||
# Если уже список, возвращаем как есть
|
||
logger.debug(f"🔍 Уже список: {len(tech_pos_data)} записей")
|
||
return tech_pos_data
|
||
else:
|
||
logger.warning(f"🔍 Неожиданный тип данных: {type(tech_pos_data)}")
|
||
return []
|
||
except Exception as e:
|
||
logger.error(f"❌ Ошибка при конвертации данных: {e}")
|
||
return [] |