424 lines
20 KiB
Python
424 lines
20 KiB
Python
import pandas as pd
|
||
import re
|
||
import zipfile
|
||
import logging
|
||
from typing import Dict, Tuple
|
||
from core.ports import ParserPort
|
||
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
|
||
from app.schemas.monitoring_fuel import MonitoringFuelTotalRequest, MonitoringFuelMonthRequest, MonitoringFuelSeriesRequest
|
||
from adapters.pconfig import data_to_json, get_object_by_name
|
||
|
||
# Настройка логгера для модуля
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MonitoringFuelParser(ParserPort):
|
||
"""Парсер для мониторинга топлива"""
|
||
|
||
name = "Мониторинг топлива"
|
||
|
||
def _register_default_getters(self):
|
||
"""Регистрация геттеров по умолчанию"""
|
||
# Используем схемы Pydantic как единый источник правды
|
||
register_getter_from_schema(
|
||
parser_instance=self,
|
||
getter_name="total_by_columns",
|
||
method=self._get_total_by_columns,
|
||
schema_class=MonitoringFuelTotalRequest,
|
||
description="Агрегация данных по колонкам"
|
||
)
|
||
|
||
register_getter_from_schema(
|
||
parser_instance=self,
|
||
getter_name="month_by_code",
|
||
method=self._get_month_by_code,
|
||
schema_class=MonitoringFuelMonthRequest,
|
||
description="Получение данных за конкретный месяц"
|
||
)
|
||
|
||
register_getter_from_schema(
|
||
parser_instance=self,
|
||
getter_name="series_by_id_and_columns",
|
||
method=self._get_series_by_id_and_columns,
|
||
schema_class=MonitoringFuelSeriesRequest,
|
||
description="Получение временного ряда по ID и колонкам"
|
||
)
|
||
|
||
def determine_getter(self, get_params: dict) -> str:
|
||
"""Определение геттера для мониторинга топлива"""
|
||
# Для monitoring_fuel определяем геттер из параметра mode
|
||
getter_name = get_params.pop("mode", None)
|
||
if not getter_name:
|
||
# Если режим не указан, берем первый доступный
|
||
available_getters = list(self.getters.keys())
|
||
if available_getters:
|
||
getter_name = available_getters[0]
|
||
logger.warning(f"⚠️ Режим не указан, используем первый доступный: {getter_name}")
|
||
else:
|
||
raise ValueError("Парсер не имеет доступных геттеров")
|
||
|
||
return getter_name
|
||
|
||
def _get_total_by_columns(self, params: dict):
|
||
"""Агрегация данных по колонкам"""
|
||
# Валидируем параметры с помощью схемы Pydantic
|
||
validated_params = validate_params_with_schema(params, MonitoringFuelTotalRequest)
|
||
|
||
columns = validated_params["columns"]
|
||
|
||
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
|
||
if hasattr(self, 'data_dict') and self.data_dict is not None:
|
||
# Данные из парсинга
|
||
data_source = self.data_dict
|
||
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
|
||
# Данные из загрузки - преобразуем DataFrame обратно в словарь
|
||
data_source = self._df_to_data_dict()
|
||
else:
|
||
return {}
|
||
|
||
# Агрегируем данные по колонкам
|
||
df_means, _ = self.aggregate_by_columns(data_source, columns)
|
||
|
||
# Преобразуем в JSON-совместимый формат
|
||
result = {}
|
||
for idx, row in df_means.iterrows():
|
||
result[str(idx)] = {}
|
||
for col in columns:
|
||
value = row.get(col)
|
||
if pd.isna(value) or value == float('inf') or value == float('-inf'):
|
||
result[str(idx)][col] = None
|
||
else:
|
||
result[str(idx)][col] = float(value) if isinstance(value, (int, float)) else value
|
||
|
||
return result
|
||
|
||
def _get_month_by_code(self, params: dict):
|
||
"""Получение данных за конкретный месяц"""
|
||
# Валидируем параметры с помощью схемы Pydantic
|
||
validated_params = validate_params_with_schema(params, MonitoringFuelMonthRequest)
|
||
|
||
month = validated_params["month"]
|
||
|
||
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
|
||
if hasattr(self, 'data_dict') and self.data_dict is not None:
|
||
# Данные из парсинга
|
||
data_source = self.data_dict
|
||
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
|
||
# Данные из загрузки - преобразуем DataFrame обратно в словарь
|
||
data_source = self._df_to_data_dict()
|
||
else:
|
||
return {}
|
||
|
||
# Получаем данные за конкретный месяц
|
||
df_month = self.get_month(data_source, month)
|
||
|
||
# Преобразуем в JSON-совместимый формат
|
||
result = {}
|
||
for idx, row in df_month.iterrows():
|
||
# Преобразуем название установки в ID, если это необходимо
|
||
if isinstance(idx, str) and not idx.startswith('SNPZ.'):
|
||
# Это название установки, нужно преобразовать в ID
|
||
object_id = get_object_by_name(idx)
|
||
if object_id is None:
|
||
# Если не удалось найти ID, используем название как есть
|
||
object_id = idx
|
||
else:
|
||
# Это уже ID или что-то другое
|
||
object_id = str(idx)
|
||
|
||
result[object_id] = {}
|
||
for col in df_month.columns:
|
||
value = row[col]
|
||
if pd.isna(value) or value == float('inf') or value == float('-inf'):
|
||
result[object_id][col] = None
|
||
else:
|
||
result[object_id][col] = float(value) if isinstance(value, (int, float)) else value
|
||
|
||
return result
|
||
|
||
def _get_series_by_id_and_columns(self, params: dict):
|
||
"""Получение временных рядов по колонкам для всех ID"""
|
||
# Валидируем параметры с помощью схемы Pydantic
|
||
validated_params = validate_params_with_schema(params, MonitoringFuelSeriesRequest)
|
||
|
||
columns = validated_params["columns"]
|
||
|
||
# Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки)
|
||
if hasattr(self, 'data_dict') and self.data_dict is not None:
|
||
# Данные из парсинга
|
||
data_source = self.data_dict
|
||
elif hasattr(self, 'df') and self.df is not None and not self.df.empty:
|
||
# Данные из загрузки - преобразуем DataFrame обратно в словарь
|
||
data_source = self._df_to_data_dict()
|
||
else:
|
||
return {}
|
||
|
||
# Проверяем, что все колонки существуют хотя бы в одном месяце
|
||
valid_columns = set()
|
||
for month_df in data_source.values():
|
||
valid_columns.update(month_df.columns)
|
||
|
||
for col in columns:
|
||
if col not in valid_columns:
|
||
raise ValueError(f"Колонка '{col}' не найдена ни в одном месяце")
|
||
|
||
# Подготавливаем результат: словарь id → {col: [значения по месяцам]}
|
||
result = {}
|
||
|
||
# Обрабатываем месяцы от 01 до 12
|
||
for month_key in [f"{i:02d}" for i in range(1, 13)]:
|
||
if month_key not in data_source:
|
||
logger.warning(f"Месяц '{month_key}' не найден в df_monitorings, пропускаем.")
|
||
continue
|
||
|
||
df = data_source[month_key]
|
||
|
||
for col in columns:
|
||
if col not in df.columns:
|
||
continue # Пропускаем, если в этом месяце нет колонки
|
||
|
||
for idx, value in df[col].items():
|
||
if pd.isna(value):
|
||
continue # Пропускаем NaN
|
||
|
||
if idx not in result:
|
||
result[idx] = {c: [] for c in columns}
|
||
|
||
# Добавляем значение в массив для данного ID и колонки
|
||
if not pd.isna(value) and value != float('inf') and value != float('-inf'):
|
||
result[idx][col].append(float(value) if isinstance(value, (int, float)) else value)
|
||
|
||
# Преобразуем ключи id в строки (для JSON-совместимости)
|
||
result_str_keys = {str(k): v for k, v in result.items()}
|
||
return result_str_keys
|
||
|
||
def _df_to_data_dict(self):
|
||
"""Преобразование DataFrame обратно в словарь данных"""
|
||
if not hasattr(self, 'df') or self.df is None or self.df.empty:
|
||
return {}
|
||
|
||
data_dict = {}
|
||
|
||
# Группируем данные по месяцам
|
||
for _, row in self.df.iterrows():
|
||
month = row.get('month')
|
||
data = row.get('data')
|
||
|
||
if month and data is not None:
|
||
# data уже является DataFrame, поэтому используем его напрямую
|
||
if isinstance(data, pd.DataFrame):
|
||
data_dict[month] = data
|
||
else:
|
||
# Если data не DataFrame, пропускаем
|
||
logger.warning(f"Данные за месяц {month} не являются DataFrame, пропускаем")
|
||
|
||
return data_dict
|
||
|
||
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
|
||
"""Парсинг файла и возврат DataFrame"""
|
||
# Парсим данные и сохраняем словарь для использования в геттерах
|
||
self.data_dict = self.parse_monitoring_fuel_files(file_path, params)
|
||
|
||
# Преобразуем словарь в DataFrame для совместимости с services.py
|
||
if self.data_dict:
|
||
# Создаем DataFrame с информацией о месяцах и данных
|
||
data_rows = []
|
||
for month, df_data in self.data_dict.items():
|
||
if df_data is not None and not df_data.empty:
|
||
data_rows.append({
|
||
'month': month,
|
||
'rows_count': len(df_data),
|
||
'data': df_data
|
||
})
|
||
|
||
if data_rows:
|
||
df = pd.DataFrame(data_rows)
|
||
self.df = df
|
||
return df
|
||
|
||
# Если данных нет, возвращаем пустой DataFrame
|
||
self.df = pd.DataFrame()
|
||
return self.df
|
||
|
||
def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]:
|
||
"""Парсинг ZIP архива с файлами мониторинга топлива"""
|
||
df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ
|
||
|
||
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
||
|
||
file_list = zip_ref.namelist()
|
||
for month in range(1, 13):
|
||
|
||
mm = f"{month:02d}"
|
||
file_temp = f'monitoring_SNPZ_{mm}.xlsm'
|
||
candidates = [f for f in file_list if file_temp in f]
|
||
|
||
if len(candidates) == 1:
|
||
file = candidates[0]
|
||
|
||
logger.info(f'Загрузка {file}')
|
||
with zip_ref.open(file) as excel_file:
|
||
try:
|
||
df = self.parse_single(excel_file, 'Мониторинг потребления')
|
||
df_monitorings[mm] = df
|
||
|
||
logger.info(f"✅ Данные за месяц {mm} загружены")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при загрузке файла {file_temp}: {e}")
|
||
|
||
else:
|
||
logger.warning(f"⚠️ Файл не найден: {file_temp}")
|
||
|
||
return df_monitorings
|
||
|
||
def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int:
|
||
"""Определение индекса заголовка в Excel по ключевому слову"""
|
||
# Читаем первые max_rows строк без заголовков
|
||
df_temp = pd.read_excel(
|
||
file_path,
|
||
sheet_name=sheet,
|
||
header=None,
|
||
nrows=max_rows,
|
||
engine='openpyxl'
|
||
)
|
||
|
||
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
|
||
for idx, row in df_temp.iterrows():
|
||
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
|
||
logger.debug(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
|
||
return idx + 1 # возвращаем индекс строки (0-based)
|
||
|
||
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
|
||
|
||
def parse_single(self, file, sheet, header_num=None):
|
||
''' Собственно парсер отчетов одного объекта'''
|
||
# Автоопределение header_num, если не передан
|
||
if header_num is None:
|
||
header_num = self.find_header_row(file, sheet, search_value="Установка")
|
||
# Читаем весь лист, начиная с найденной строки как заголовок
|
||
df_full = pd.read_excel(
|
||
file,
|
||
sheet_name=sheet,
|
||
header=header_num,
|
||
usecols=None,
|
||
index_col=None,
|
||
engine='openpyxl'
|
||
)
|
||
|
||
# === Удаление полностью пустых столбцов ===
|
||
df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA
|
||
df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA
|
||
df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы
|
||
|
||
# === Переименовываем нужные столбцы по позициям ===
|
||
if len(df_full.columns) < 2:
|
||
raise ValueError("DataFrame должен содержать как минимум 2 столбца.")
|
||
|
||
new_columns = df_full.columns.tolist()
|
||
|
||
new_columns[0] = 'name'
|
||
new_columns[1] = 'normativ'
|
||
new_columns[-2] = 'total'
|
||
new_columns[-1] = 'total_1'
|
||
|
||
df_full.columns = new_columns
|
||
|
||
# Проверяем, что колонка 'name' существует
|
||
if 'name' in df_full.columns:
|
||
# Применяем функцию get_object_by_name к каждой строке в колонке 'name'
|
||
df_full['id'] = df_full['name'].apply(get_object_by_name)
|
||
# Удаляем строки, где не удалось найти ID
|
||
df_full = df_full.dropna(subset=['id'])
|
||
else:
|
||
# Если нет колонки name, создаем id из индекса
|
||
df_full['id'] = df_full.index
|
||
|
||
# Устанавливаем id как индекс
|
||
df_full.set_index('id', inplace=True)
|
||
logger.debug(f"Окончательное количество столбцов: {len(df_full.columns)}")
|
||
return df_full
|
||
|
||
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns: list) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
|
||
''' Служебная функция. Агрегация данных по среднему по определенным колонкам. '''
|
||
all_data = {} # Для хранения полных данных (месяцы) по каждой колонке
|
||
means = {} # Для хранения средних
|
||
|
||
for col in columns:
|
||
totals = [] # Список Series (по месяцам) для текущей колонки
|
||
|
||
for file_key, df in df_dict.items():
|
||
if col not in df.columns:
|
||
logger.warning(f"Колонка '{col}' не найдена в {file_key}, пропускаем.")
|
||
continue
|
||
|
||
# Берём колонку, оставляем id как индекс
|
||
series = df[col].copy()
|
||
|
||
# Определяем имя месяца: извлекаем номер из ключа (например, '03' из '03')
|
||
# Если ключ уже '03', '04' и т.п. — используем как есть
|
||
match = re.search(r'\d{2}', str(file_key))
|
||
month = match.group() if match else file_key
|
||
series.name = month
|
||
totals.append(series)
|
||
|
||
if not totals:
|
||
raise ValueError(f"Ни один DataFrame не содержит колонку '{col}'")
|
||
|
||
# Объединяем все Series в один DataFrame (id × месяцы)
|
||
df_combined = pd.concat(totals, axis=1)
|
||
all_data[col] = df_combined # Сохраняем
|
||
|
||
# Считаем среднее по строкам (по месяцам), игнорируя NaN
|
||
mean_series = df_combined.mean(axis=1)
|
||
mean_series = mean_series.dropna() # Убираем id, где нет данных вообще
|
||
means[col] = mean_series
|
||
|
||
# Собираем все средние в один DataFrame
|
||
df_means = pd.DataFrame(means)
|
||
|
||
return df_means, all_data
|
||
|
||
def get_month(self, df_monitorings, month_key):
|
||
''' Служебная функция. Выгрузить только конкретный месяц '''
|
||
if month_key not in df_monitorings:
|
||
raise KeyError(f"Месяц '{month_key}' не найден в df_monitorings. Доступные: {list(df_monitorings.keys())}")
|
||
|
||
df = df_monitorings[month_key]
|
||
|
||
# Создаём копию, чтобы не изменять оригинальный словарь
|
||
result_df = df.copy()
|
||
|
||
# Удаляем колонку 'name', если она существует
|
||
if 'name' in result_df.columns:
|
||
result_df = result_df.drop(columns=['name'])
|
||
|
||
return result_df
|
||
|
||
def aggregate_total_by_id(self, df_dict: Dict[str, pd.DataFrame], column: str):
|
||
"""Агрегация данных по ID"""
|
||
totals = []
|
||
|
||
for file, df in df_dict.items():
|
||
if column not in df.columns:
|
||
logger.warning(f"Колонка '{column}' не найдена в {file}, пропускаем.")
|
||
continue
|
||
|
||
# Берём колонку и сохраняем как Series с именем месяца
|
||
series = df[column].copy()
|
||
series.name = re.sub(r'\D', '', file) # Имя Series будет использовано как имя колонки в concat
|
||
totals.append(series)
|
||
|
||
if not totals:
|
||
raise ValueError(f"Ни один DataFrame не содержит колонку '{column}'")
|
||
|
||
df_combined = pd.concat(totals, axis=1)
|
||
|
||
# Считаем среднее по строкам (по месяцам)
|
||
total = df_combined.mean(axis=1)
|
||
total = total.dropna()
|
||
|
||
total.name = 'mean'
|
||
|
||
return total, df_combined
|