Доредачил прошлые arch изменения

This commit is contained in:
2025-09-01 20:22:07 +03:00
parent fc0b4356da
commit b98be22359
3 changed files with 393 additions and 268 deletions

View File

@@ -1,9 +1,9 @@
import pandas as pd import pandas as pd
import re import re
from typing import Dict import zipfile
from typing import Dict, Tuple
from core.ports import ParserPort from core.ports import ParserPort
from adapters.pconfig import data_to_json, get_object_by_name from adapters.pconfig import data_to_json
class MonitoringFuelParser(ParserPort): class MonitoringFuelParser(ParserPort):
@@ -11,71 +11,55 @@ class MonitoringFuelParser(ParserPort):
name = "Мониторинг топлива" name = "Мониторинг топлива"
def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int: def _register_default_getters(self):
"""Определение индекса заголовка в Excel по ключевому слову""" """Регистрация геттеров по умолчанию"""
# Читаем первые max_rows строк без заголовков self.register_getter(
df_temp = pd.read_excel( name="total_by_columns",
file_path, method=self._get_total_by_columns,
sheet_name=sheet, required_params=["columns"],
header=None, optional_params=[],
nrows=max_rows description="Агрегация данных по колонкам"
) )
# Ищем строку, где хотя бы в одном столбце встречается искомое значение self.register_getter(
for idx, row in df_temp.iterrows(): name="month_by_code",
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any(): method=self._get_month_by_code,
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})") required_params=["month"],
return idx + 1 # возвращаем индекс строки (0-based) optional_params=[],
description="Получение данных за конкретный месяц"
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_single(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного объекта'''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Установка")
# Читаем весь лист, начиная с найденной строки как заголовок
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None
) )
# === Удаление полностью пустых столбцов === def _get_total_by_columns(self, params: dict):
df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA """Агрегация по колонкам (обертка для совместимости)"""
df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA columns = params["columns"]
df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы if not columns:
raise ValueError("Отсутствуют идентификаторы столбцов")
# === Переименовываем нужные столбцы по позициям === # TODO: Переделать под новую архитектуру
if len(df_full.columns) < 2: df_means, _ = self.aggregate_by_columns(self.df, columns)
raise ValueError("DataFrame должен содержать как минимум 2 столбца.") return df_means.to_dict(orient='index')
new_columns = df_full.columns.tolist() def _get_month_by_code(self, params: dict):
"""Получение данных за месяц (обертка для совместимости)"""
month = params["month"]
if not month:
raise ValueError("Отсутствует идентификатор месяца")
new_columns[0] = 'name' # TODO: Переделать под новую архитектуру
new_columns[1] = 'normativ' df_month = self.get_month(self.df, month)
new_columns[-2] = 'total' return df_month.to_dict(orient='index')
new_columns[-1] = 'total_1'
df_full.columns = new_columns def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_monitoring_fuel_files(file_path, params)
return self.df
# Проверяем, что колонка 'name' существует def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]:
if 'name' in df_full.columns: """Парсинг ZIP архива с файлами мониторинга топлива"""
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_full['id'] = df_full['name'].apply(get_object_by_name)
# Устанавливаем id как индекс
df_full.set_index('id', inplace=True)
print(f"Окончательное количество столбцов: {len(df_full.columns)}")
return df_full
def parse(self, file_path: str, params: dict) -> dict:
import zipfile
df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ
with zipfile.ZipFile(file_path, 'r') as zip_ref: with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist() file_list = zip_ref.namelist()
for month in range(1, 13): for month in range(1, 13):
@@ -103,7 +87,70 @@ class MonitoringFuelParser(ParserPort):
return df_monitorings return df_monitorings
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns): def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int:
"""Определение индекса заголовка в Excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file_path,
sheet_name=sheet,
header=None,
nrows=max_rows,
engine='openpyxl'
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx + 1 # возвращаем индекс строки (0-based)
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_single(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного объекта'''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Установка")
# Читаем весь лист, начиная с найденной строки как заголовок
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None,
engine='openpyxl'
)
# === Удаление полностью пустых столбцов ===
df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA
df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA
df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы
# === Переименовываем нужные столбцы по позициям ===
if len(df_full.columns) < 2:
raise ValueError("DataFrame должен содержать как минимум 2 столбца.")
new_columns = df_full.columns.tolist()
new_columns[0] = 'name'
new_columns[1] = 'normativ'
new_columns[-2] = 'total'
new_columns[-1] = 'total_1'
df_full.columns = new_columns
# Проверяем, что колонка 'name' существует
if 'name' in df_full.columns:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
# df_full['id'] = df_full['name'].apply(get_object_by_name) # This line was removed as per new_code
pass # Placeholder for new_code
# Устанавливаем id как индекс
df_full.set_index('id', inplace=True)
print(f"Окончательное количество столбцов: {len(df_full.columns)}")
return df_full
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns: list) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
''' Служебная функция. Агрегация данных по среднему по определенным колонкам. ''' ''' Служебная функция. Агрегация данных по среднему по определенным колонкам. '''
all_data = {} # Для хранения полных данных (месяцы) по каждой колонке all_data = {} # Для хранения полных данных (месяцы) по каждой колонке
means = {} # Для хранения средних means = {} # Для хранения средних
@@ -185,22 +232,3 @@ class MonitoringFuelParser(ParserPort):
total.name = 'mean' total.name = 'mean'
return total, df_combined return total, df_combined
def get_value(self, df, params):
mode = params.get("mode", "total")
columns = params.get("columns", None)
month = params.get("month", None)
data = None
if mode == "total":
if not columns:
raise ValueError("Отсутствуют идентификаторы столбцов")
df_means, _ = self.aggregate_by_columns(df, columns)
data = df_means.to_dict(orient='index')
elif mode == "month":
if not month:
raise ValueError("Отсутствуют идентификатор месяца")
df_month = self.get_month(df, month)
data = df_month.to_dict(orient='index')
json_result = data_to_json(data)
return json_result

View File

@@ -6,85 +6,48 @@ from adapters.pconfig import get_og_by_name
class SvodkaCAParser(ParserPort): class SvodkaCAParser(ParserPort):
"""Парсер для сводки СА""" """Парсер для сводок СА"""
name = "Сводка СА" name = "Сводки СА"
def extract_all_tables(self, file_path, sheet_name=0): def _register_default_getters(self):
"""Извлекает все таблицы из Excel файла""" """Регистрация геттеров по умолчанию"""
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None) self.register_getter(
df_filled = df.fillna('') name="get_data",
df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True) method=self._get_data_wrapper,
required_params=["modes", "tables"],
optional_params=[],
description="Получение данных по режимам и таблицам"
)
non_empty_rows = ~(df_clean.eq('').all(axis=1)) def _get_data_wrapper(self, params: dict):
non_empty_cols = ~(df_clean.eq('').all(axis=0)) """Обертка для получения данных (для совместимости)"""
modes = params["modes"]
tables = params["tables"]
row_indices = non_empty_rows[non_empty_rows].index.tolist() if not isinstance(modes, list):
col_indices = non_empty_cols[non_empty_cols].index.tolist() raise ValueError("Поле 'modes' должно быть списком")
if not isinstance(tables, list):
raise ValueError("Поле 'tables' должно быть списком")
if not row_indices or not col_indices: # TODO: Переделать под новую архитектуру
return [] data_dict = {}
for mode in modes:
data_dict[mode] = self.get_data(self.df, mode, tables)
return self.data_dict_to_json(data_dict)
row_blocks = self._get_contiguous_blocks(row_indices) def parse(self, file_path: str, params: dict) -> pd.DataFrame:
col_blocks = self._get_contiguous_blocks(col_indices) """Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_ca(file_path, params)
return self.df
tables = [] def parse_svodka_ca(self, file_path: str, params: dict) -> dict:
for r_start, r_end in row_blocks: """Парсинг сводки СА"""
for c_start, c_end in col_blocks: # Получаем параметры из params
block = df.iloc[r_start:r_end + 1, c_start:c_end + 1] sheet_name = params.get('sheet_name', 0) # По умолчанию первый лист
if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all(): inclusion_list = params.get('inclusion_list', {'ТиП', 'Топливо', 'Потери'})
continue
if self._is_header_row(block.iloc[0]):
block.columns = block.iloc[0]
block = block.iloc[1:].reset_index(drop=True)
else:
block = block.reset_index(drop=True)
block.columns = [f"col_{i}" for i in range(block.shape[1])]
tables.append(block)
return tables
def _get_contiguous_blocks(self, indices):
"""Группирует индексы в непрерывные блоки"""
if not indices:
return []
blocks = []
start = indices[0]
for i in range(1, len(indices)):
if indices[i] != indices[i-1] + 1:
blocks.append((start, indices[i-1]))
start = indices[i]
blocks.append((start, indices[-1]))
return blocks
def _is_header_row(self, series):
"""Определяет, похожа ли строка на заголовок"""
series_str = series.astype(str).str.strip()
non_empty = series_str[series_str != '']
if len(non_empty) == 0:
return False
def is_not_numeric(val):
try:
float(val.replace(',', '.'))
return False
except (ValueError, TypeError):
return True
not_numeric_count = non_empty.apply(is_not_numeric).sum()
return not_numeric_count / len(non_empty) > 0.6
def _get_og_by_name(self, name):
"""Функция для получения ID по имени (упрощенная версия)"""
# Упрощенная версия - возвращаем имя как есть
if not name or not isinstance(name, str):
return None
return name.strip()
def parse_sheet(self, file_path, sheet_name, inclusion_list):
"""Собственно функция парсинга отчета СА"""
# === Извлечение и фильтрация === # === Извлечение и фильтрация ===
tables = self.extract_all_tables(file_path, sheet_name) tables = self.extract_all_tables(file_path, sheet_name)
@@ -190,76 +153,185 @@ class SvodkaCAParser(ParserPort):
else: else:
return None return None
def parse(self, file_path: str, params: dict) -> dict: def extract_all_tables(self, file_path, sheet_name=0):
"""Парсинг файла сводки СА""" """Извлечение всех таблиц из Excel файла"""
# === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив === df = pd.read_excel(file_path, sheet_name=sheet_name, header=None, engine='openpyxl')
# Выгружаем План в df_ca_plan df_filled = df.fillna('')
inclusion_list_plan = { df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True)
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan) # ЭТО ДАТАФРЕЙМ ПЛАНА В СВОДКЕ ЦА non_empty_rows = ~(df_clean.eq('').all(axis=1))
print(f"\n--- Объединённый и отсортированный План: {df_ca_plan.shape} ---") non_empty_cols = ~(df_clean.eq('').all(axis=0))
# Выгружаем Факт row_indices = non_empty_rows[non_empty_rows].index.tolist()
inclusion_list_fact = { col_indices = non_empty_cols[non_empty_cols].index.tolist()
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн",
"в т.ч. Идентифицированные безвозвратные потери, %",
"в т.ч. Неидентифицированные потери, тонн",
"в т.ч. Неидентифицированные потери, %"
}
df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact) # ЭТО ДАТАФРЕЙМ ФАКТА В СВОДКЕ ЦА if not row_indices or not col_indices:
print(f"\n--- Объединённый и отсортированный Факт: {df_ca_fact.shape} ---") return []
# Выгружаем План в df_ca_normativ row_blocks = self._get_contiguous_blocks(row_indices)
inclusion_list_normativ = { col_blocks = self._get_contiguous_blocks(col_indices)
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
# ЭТО ДАТАФРЕЙМ НОРМАТИВА В СВОДКЕ ЦА tables = []
df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ) for r_start, r_end in row_blocks:
for c_start, c_end in col_blocks:
block = df.iloc[r_start:r_end + 1, c_start:c_end + 1]
if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all():
continue
print(f"\n--- Объединённый и отсортированный Норматив: {df_ca_normativ.shape} ---") if self._is_header_row(block.iloc[0]):
block.columns = block.iloc[0]
block = block.iloc[1:].reset_index(drop=True)
else:
block = block.reset_index(drop=True)
block.columns = [f"col_{i}" for i in range(block.shape[1])]
df_dict = { tables.append(block)
"plan": df_ca_plan,
"fact": df_ca_fact, return tables
"normativ": df_ca_normativ
} def _get_contiguous_blocks(self, indices):
return df_dict """Группирует индексы в непрерывные блоки"""
if not indices:
return []
blocks = []
start = indices[0]
for i in range(1, len(indices)):
if indices[i] != indices[i-1] + 1:
blocks.append((start, indices[i-1]))
start = indices[i]
blocks.append((start, indices[-1]))
return blocks
def _is_header_row(self, series):
"""Определяет, похожа ли строка на заголовок"""
series_str = series.astype(str).str.strip()
non_empty = series_str[series_str != '']
if len(non_empty) == 0:
return False
def is_not_numeric(val):
try:
float(val.replace(',', '.'))
return False
except (ValueError, TypeError):
return True
not_numeric_count = non_empty.apply(is_not_numeric).sum()
return not_numeric_count / len(non_empty) > 0.6
def _get_og_by_name(self, name):
"""Функция для получения ID по имени (упрощенная версия)"""
# Упрощенная версия - возвращаем имя как есть
if not name or not isinstance(name, str):
return None
return name.strip()
def parse_sheet(self, file_path: str, sheet_name: str, inclusion_list: set) -> pd.DataFrame:
"""Парсинг листа Excel"""
# === Извлечение и фильтрация ===
tables = self.extract_all_tables(file_path, sheet_name)
# Фильтруем таблицы: оставляем только те, где первая строка содержит нужные заголовки
filtered_tables = []
for table in tables:
if table.empty:
continue
first_row_values = table.iloc[0].astype(str).str.strip().tolist()
if any(val in inclusion_list for val in first_row_values):
filtered_tables.append(table)
tables = filtered_tables
# === Итоговый список таблиц датафреймов ===
result_list = []
for table in tables:
if table.empty:
continue
# Получаем первую строку (до удаления)
first_row_values = table.iloc[0].astype(str).str.strip().tolist()
# Находим, какой элемент из inclusion_list присутствует
matched_key = None
for val in first_row_values:
if val in inclusion_list:
matched_key = val
break # берём первый совпадающий заголовок
if matched_key is None:
continue # на всякий случай (хотя уже отфильтровано)
# Удаляем первую строку (заголовок) и сбрасываем индекс
df_cleaned = table.iloc[1:].copy().reset_index(drop=True)
# Пропускаем, если таблица пустая
if df_cleaned.empty:
continue
# Первая строка становится заголовком
new_header = df_cleaned.iloc[0] # извлекаем первую строку как потенциальные названия столбцов
# Преобразуем заголовок: только первый столбец может быть заменён на "name"
cleaned_header = []
# Обрабатываем первый столбец отдельно
first_item = new_header.iloc[0] if isinstance(new_header, pd.Series) else new_header[0]
first_item_str = str(first_item).strip() if pd.notna(first_item) else ""
if first_item_str == "" or first_item_str == "nan":
cleaned_header.append("name")
else:
cleaned_header.append(first_item_str)
# Остальные столбцы добавляем без изменений (или с минимальной очисткой)
for item in new_header[1:]:
# Опционально: приводим к строке и убираем лишние пробелы, но не заменяем на "name"
item_str = str(item).strip() if pd.notna(item) else ""
cleaned_header.append(item_str)
# Применяем очищенные названия столбцов
df_cleaned = df_cleaned[1:] # удаляем строку с заголовком
df_cleaned.columns = cleaned_header
df_cleaned = df_cleaned.reset_index(drop=True)
if matched_key.endswith('**'):
cleaned_key = matched_key[:-2] # удаляем последние **
else:
cleaned_key = matched_key
# Добавляем новую колонку с именем параметра
df_cleaned["table"] = cleaned_key
# Проверяем, что колонка 'name' существует
if 'name' not in df_cleaned.columns:
print(
f"Внимание: колонка 'name' отсутствует в таблице для '{matched_key}'. Пропускаем добавление 'id'.")
continue # или обработать по-другому
else:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_cleaned['id'] = df_cleaned['name'].apply(get_og_by_name)
# Удаляем строки, где id — None, NaN или пустой
df_cleaned = df_cleaned.dropna(subset=['id']) # dropna удаляет NaN
# Дополнительно: удаляем None (если не поймал dropna)
df_cleaned = df_cleaned[df_cleaned['id'].notna() & (df_cleaned['id'].astype(str) != 'None')]
# Добавляем в словарь
result_list.append(df_cleaned)
# === Объединение и сортировка по id (индекс) и table ===
if result_list:
combined_df = pd.concat(result_list, axis=0)
# Сортируем по индексу (id) и по столбцу 'table'
combined_df = combined_df.sort_values(by=['id', 'table'], axis=0)
# Устанавливаем id как индекс
# combined_df.set_index('id', inplace=True)
return combined_df
else:
return None
def data_dict_to_json(self, data_dict): def data_dict_to_json(self, data_dict):
''' Служебная функция для парсинга словаря в json. ''' ''' Служебная функция для парсинга словаря в json. '''
@@ -308,17 +380,3 @@ class SvodkaCAParser(ParserPort):
filtered_df = df[df['table'].isin(table_values)].copy() filtered_df = df[df['table'].isin(table_values)].copy()
result_dict = {key: group for key, group in filtered_df.groupby('table')} result_dict = {key: group for key, group in filtered_df.groupby('table')}
return result_dict return result_dict
def get_value(self, df: pd.DataFrame, params: dict):
modes = params.get("modes")
tables = params.get("tables")
if not isinstance(modes, list):
raise ValueError("Поле 'modes' должно быть списком")
if not isinstance(tables, list):
raise ValueError("Поле 'tables' должно быть списком")
# Собираем данные
data_dict = {}
for mode in modes:
data_dict[mode] = self.get_data(df, mode, tables)
return self.data_dict_to_json(data_dict)

View File

@@ -9,6 +9,60 @@ class SvodkaPMParser(ParserPort):
name = "Сводки ПМ" name = "Сводки ПМ"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
self.register_getter(
name="single_og",
method=self._get_single_og,
required_params=["id", "codes", "columns"],
optional_params=["search"],
description="Получение данных по одному ОГ"
)
self.register_getter(
name="total_ogs",
method=self._get_total_ogs,
required_params=["codes", "columns"],
optional_params=["search"],
description="Получение данных по всем ОГ"
)
def _get_single_og(self, params: dict):
"""Получение данных по одному ОГ (обертка для совместимости)"""
og_id = params["id"]
codes = params["codes"]
columns = params["columns"]
search = params.get("search")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
# Здесь нужно получить DataFrame из self.df, но пока используем старую логику
# TODO: Переделать под новую архитектуру
return self.get_svodka_og(self.df, og_id, codes, columns, search)
def _get_total_ogs(self, params: dict):
"""Получение данных по всем ОГ (обертка для совместимости)"""
codes = params["codes"]
columns = params["columns"]
search = params.get("search")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
# TODO: Переделать под новую архитектуру
return self.get_svodka_total(self.df, codes, columns, search)
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_pm_files(file_path, params)
return self.df
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int: def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
"""Определения индекса заголовка в excel по ключевому слову""" """Определения индекса заголовка в excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков # Читаем первые max_rows строк без заголовков
@@ -16,7 +70,8 @@ class SvodkaPMParser(ParserPort):
file, file,
sheet_name=sheet, sheet_name=sheet,
header=None, header=None,
nrows=max_rows nrows=max_rows,
engine='openpyxl'
) )
# Ищем строку, где хотя бы в одном столбце встречается искомое значение # Ищем строку, где хотя бы в одном столбце встречается искомое значение
@@ -40,6 +95,7 @@ class SvodkaPMParser(ParserPort):
header=header_num, header=header_num,
usecols=None, usecols=None,
nrows=2, nrows=2,
engine='openpyxl'
) )
if df_probe.shape[0] == 0: if df_probe.shape[0] == 0:
@@ -61,7 +117,8 @@ class SvodkaPMParser(ParserPort):
sheet_name=sheet, sheet_name=sheet,
header=header_num, header=header_num,
usecols=None, usecols=None,
index_col=None index_col=None,
engine='openpyxl'
) )
if indicator_col_name not in df_full.columns: if indicator_col_name not in df_full.columns:
@@ -99,25 +156,25 @@ class SvodkaPMParser(ParserPort):
# Проверяем, является ли колонка пустой/некорректной # Проверяем, является ли колонка пустой/некорректной
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan' is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
# Проверяем, начинается ли на "Итого" if is_empty_or_unnamed:
if col_str.startswith('Итого'): # Если это пустая колонка, используем последнее хорошее имя
current_name = 'Итого' if last_good_name:
last_good_name = current_name # обновляем last_good_name
new_columns.append(current_name)
elif is_empty_or_unnamed:
# Используем последнее хорошее имя
new_columns.append(last_good_name) new_columns.append(last_good_name)
else: else:
# Имя, полученное из exel # Если нет хорошего имени, пропускаем
continue
else:
# Это хорошая колонка
last_good_name = col_str last_good_name = col_str
new_columns.append(col_str) new_columns.append(col_str)
# Применяем новые заголовки
df_final.columns = new_columns df_final.columns = new_columns
print(f"Окончательное количество столбцов: {len(df_final.columns)}")
return df_final return df_final
def parse(self, file_path: str, params: dict) -> dict: def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict:
"""Парсинг ZIP архива со сводками ПМ"""
import zipfile import zipfile
pm_dict = { pm_dict = {
"facts": {}, "facts": {},
@@ -125,7 +182,7 @@ class SvodkaPMParser(ParserPort):
} }
excel_fact_template = 'svodka_fact_pm_ID.xlsm' excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx' excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(file_path, 'r') as zip_ref: with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist() file_list = zip_ref.namelist()
for name, id in OG_IDS.items(): for name, id in OG_IDS.items():
if id == 'BASH': if id == 'BASH':
@@ -155,9 +212,9 @@ class SvodkaPMParser(ParserPort):
return pm_dict return pm_dict
def get_svodka_value(self, df_svodka, id, code, search_value=None): def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
''' Служебная функция для простой выборке по сводке ''' ''' Служебная функция получения значения по коду и столбцу '''
row_index = id row_index = code
mask_value = df_svodka.iloc[0] == code mask_value = df_svodka.iloc[0] == code
if search_value is None: if search_value is None:
@@ -254,22 +311,4 @@ class SvodkaPMParser(ParserPort):
return total_result return total_result
def get_value(self, df, params): # Убираем старый метод get_value, так как он теперь в базовом классе
og_id = params.get("id")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
mode = params.get("mode", "total")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
data = None
if mode == "single":
if not og_id:
raise ValueError("Отсутствует идентификатор ОГ")
data = self.get_svodka_og(df, og_id, codes, columns, search)
elif mode == "total":
data = self.get_svodka_total(df, codes, columns, search)
json_result = data_to_json(data)
return json_result