Files
python_parser/python_parser/adapters/parsers/monitoring_fuel.py
2025-08-26 23:33:29 +03:00

207 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
import re
from typing import Dict
from core.ports import ParserPort
from adapters.pconfig import data_to_json, get_object_by_name
class MonitoringFuelParser(ParserPort):
"""Парсер для мониторинга топлива"""
name = "Мониторинг топлива"
def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int:
"""Определение индекса заголовка в Excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file_path,
sheet_name=sheet,
header=None,
nrows=max_rows
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx + 1 # возвращаем индекс строки (0-based)
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_single(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного объекта'''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Установка")
# Читаем весь лист, начиная с найденной строки как заголовок
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None
)
# === Удаление полностью пустых столбцов ===
df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA
df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA
df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы
# === Переименовываем нужные столбцы по позициям ===
if len(df_full.columns) < 2:
raise ValueError("DataFrame должен содержать как минимум 2 столбца.")
new_columns = df_full.columns.tolist()
new_columns[0] = 'name'
new_columns[1] = 'normativ'
new_columns[-2] = 'total'
new_columns[-1] = 'total_1'
df_full.columns = new_columns
# Проверяем, что колонка 'name' существует
if 'name' in df_full.columns:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_full['id'] = df_full['name'].apply(get_object_by_name)
# Устанавливаем id как индекс
df_full.set_index('id', inplace=True)
print(f"Окончательное количество столбцов: {len(df_full.columns)}")
return df_full
def parse(self, file_path: str, params: dict) -> dict:
import zipfile
df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ
with zipfile.ZipFile(file_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for month in range(1, 13):
mm = f"{month:02d}"
file_temp = f'monitoring_SNPZ_{mm}.xlsm'
candidates = [f for f in file_list if file_temp in f]
if len(candidates) == 1:
file = candidates[0]
print(f'Загрузка {file}')
with zip_ref.open(file) as excel_file:
try:
df = self.parse_single(excel_file, 'Мониторинг потребления')
df_monitorings[mm] = df
print(f"✅ Данные за месяц {mm} загружены")
except Exception as e:
print(f"Ошибка при загрузке файла {file_temp}: {e}")
else:
print(f"⚠️ Файл не найден: {file_temp}")
return df_monitorings
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns):
''' Служебная функция. Агрегация данных по среднему по определенным колонкам. '''
all_data = {} # Для хранения полных данных (месяцы) по каждой колонке
means = {} # Для хранения средних
for col in columns:
totals = [] # Список Series (по месяцам) для текущей колонки
for file_key, df in df_dict.items():
if col not in df.columns:
print(f"Колонка '{col}' не найдена в {file_key}, пропускаем.")
continue
# Берём колонку, оставляем id как индекс
series = df[col].copy()
# Определяем имя месяца: извлекаем номер из ключа (например, '03' из '03')
# Если ключ уже '03', '04' и т.п. — используем как есть
match = re.search(r'\d{2}', str(file_key))
month = match.group() if match else file_key
series.name = month
totals.append(series)
if not totals:
raise ValueError(f"Ни один DataFrame не содержит колонку '{col}'")
# Объединяем все Series в один DataFrame (id × месяцы)
df_combined = pd.concat(totals, axis=1)
all_data[col] = df_combined # Сохраняем
# Считаем среднее по строкам (по месяцам), игнорируя NaN
mean_series = df_combined.mean(axis=1)
mean_series = mean_series.dropna() # Убираем id, где нет данных вообще
means[col] = mean_series
# Собираем все средние в один DataFrame
df_means = pd.DataFrame(means)
return df_means, all_data
def get_month(self, df_monitorings, month_key):
''' Служебная функция. Выгрузить только конкретный месяц '''
if month_key not in df_monitorings:
raise KeyError(f"Месяц '{month_key}' не найден в df_monitorings. Доступные: {list(df_monitorings.keys())}")
df = df_monitorings[month_key]
# Создаём копию, чтобы не изменять оригинальный словарь
result_df = df.copy()
# Удаляем колонку 'name', если она существует
if 'name' in result_df.columns:
result_df = result_df.drop(columns=['name'])
return result_df
def aggregate_total_by_id(self, df_dict: Dict[str, pd.DataFrame], column: str):
"""Агрегация данных по ID"""
totals = []
for file, df in df_dict.items():
if column not in df.columns:
print(f"Колонка '{column}' не найдена в {file}, пропускаем.")
continue
# Берём колонку и сохраняем как Series с именем месяца
series = df[column].copy()
series.name = re.sub(r'\D', '', file) # Имя Series будет использовано как имя колонки в concat
totals.append(series)
if not totals:
raise ValueError(f"Ни один DataFrame не содержит колонку '{column}'")
df_combined = pd.concat(totals, axis=1)
# Считаем среднее по строкам (по месяцам)
total = df_combined.mean(axis=1)
total = total.dropna()
total.name = 'mean'
return total, df_combined
def get_value(self, df, params):
mode = params.get("mode", "total")
columns = params.get("columns", None)
month = params.get("month", None)
data = None
if mode == "total":
if not columns:
raise ValueError("Отсутствуют идентификаторы столбцов")
df_means, _ = self.aggregate_by_columns(df, columns)
data = df_means.to_dict(orient='index')
elif mode == "month":
if not month:
raise ValueError("Отсутствуют идентификатор месяца")
df_month = self.get_month(df, month)
data = df_month.to_dict(orient='index')
json_result = data_to_json(data)
return json_result