Files
python_parser/python_parser/adapters/parsers/monitoring_tar.py

316 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import zipfile
import tempfile
import pandas as pd
import logging
from typing import Dict, Any, List
from core.ports import ParserPort
from adapters.pconfig import find_header_row, SNPZ_IDS, data_to_json
# Настройка логгера для модуля
logger = logging.getLogger(__name__)
class MonitoringTarParser(ParserPort):
"""Парсер для мониторинга ТЭР (топливно-энергетических ресурсов)"""
name = "monitoring_tar"
def __init__(self):
super().__init__()
self.data_dict = {}
self.df = None
# Регистрируем геттеры
self.register_getter('get_tar_data', self._get_tar_data_wrapper, required_params=['mode'])
self.register_getter('get_tar_full_data', self._get_tar_full_data_wrapper, required_params=[])
def determine_getter(self, get_params: dict) -> str:
"""Определение геттера для мониторинга ТАР"""
# Для monitoring_tar определяем геттер по параметрам
if 'mode' in get_params:
# Если есть параметр mode, используем get_tar_data
return 'get_tar_data'
else:
# Если нет параметра mode, используем get_tar_full_data
return 'get_tar_full_data'
def parse(self, file_path: str, params: Dict[str, Any] = None) -> pd.DataFrame:
"""Парсит ZIP архив с файлами мониторинга ТЭР"""
logger.debug(f"🔍 MonitoringTarParser.parse вызван с файлом: {file_path}")
if not file_path.endswith('.zip'):
raise ValueError("MonitoringTarParser поддерживает только ZIP архивы")
# Обрабатываем ZIP архив
result = self._parse_zip_archive(file_path)
# Конвертируем результат в DataFrame для совместимости с ReportService
if result:
data_list = []
for id, data in result.items():
data_list.append({
'id': id,
'data': data,
'records_count': len(data.get('total', [])) + len(data.get('last_day', []))
})
df = pd.DataFrame(data_list)
logger.debug(f"🔍 Создан DataFrame с {len(df)} записями")
return df
else:
logger.debug("🔍 Возвращаем пустой DataFrame")
return pd.DataFrame()
def _parse_zip_archive(self, zip_path: str) -> Dict[str, Any]:
"""Парсит ZIP архив с файлами мониторинга ТЭР"""
logger.info(f"📦 Обработка ZIP архива: {zip_path}")
with tempfile.TemporaryDirectory() as temp_dir:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# Ищем файлы мониторинга ТЭР
tar_files = []
for root, dirs, files in os.walk(temp_dir):
for file in files:
# Поддерживаем файлы svodka_tar_*.xlsx (основные) и monitoring_*.xlsm (альтернативные)
if (file.startswith('svodka_tar_') and file.endswith('.xlsx')) or (file.startswith('monitoring_') and file.endswith('.xlsm')):
tar_files.append(os.path.join(root, file))
if not tar_files:
raise ValueError("В архиве не найдены файлы мониторинга ТЭР")
logger.info(f"📁 Найдено {len(tar_files)} файлов мониторинга ТЭР")
# Обрабатываем каждый файл
all_data = {}
for file_path in tar_files:
logger.info(f"📁 Обработка файла: {file_path}")
# Извлекаем номер месяца из имени файла
filename = os.path.basename(file_path)
month_str = self._extract_month_from_filename(filename)
logger.debug(f"📅 Месяц: {month_str}")
# Парсим файл
file_data = self._parse_single_file(file_path, month_str)
if file_data:
all_data.update(file_data)
return all_data
def _extract_month_from_filename(self, filename: str) -> str:
"""Извлекает номер месяца из имени файла"""
# Для файлов типа svodka_tar_SNPZ_01.xlsx или monitoring_SNPZ_01.xlsm
parts = filename.split('_')
if len(parts) >= 3:
month_part = parts[-1].split('.')[0] # Убираем расширение
if month_part.isdigit():
return month_part
return "01" # По умолчанию
def _parse_single_file(self, file_path: str, month_str: str) -> Dict[str, Any]:
"""Парсит один файл мониторинга ТЭР"""
try:
excel_file = pd.ExcelFile(file_path)
available_sheets = excel_file.sheet_names
except Exception as e:
logger.error(f"Не удалось открыть Excel-файл {file_path}: {e}")
return {}
# Словарь для хранения данных: id -> {'total': [], 'last_day': []}
df_svodka_tar = {}
# Определяем тип файла и обрабатываем соответственно
filename = os.path.basename(file_path)
if filename.startswith('svodka_tar_'):
# Обрабатываем файлы svodka_tar_*.xlsx с SNPZ_IDS
for name, id in SNPZ_IDS.items():
if name not in available_sheets:
logger.warning(f"🟡 Лист '{name}' отсутствует в файле {file_path}")
continue
# Парсим оба типа строк
result = self._parse_monitoring_tar_single(file_path, name, month_str)
# Инициализируем структуру для id
if id not in df_svodka_tar:
df_svodka_tar[id] = {'total': [], 'last_day': []}
if isinstance(result['total'], pd.DataFrame) and not result['total'].empty:
df_svodka_tar[id]['total'].append(result['total'])
if isinstance(result['last_day'], pd.DataFrame) and not result['last_day'].empty:
df_svodka_tar[id]['last_day'].append(result['last_day'])
elif filename.startswith('monitoring_'):
# Обрабатываем файлы monitoring_*.xlsm с альтернативными листами
monitoring_sheets = {
'Мониторинг потребления': 'SNPZ.MONITORING',
'Исходные данные': 'SNPZ.SOURCE_DATA'
}
for sheet_name, id in monitoring_sheets.items():
if sheet_name not in available_sheets:
logger.warning(f"🟡 Лист '{sheet_name}' отсутствует в файле {file_path}")
continue
# Парсим оба типа строк
result = self._parse_monitoring_tar_single(file_path, sheet_name, month_str)
# Инициализируем структуру для id
if id not in df_svodka_tar:
df_svodka_tar[id] = {'total': [], 'last_day': []}
if isinstance(result['total'], pd.DataFrame) and not result['total'].empty:
df_svodka_tar[id]['total'].append(result['total'])
if isinstance(result['last_day'], pd.DataFrame) and not result['last_day'].empty:
df_svodka_tar[id]['last_day'].append(result['last_day'])
# Агрегация: объединяем списки в DataFrame
for id, data in df_svodka_tar.items():
if data['total']:
df_svodka_tar[id]['total'] = pd.concat(data['total'], ignore_index=True)
else:
df_svodka_tar[id]['total'] = pd.DataFrame()
if data['last_day']:
df_svodka_tar[id]['last_day'] = pd.concat(data['last_day'], ignore_index=True)
else:
df_svodka_tar[id]['last_day'] = pd.DataFrame()
logger.info(f"✅ Агрегировано: {len(df_svodka_tar[id]['total'])} 'total' и "
f"{len(df_svodka_tar[id]['last_day'])} 'last_day' записей для id='{id}'")
return df_svodka_tar
def _parse_monitoring_tar_single(self, file: str, sheet: str, month_str: str) -> Dict[str, Any]:
"""Парсит один файл и лист"""
try:
# Проверяем наличие листа
if sheet not in pd.ExcelFile(file).sheet_names:
logger.warning(f"🟡 Лист '{sheet}' не найден в файле {file}")
return {'total': None, 'last_day': None}
# Определяем номер заголовка в зависимости от типа файла
filename = os.path.basename(file)
if filename.startswith('svodka_tar_'):
# Для файлов svodka_tar_*.xlsx ищем заголовок по значению "1"
header_num = find_header_row(file, sheet, search_value="1")
if header_num is None:
logger.error(f"Не найдена строка с заголовком '1' в файле {file}, лист '{sheet}'")
return {'total': None, 'last_day': None}
elif filename.startswith('monitoring_'):
# Для файлов monitoring_*.xlsm заголовок находится в строке 5
header_num = 5
else:
logger.error(f"❌ Неизвестный тип файла: {filename}")
return {'total': None, 'last_day': None}
logger.debug(f"🔍 Используем заголовок в строке {header_num} для листа '{sheet}'")
# Читаем с двумя уровнями заголовков
df = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
index_col=None
)
# Убираем мультииндекс: оставляем первый уровень
df.columns = df.columns.get_level_values(0)
# Удаляем строки, где все значения — NaN
df = df.dropna(how='all').reset_index(drop=True)
if df.empty:
logger.warning(f"🟡 Нет данных после очистки в файле {file}, лист '{sheet}'")
return {'total': None, 'last_day': None}
# === 1. Обработка строки "Всего" ===
first_col = df.columns[0]
mask_total = df[first_col].astype(str).str.strip() == "Всего"
df_total = df[mask_total].copy()
if not df_total.empty:
# Заменяем "Всего" на номер месяца в первой колонке
df_total.loc[:, first_col] = df_total[first_col].astype(str).str.replace("Всего", month_str, regex=False)
df_total = df_total.reset_index(drop=True)
else:
df_total = pd.DataFrame()
# === 2. Обработка последней строки (не пустая) ===
# Берём последнюю строку из исходного df (не включая "Всего", если она внизу)
# Исключим строку "Всего" из "последней строки", если она есть
df_no_total = df[~mask_total].dropna(how='all')
if not df_no_total.empty:
df_last_day = df_no_total.tail(1).copy()
df_last_day = df_last_day.reset_index(drop=True)
else:
df_last_day = pd.DataFrame()
return {'total': df_total, 'last_day': df_last_day}
except Exception as e:
logger.error(f"❌ Ошибка при обработке файла {file}, лист '{sheet}': {e}")
return {'total': None, 'last_day': None}
def _get_tar_data_wrapper(self, params: Dict[str, Any] = None) -> str:
"""Обертка для получения данных мониторинга ТЭР с фильтрацией по режиму"""
logger.debug(f"🔍 _get_tar_data_wrapper вызван с параметрами: {params}")
# Получаем режим из параметров
mode = params.get('mode', 'total') if params else 'total'
# Фильтруем данные по режиму
filtered_data = {}
if hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из MinIO
for _, row in self.df.iterrows():
id = row['id']
data = row['data']
if isinstance(data, dict) and mode in data:
filtered_data[id] = data[mode]
else:
filtered_data[id] = pd.DataFrame()
elif hasattr(self, 'data_dict') and self.data_dict:
# Локальные данные
for id, data in self.data_dict.items():
if isinstance(data, dict) and mode in data:
filtered_data[id] = data[mode]
else:
filtered_data[id] = pd.DataFrame()
# Конвертируем в JSON
try:
result_json = data_to_json(filtered_data)
return result_json
except Exception as e:
logger.error(f"❌ Ошибка при конвертации данных в JSON: {e}")
return "{}"
def _get_tar_full_data_wrapper(self, params: Dict[str, Any] = None) -> str:
"""Обертка для получения всех данных мониторинга ТЭР"""
logger.debug(f"🔍 _get_tar_full_data_wrapper вызван с параметрами: {params}")
# Получаем все данные
full_data = {}
if hasattr(self, 'df') and self.df is not None and not self.df.empty:
# Данные из MinIO
for _, row in self.df.iterrows():
id = row['id']
data = row['data']
full_data[id] = data
elif hasattr(self, 'data_dict') and self.data_dict:
# Локальные данные
full_data = self.data_dict
# Конвертируем в JSON
try:
result_json = data_to_json(full_data)
return result_json
except Exception as e:
logger.error(f"❌ Ошибка при конвертации данных в JSON: {e}")
return "{}"