Files
python_parser/python_parser/adapters/parsers/oper_spravka_tech_pos.py
2025-09-04 18:24:53 +03:00

285 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import tempfile
import zipfile
import pandas as pd
import logging
from typing import Dict, Any, List
from datetime import datetime
from core.ports import ParserPort
from adapters.pconfig import find_header_row, get_object_by_name, data_to_json
# Настройка логгера для модуля
logger = logging.getLogger(__name__)
class OperSpravkaTechPosParser(ParserPort):
"""Парсер для операционных справок технологических позиций"""
name = "oper_spravka_tech_pos"
def __init__(self):
super().__init__()
self.data_dict = {}
self.df = None
# Регистрируем геттер
self.register_getter('get_tech_pos', self._get_tech_pos_wrapper, required_params=['id'])
def parse(self, file_path: str, params: Dict[str, Any] = None) -> pd.DataFrame:
"""Парсит ZIP архив с файлами операционных справок технологических позиций"""
logger.debug(f"🔍 OperSpravkaTechPosParser.parse вызван с файлом: {file_path}")
if not file_path.endswith('.zip'):
raise ValueError("OperSpravkaTechPosParser поддерживает только ZIP архивы")
# Обрабатываем ZIP архив
result = self._parse_zip_archive(file_path)
# Конвертируем результат в DataFrame для совместимости с ReportService
if result:
data_list = []
for id, data in result.items():
if data is not None and not data.empty:
records = data.to_dict(orient='records')
data_list.append({
'id': id,
'data': records,
'records_count': len(records)
})
df = pd.DataFrame(data_list)
logger.debug(f"🔍 Создан DataFrame с {len(df)} записями")
return df
else:
logger.debug("🔍 Возвращаем пустой DataFrame")
return pd.DataFrame()
def _parse_zip_archive(self, zip_path: str) -> Dict[str, pd.DataFrame]:
"""Парсит ZIP архив с файлами операционных справок"""
logger.info(f"📦 Обработка ZIP архива: {zip_path}")
with tempfile.TemporaryDirectory() as temp_dir:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# Ищем файлы операционных справок
tech_pos_files = []
for root, dirs, files in os.walk(temp_dir):
for file in files:
if (file.startswith('oper_spavka_tech_pos_') or
file.startswith('oper_spravka_tech_pos_')) and file.endswith(('.xlsx', '.xls', '.xlsm')):
tech_pos_files.append(os.path.join(root, file))
if not tech_pos_files:
raise ValueError("В архиве не найдены файлы операционных справок технологических позиций")
logger.info(f"📁 Найдено {len(tech_pos_files)} файлов операционных справок")
# Обрабатываем каждый файл
all_data = {}
for file_path in tech_pos_files:
logger.info(f"📁 Обработка файла: {file_path}")
# Извлекаем ID ОГ из имени файла
filename = os.path.basename(file_path)
og_id = self._extract_og_id_from_filename(filename)
logger.debug(f"🏭 ОГ ID: {og_id}")
# Парсим файл
file_data = self._parse_single_file(file_path)
if file_data:
all_data.update(file_data)
return all_data
def _extract_og_id_from_filename(self, filename: str) -> str:
"""Извлекает ID ОГ из имени файла"""
# Для файлов типа oper_spavka_tech_pos_SNPZ.xlsx
parts = filename.split('_')
if len(parts) >= 4:
og_id = parts[-1].split('.')[0] # Убираем расширение
return og_id
return "UNKNOWN"
def _parse_single_file(self, file_path: str) -> Dict[str, pd.DataFrame]:
"""Парсит один файл операционной справки"""
try:
# Находим актуальный лист
actual_sheet = self._find_actual_sheet_num(file_path)
logger.debug(f"📅 Актуальный лист: {actual_sheet}")
# Находим заголовок
header_row = self._find_header_row(file_path, actual_sheet)
logger.debug(f"📋 Заголовок найден в строке {header_row}")
# Парсим данные
df = self._parse_tech_pos_data(file_path, actual_sheet, header_row)
if df is not None and not df.empty:
# Извлекаем ID ОГ из имени файла
filename = os.path.basename(file_path)
og_id = self._extract_og_id_from_filename(filename)
return {og_id: df}
else:
logger.warning(f"⚠️ Нет данных в файле {file_path}")
return {}
except Exception as e:
logger.error(f"❌ Ошибка при обработке файла {file_path}: {e}")
return {}
def _find_actual_sheet_num(self, file_path: str) -> str:
"""Поиск номера актуального листа"""
current_day = datetime.now().day
current_month = datetime.now().month
actual_sheet = f"{current_day:02d}"
try:
# Читаем все листы от 1 до текущего дня
all_sheets = {}
for day in range(1, current_day + 1):
sheet_num = f"{day:02d}"
try:
df_temp = pd.read_excel(file_path, sheet_name=sheet_num, usecols=[1], nrows=2, header=None)
all_sheets[sheet_num] = df_temp
except:
continue
# Идем от текущего дня к 1
for day in range(current_day, 0, -1):
sheet_num = f"{day:02d}"
if sheet_num in all_sheets:
df_temp = all_sheets[sheet_num]
if df_temp.shape[0] > 1:
date_str = df_temp.iloc[1, 0] # B2
if pd.notna(date_str):
try:
date = pd.to_datetime(date_str)
# Проверяем совпадение месяца даты с текущим месяцем
if date.month == current_month:
actual_sheet = sheet_num
break
except:
continue
except Exception as e:
logger.warning(f"⚠️ Ошибка при поиске актуального листа: {e}")
return actual_sheet
def _find_header_row(self, file_path: str, sheet_name: str, search_value: str = "Загрузка основных процессов") -> int:
"""Определение индекса заголовка в Excel по ключевому слову"""
try:
# Читаем первый столбец
df_temp = pd.read_excel(file_path, sheet_name=sheet_name, usecols=[0])
# Ищем строку с искомым значением
for idx, row in df_temp.iterrows():
if row.astype(str).str.contains(search_value, case=False, regex=False).any():
logger.debug(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx + 1 # возвращаем индекс строки (0-based), который будет использован как `header=`
raise ValueError(f"Не найдена строка с заголовком '{search_value}'.")
except Exception as e:
logger.error(f"❌ Ошибка при поиске заголовка: {e}")
return 0
def _parse_tech_pos_data(self, file_path: str, sheet_name: str, header_row: int) -> pd.DataFrame:
"""Парсинг данных технологических позиций"""
try:
valid_processes = ['Первичная переработка', 'Гидроочистка топлив', 'Риформирование', 'Изомеризация']
df_temp = pd.read_excel(
file_path,
sheet_name=sheet_name,
header=header_row + 1, # Исправлено: добавляем +1 как в оригинале
usecols=range(1, 5)
)
logger.debug(f"🔍 Прочитано {len(df_temp)} строк из Excel")
logger.debug(f"🔍 Колонки: {list(df_temp.columns)}")
# Фильтруем по валидным процессам
df_cleaned = df_temp[
df_temp['Процесс'].str.strip().isin(valid_processes) &
df_temp['Процесс'].notna()
].copy()
logger.debug(f"🔍 После фильтрации осталось {len(df_cleaned)} строк")
if df_cleaned.empty:
logger.warning("⚠️ Нет данных после фильтрации по процессам")
logger.debug(f"🔍 Доступные процессы в данных: {df_temp['Процесс'].unique()}")
return pd.DataFrame()
df_cleaned['Процесс'] = df_cleaned['Процесс'].astype(str).str.strip()
# Добавляем ID установки
if 'Установка' in df_cleaned.columns:
df_cleaned['id'] = df_cleaned['Установка'].apply(get_object_by_name)
logger.debug(f"🔍 Добавлены ID установок: {df_cleaned['id'].unique()}")
else:
logger.warning("⚠️ Колонка 'Установка' не найдена")
logger.info(f"✅ Получено {len(df_cleaned)} записей")
return df_cleaned
except Exception as e:
logger.error(f"❌ Ошибка при парсинге данных: {e}")
return pd.DataFrame()
def _get_tech_pos_wrapper(self, params: Dict[str, Any] = None) -> str:
"""Обертка для получения данных технологических позиций"""
logger.debug(f"🔍 _get_tech_pos_wrapper вызван с параметрами: {params}")
# Получаем ID ОГ из параметров
og_id = params.get('id') if params else None
if not og_id:
logger.error("Не указан ID ОГ")
return "{}"
# Получаем данные
tech_pos_data = {}
if hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из MinIO
logger.debug(f"🔍 Ищем данные для ОГ '{og_id}' в DataFrame с {len(self.df)} записями")
available_ogs = self.df['id'].tolist()
logger.debug(f"🔍 Доступные ОГ в данных: {available_ogs}")
for _, row in self.df.iterrows():
if row['id'] == og_id:
tech_pos_data = row['data']
logger.info(f"✅ Найдены данные для ОГ '{og_id}': {len(tech_pos_data)} записей")
break
else:
logger.warning(f"❌ Данные для ОГ '{og_id}' не найдены")
elif hasattr(self, 'data_dict') and self.data_dict:
# Локальные данные
logger.debug(f"🔍 Ищем данные для ОГ '{og_id}' в data_dict")
available_ogs = list(self.data_dict.keys())
logger.debug(f"🔍 Доступные ОГ в data_dict: {available_ogs}")
if og_id in self.data_dict:
tech_pos_data = self.data_dict[og_id].to_dict(orient='records')
logger.info(f"✅ Найдены данные для ОГ '{og_id}': {len(tech_pos_data)} записей")
else:
logger.warning(f"❌ Данные для ОГ '{og_id}' не найдены в data_dict")
# Конвертируем в список записей
try:
if isinstance(tech_pos_data, pd.DataFrame):
# Если это DataFrame, конвертируем в список словарей
result_list = tech_pos_data.to_dict(orient='records')
logger.debug(f"🔍 Конвертировано в список: {len(result_list)} записей")
return result_list
elif isinstance(tech_pos_data, list):
# Если уже список, возвращаем как есть
logger.debug(f"🔍 Уже список: {len(tech_pos_data)} записей")
return tech_pos_data
else:
logger.warning(f"🔍 Неожиданный тип данных: {type(tech_pos_data)}")
return []
except Exception as e:
logger.error(f"❌ Ошибка при конвертации данных: {e}")
return []