314 lines
14 KiB
Python
314 lines
14 KiB
Python
import pandas as pd
|
||
|
||
from core.ports import ParserPort
|
||
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
|
||
from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
|
||
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
|
||
|
||
|
||
class SvodkaPMParser(ParserPort):
|
||
"""Парсер для сводок ПМ (план и факт)"""
|
||
|
||
name = "Сводки ПМ"
|
||
|
||
def _register_default_getters(self):
|
||
"""Регистрация геттеров по умолчанию"""
|
||
# Используем схемы Pydantic как единый источник правды
|
||
register_getter_from_schema(
|
||
parser_instance=self,
|
||
getter_name="single_og",
|
||
method=self._get_single_og,
|
||
schema_class=SvodkaPMSingleOGRequest,
|
||
description="Получение данных по одному ОГ"
|
||
)
|
||
|
||
register_getter_from_schema(
|
||
parser_instance=self,
|
||
getter_name="total_ogs",
|
||
method=self._get_total_ogs,
|
||
schema_class=SvodkaPMTotalOGsRequest,
|
||
description="Получение данных по всем ОГ"
|
||
)
|
||
|
||
def _get_single_og(self, params: dict):
|
||
"""Получение данных по одному ОГ"""
|
||
# Валидируем параметры с помощью схемы Pydantic
|
||
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
|
||
|
||
og_id = validated_params["id"]
|
||
codes = validated_params["codes"]
|
||
columns = validated_params["columns"]
|
||
search = validated_params.get("search")
|
||
|
||
# Здесь нужно получить DataFrame из self.df, но пока используем старую логику
|
||
# TODO: Переделать под новую архитектуру
|
||
return self.get_svodka_og(self.df, og_id, codes, columns, search)
|
||
|
||
def _get_total_ogs(self, params: dict):
|
||
"""Получение данных по всем ОГ"""
|
||
# Валидируем параметры с помощью схемы Pydantic
|
||
validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest)
|
||
|
||
codes = validated_params["codes"]
|
||
columns = validated_params["columns"]
|
||
search = validated_params.get("search")
|
||
|
||
# TODO: Переделать под новую архитектуру
|
||
return self.get_svodka_total(self.df, codes, columns, search)
|
||
|
||
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
|
||
"""Парсинг файла и возврат DataFrame"""
|
||
# Сохраняем DataFrame для использования в геттерах
|
||
self.df = self.parse_svodka_pm_files(file_path, params)
|
||
return self.df
|
||
|
||
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
|
||
"""Определения индекса заголовка в excel по ключевому слову"""
|
||
# Читаем первые max_rows строк без заголовков
|
||
df_temp = pd.read_excel(
|
||
file,
|
||
sheet_name=sheet,
|
||
header=None,
|
||
nrows=max_rows,
|
||
engine='openpyxl'
|
||
)
|
||
|
||
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
|
||
for idx, row in df_temp.iterrows():
|
||
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
|
||
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
|
||
return idx # 0-based index — то, что нужно для header=
|
||
|
||
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
|
||
|
||
def parse_svodka_pm(self, file, sheet, header_num=None):
|
||
''' Собственно парсер отчетов одного ОГ для БП, ПП и факта '''
|
||
# Автоопределение header_num, если не передан
|
||
if header_num is None:
|
||
header_num = self.find_header_row(file, sheet, search_value="Итого")
|
||
|
||
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
|
||
df_probe = pd.read_excel(
|
||
file,
|
||
sheet_name=sheet,
|
||
header=header_num,
|
||
usecols=None,
|
||
nrows=2,
|
||
engine='openpyxl'
|
||
)
|
||
|
||
if df_probe.shape[0] == 0:
|
||
raise ValueError("Файл пуст или не содержит данных.")
|
||
|
||
first_data_row = df_probe.iloc[0]
|
||
|
||
# Находим столбец с 'INDICATOR_ID'
|
||
indicator_cols = first_data_row[first_data_row == 'INDICATOR_ID']
|
||
if len(indicator_cols) == 0:
|
||
raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.')
|
||
|
||
indicator_col_name = indicator_cols.index[0]
|
||
print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}")
|
||
|
||
# Читаем весь лист
|
||
df_full = pd.read_excel(
|
||
file,
|
||
sheet_name=sheet,
|
||
header=header_num,
|
||
usecols=None,
|
||
index_col=None,
|
||
engine='openpyxl'
|
||
)
|
||
|
||
if indicator_col_name not in df_full.columns:
|
||
raise ValueError(f"Столбец {indicator_col_name} отсутствует при полной загрузке.")
|
||
|
||
# Перемещаем INDICATOR_ID в начало и делаем индексом
|
||
cols = [indicator_col_name] + [col for col in df_full.columns if col != indicator_col_name]
|
||
df_full = df_full[cols]
|
||
df_full.set_index(indicator_col_name, inplace=True)
|
||
|
||
# Обрезаем до "Итого" + 1
|
||
header_list = [str(h).strip() for h in df_full.columns]
|
||
try:
|
||
itogo_idx = header_list.index("Итого")
|
||
num_cols_needed = itogo_idx + 2
|
||
except ValueError:
|
||
print('Столбец "Итого" не найден. Оставляем все столбцы.')
|
||
num_cols_needed = len(header_list)
|
||
|
||
num_cols_needed = min(num_cols_needed, len(header_list))
|
||
df_final = df_full.iloc[:, :num_cols_needed]
|
||
|
||
# === Удаление полностью пустых столбцов ===
|
||
df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True)
|
||
df_clean = df_clean.where(pd.notnull(df_clean), pd.NA)
|
||
non_empty_mask = df_clean.notna().any()
|
||
df_final = df_final.loc[:, non_empty_mask]
|
||
|
||
# === Обработка заголовков: Unnamed и "Итого" → "Итого" ===
|
||
new_columns = []
|
||
last_good_name = None
|
||
for col in df_final.columns:
|
||
col_str = str(col).strip()
|
||
|
||
# Проверяем, является ли колонка пустой/некорректной
|
||
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
|
||
|
||
if is_empty_or_unnamed:
|
||
# Если это пустая колонка, используем последнее хорошее имя
|
||
if last_good_name:
|
||
new_columns.append(last_good_name)
|
||
else:
|
||
# Если нет хорошего имени, пропускаем
|
||
continue
|
||
else:
|
||
# Это хорошая колонка
|
||
last_good_name = col_str
|
||
new_columns.append(col_str)
|
||
|
||
# Применяем новые заголовки
|
||
df_final.columns = new_columns
|
||
|
||
return df_final
|
||
|
||
def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict:
|
||
"""Парсинг ZIP архива со сводками ПМ"""
|
||
import zipfile
|
||
pm_dict = {
|
||
"facts": {},
|
||
"plans": {}
|
||
}
|
||
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
|
||
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
|
||
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
||
file_list = zip_ref.namelist()
|
||
for name, id in OG_IDS.items():
|
||
if id == 'BASH':
|
||
continue # пропускаем BASH
|
||
|
||
current_fact = replace_id_in_path(excel_fact_template, id)
|
||
fact_candidates = [f for f in file_list if current_fact in f]
|
||
if len(fact_candidates) == 1:
|
||
print(f'Загрузка {current_fact}')
|
||
with zip_ref.open(fact_candidates[0]) as excel_file:
|
||
pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
|
||
print(f"✅ Факт загружен: {current_fact}")
|
||
else:
|
||
print(f"⚠️ Файл не найден (Факт): {current_fact}")
|
||
pm_dict['facts'][id] = None
|
||
|
||
current_plan = replace_id_in_path(excel_plan_template, id)
|
||
plan_candidates = [f for f in file_list if current_plan in f]
|
||
if len(plan_candidates) == 1:
|
||
print(f'Загрузка {current_plan}')
|
||
with zip_ref.open(plan_candidates[0]) as excel_file:
|
||
pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
|
||
print(f"✅ План загружен: {current_plan}")
|
||
else:
|
||
print(f"⚠️ Файл не найден (План): {current_plan}")
|
||
pm_dict['plans'][id] = None
|
||
|
||
return pm_dict
|
||
|
||
def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
|
||
''' Служебная функция получения значения по коду и столбцу '''
|
||
row_index = code
|
||
|
||
mask_value = df_svodka.iloc[0] == code
|
||
if search_value is None:
|
||
mask_name = df_svodka.columns != 'Итого'
|
||
else:
|
||
mask_name = df_svodka.columns == search_value
|
||
|
||
# Убедимся, что маски совпадают по длине
|
||
if len(mask_value) != len(mask_name):
|
||
raise ValueError(
|
||
f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
|
||
)
|
||
|
||
final_mask = mask_value & mask_name # булевая маска по позициям столбцов
|
||
col_positions = final_mask.values # numpy array или Series булевых значений
|
||
|
||
if not final_mask.any():
|
||
print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'")
|
||
return 0
|
||
else:
|
||
if row_index in df_svodka.index:
|
||
# Получаем позицию строки
|
||
row_loc = df_svodka.index.get_loc(row_index)
|
||
|
||
# Извлекаем значения по позициям столбцов
|
||
values = df_svodka.iloc[row_loc, col_positions]
|
||
|
||
# Преобразуем в числовой формат
|
||
numeric_values = pd.to_numeric(values, errors='coerce')
|
||
|
||
# Агрегация данных (NaN игнорируются)
|
||
if search_value is None:
|
||
return numeric_values
|
||
else:
|
||
return numeric_values.iloc[0]
|
||
else:
|
||
return None
|
||
|
||
def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None):
|
||
''' Служебная функция получения данных по одному ОГ '''
|
||
result = {}
|
||
|
||
fact_df = pm_dict['facts'][id]
|
||
plan_df = pm_dict['plans'][id]
|
||
|
||
# Определяем, какие столбцы из какого датафрейма брать
|
||
for col in columns:
|
||
col_result = {}
|
||
|
||
if col in ['ПП', 'БП']:
|
||
if plan_df is None:
|
||
print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}")
|
||
else:
|
||
for code in codes:
|
||
val = self.get_svodka_value(plan_df, code, col, search_value)
|
||
col_result[code] = val
|
||
|
||
elif col in ['ТБ', 'СЭБ', 'НЭБ']:
|
||
if fact_df is None:
|
||
print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}")
|
||
else:
|
||
for code in codes:
|
||
val = self.get_svodka_value(fact_df, code, col, search_value)
|
||
col_result[code] = val
|
||
else:
|
||
print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")
|
||
col_result = {code: None for code in codes}
|
||
|
||
result[col] = col_result
|
||
|
||
return result
|
||
|
||
def get_svodka_total(self, pm_dict, codes, columns, search_value=None):
|
||
''' Служебная функция агрегации данные по всем ОГ '''
|
||
total_result = {}
|
||
|
||
for name, og_id in OG_IDS.items():
|
||
if og_id == 'BASH':
|
||
continue
|
||
|
||
# print(f"📊 Обработка: {name} ({og_id})")
|
||
try:
|
||
data = self.get_svodka_og(
|
||
pm_dict,
|
||
og_id,
|
||
codes,
|
||
columns,
|
||
search_value
|
||
)
|
||
total_result[og_id] = data
|
||
except Exception as e:
|
||
print(f"❌ Ошибка при обработке {name} ({og_id}): {e}")
|
||
total_result[og_id] = None
|
||
|
||
return total_result
|
||
|
||
# Убираем старый метод get_value, так как он теперь в базовом классе
|