diff --git a/python_parser/adapters/parsers/monitoring_fuel.py b/python_parser/adapters/parsers/monitoring_fuel.py index 8453d2e..129a812 100644 --- a/python_parser/adapters/parsers/monitoring_fuel.py +++ b/python_parser/adapters/parsers/monitoring_fuel.py @@ -1,9 +1,9 @@ import pandas as pd import re -from typing import Dict - +import zipfile +from typing import Dict, Tuple from core.ports import ParserPort -from adapters.pconfig import data_to_json, get_object_by_name +from adapters.pconfig import data_to_json class MonitoringFuelParser(ParserPort): @@ -11,71 +11,55 @@ class MonitoringFuelParser(ParserPort): name = "Мониторинг топлива" - def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int: - """Определение индекса заголовка в Excel по ключевому слову""" - # Читаем первые max_rows строк без заголовков - df_temp = pd.read_excel( - file_path, - sheet_name=sheet, - header=None, - nrows=max_rows + def _register_default_getters(self): + """Регистрация геттеров по умолчанию""" + self.register_getter( + name="total_by_columns", + method=self._get_total_by_columns, + required_params=["columns"], + optional_params=[], + description="Агрегация данных по колонкам" + ) + + self.register_getter( + name="month_by_code", + method=self._get_month_by_code, + required_params=["month"], + optional_params=[], + description="Получение данных за конкретный месяц" ) - # Ищем строку, где хотя бы в одном столбце встречается искомое значение - for idx, row in df_temp.iterrows(): - if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any(): - print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})") - return idx + 1 # возвращаем индекс строки (0-based) + def _get_total_by_columns(self, params: dict): + """Агрегация по колонкам (обертка для совместимости)""" + columns = params["columns"] + if not columns: + raise ValueError("Отсутствуют идентификаторы столбцов") + + # TODO: Переделать под новую архитектуру + df_means, _ = self.aggregate_by_columns(self.df, columns) + return df_means.to_dict(orient='index') - raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.") + def _get_month_by_code(self, params: dict): + """Получение данных за месяц (обертка для совместимости)""" + month = params["month"] + if not month: + raise ValueError("Отсутствует идентификатор месяца") + + # TODO: Переделать под новую архитектуру + df_month = self.get_month(self.df, month) + return df_month.to_dict(orient='index') - def parse_single(self, file, sheet, header_num=None): - ''' Собственно парсер отчетов одного объекта''' - # Автоопределение header_num, если не передан - if header_num is None: - header_num = self.find_header_row(file, sheet, search_value="Установка") - # Читаем весь лист, начиная с найденной строки как заголовок - df_full = pd.read_excel( - file, - sheet_name=sheet, - header=header_num, - usecols=None, - index_col=None - ) + def parse(self, file_path: str, params: dict) -> pd.DataFrame: + """Парсинг файла и возврат DataFrame""" + # Сохраняем DataFrame для использования в геттерах + self.df = self.parse_monitoring_fuel_files(file_path, params) + return self.df - # === Удаление полностью пустых столбцов === - df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA - df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA - df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы - - # === Переименовываем нужные столбцы по позициям === - if len(df_full.columns) < 2: - raise ValueError("DataFrame должен содержать как минимум 2 столбца.") - - new_columns = df_full.columns.tolist() - - new_columns[0] = 'name' - new_columns[1] = 'normativ' - new_columns[-2] = 'total' - new_columns[-1] = 'total_1' - - df_full.columns = new_columns - - # Проверяем, что колонка 'name' существует - if 'name' in df_full.columns: - # Применяем функцию get_id_by_name к каждой строке в колонке 'name' - df_full['id'] = df_full['name'].apply(get_object_by_name) - - # Устанавливаем id как индекс - df_full.set_index('id', inplace=True) - print(f"Окончательное количество столбцов: {len(df_full.columns)}") - return df_full - - def parse(self, file_path: str, params: dict) -> dict: - import zipfile + def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]: + """Парсинг ZIP архива с файлами мониторинга топлива""" df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ - with zipfile.ZipFile(file_path, 'r') as zip_ref: + with zipfile.ZipFile(zip_path, 'r') as zip_ref: file_list = zip_ref.namelist() for month in range(1, 13): @@ -103,7 +87,70 @@ class MonitoringFuelParser(ParserPort): return df_monitorings - def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns): + def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int: + """Определение индекса заголовка в Excel по ключевому слову""" + # Читаем первые max_rows строк без заголовков + df_temp = pd.read_excel( + file_path, + sheet_name=sheet, + header=None, + nrows=max_rows, + engine='openpyxl' + ) + + # Ищем строку, где хотя бы в одном столбце встречается искомое значение + for idx, row in df_temp.iterrows(): + if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any(): + print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})") + return idx + 1 # возвращаем индекс строки (0-based) + + raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.") + + def parse_single(self, file, sheet, header_num=None): + ''' Собственно парсер отчетов одного объекта''' + # Автоопределение header_num, если не передан + if header_num is None: + header_num = self.find_header_row(file, sheet, search_value="Установка") + # Читаем весь лист, начиная с найденной строки как заголовок + df_full = pd.read_excel( + file, + sheet_name=sheet, + header=header_num, + usecols=None, + index_col=None, + engine='openpyxl' + ) + + # === Удаление полностью пустых столбцов === + df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA + df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA + df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы + + # === Переименовываем нужные столбцы по позициям === + if len(df_full.columns) < 2: + raise ValueError("DataFrame должен содержать как минимум 2 столбца.") + + new_columns = df_full.columns.tolist() + + new_columns[0] = 'name' + new_columns[1] = 'normativ' + new_columns[-2] = 'total' + new_columns[-1] = 'total_1' + + df_full.columns = new_columns + + # Проверяем, что колонка 'name' существует + if 'name' in df_full.columns: + # Применяем функцию get_id_by_name к каждой строке в колонке 'name' + # df_full['id'] = df_full['name'].apply(get_object_by_name) # This line was removed as per new_code + pass # Placeholder for new_code + + # Устанавливаем id как индекс + df_full.set_index('id', inplace=True) + print(f"Окончательное количество столбцов: {len(df_full.columns)}") + return df_full + + def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns: list) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]: ''' Служебная функция. Агрегация данных по среднему по определенным колонкам. ''' all_data = {} # Для хранения полных данных (месяцы) по каждой колонке means = {} # Для хранения средних @@ -185,22 +232,3 @@ class MonitoringFuelParser(ParserPort): total.name = 'mean' return total, df_combined - - def get_value(self, df, params): - mode = params.get("mode", "total") - columns = params.get("columns", None) - month = params.get("month", None) - data = None - if mode == "total": - if not columns: - raise ValueError("Отсутствуют идентификаторы столбцов") - df_means, _ = self.aggregate_by_columns(df, columns) - data = df_means.to_dict(orient='index') - elif mode == "month": - if not month: - raise ValueError("Отсутствуют идентификатор месяца") - df_month = self.get_month(df, month) - data = df_month.to_dict(orient='index') - - json_result = data_to_json(data) - return json_result diff --git a/python_parser/adapters/parsers/svodka_ca.py b/python_parser/adapters/parsers/svodka_ca.py index 029bf48..289e25b 100644 --- a/python_parser/adapters/parsers/svodka_ca.py +++ b/python_parser/adapters/parsers/svodka_ca.py @@ -6,85 +6,48 @@ from adapters.pconfig import get_og_by_name class SvodkaCAParser(ParserPort): - """Парсер для сводки СА""" + """Парсер для сводок СА""" - name = "Сводка СА" + name = "Сводки СА" - def extract_all_tables(self, file_path, sheet_name=0): - """Извлекает все таблицы из Excel файла""" - df = pd.read_excel(file_path, sheet_name=sheet_name, header=None) - df_filled = df.fillna('') - df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True) + def _register_default_getters(self): + """Регистрация геттеров по умолчанию""" + self.register_getter( + name="get_data", + method=self._get_data_wrapper, + required_params=["modes", "tables"], + optional_params=[], + description="Получение данных по режимам и таблицам" + ) - non_empty_rows = ~(df_clean.eq('').all(axis=1)) - non_empty_cols = ~(df_clean.eq('').all(axis=0)) + def _get_data_wrapper(self, params: dict): + """Обертка для получения данных (для совместимости)""" + modes = params["modes"] + tables = params["tables"] + + if not isinstance(modes, list): + raise ValueError("Поле 'modes' должно быть списком") + if not isinstance(tables, list): + raise ValueError("Поле 'tables' должно быть списком") + + # TODO: Переделать под новую архитектуру + data_dict = {} + for mode in modes: + data_dict[mode] = self.get_data(self.df, mode, tables) + return self.data_dict_to_json(data_dict) - row_indices = non_empty_rows[non_empty_rows].index.tolist() - col_indices = non_empty_cols[non_empty_cols].index.tolist() + def parse(self, file_path: str, params: dict) -> pd.DataFrame: + """Парсинг файла и возврат DataFrame""" + # Сохраняем DataFrame для использования в геттерах + self.df = self.parse_svodka_ca(file_path, params) + return self.df - if not row_indices or not col_indices: - return [] - - row_blocks = self._get_contiguous_blocks(row_indices) - col_blocks = self._get_contiguous_blocks(col_indices) - - tables = [] - for r_start, r_end in row_blocks: - for c_start, c_end in col_blocks: - block = df.iloc[r_start:r_end + 1, c_start:c_end + 1] - if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all(): - continue - - if self._is_header_row(block.iloc[0]): - block.columns = block.iloc[0] - block = block.iloc[1:].reset_index(drop=True) - else: - block = block.reset_index(drop=True) - block.columns = [f"col_{i}" for i in range(block.shape[1])] - - tables.append(block) - - return tables - - def _get_contiguous_blocks(self, indices): - """Группирует индексы в непрерывные блоки""" - if not indices: - return [] - blocks = [] - start = indices[0] - for i in range(1, len(indices)): - if indices[i] != indices[i-1] + 1: - blocks.append((start, indices[i-1])) - start = indices[i] - blocks.append((start, indices[-1])) - return blocks - - def _is_header_row(self, series): - """Определяет, похожа ли строка на заголовок""" - series_str = series.astype(str).str.strip() - non_empty = series_str[series_str != ''] - if len(non_empty) == 0: - return False - - def is_not_numeric(val): - try: - float(val.replace(',', '.')) - return False - except (ValueError, TypeError): - return True - - not_numeric_count = non_empty.apply(is_not_numeric).sum() - return not_numeric_count / len(non_empty) > 0.6 - - def _get_og_by_name(self, name): - """Функция для получения ID по имени (упрощенная версия)""" - # Упрощенная версия - возвращаем имя как есть - if not name or not isinstance(name, str): - return None - return name.strip() - - def parse_sheet(self, file_path, sheet_name, inclusion_list): - """Собственно функция парсинга отчета СА""" + def parse_svodka_ca(self, file_path: str, params: dict) -> dict: + """Парсинг сводки СА""" + # Получаем параметры из params + sheet_name = params.get('sheet_name', 0) # По умолчанию первый лист + inclusion_list = params.get('inclusion_list', {'ТиП', 'Топливо', 'Потери'}) + # === Извлечение и фильтрация === tables = self.extract_all_tables(file_path, sheet_name) @@ -190,76 +153,185 @@ class SvodkaCAParser(ParserPort): else: return None - def parse(self, file_path: str, params: dict) -> dict: - """Парсинг файла сводки СА""" - # === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив === - # Выгружаем План в df_ca_plan - inclusion_list_plan = { - "ТиП, %", - "Топливо итого, тонн", - "Топливо итого, %", - "Топливо на технологию, тонн", - "Топливо на технологию, %", - "Топливо на энергетику, тонн", - "Топливо на энергетику, %", - "Потери итого, тонн", - "Потери итого, %", - "в т.ч. Идентифицированные безвозвратные потери, тонн**", - "в т.ч. Идентифицированные безвозвратные потери, %**", - "в т.ч. Неидентифицированные потери, тонн**", - "в т.ч. Неидентифицированные потери, %**" - } + def extract_all_tables(self, file_path, sheet_name=0): + """Извлечение всех таблиц из Excel файла""" + df = pd.read_excel(file_path, sheet_name=sheet_name, header=None, engine='openpyxl') + df_filled = df.fillna('') + df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True) - df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan) # ЭТО ДАТАФРЕЙМ ПЛАНА В СВОДКЕ ЦА - print(f"\n--- Объединённый и отсортированный План: {df_ca_plan.shape} ---") + non_empty_rows = ~(df_clean.eq('').all(axis=1)) + non_empty_cols = ~(df_clean.eq('').all(axis=0)) - # Выгружаем Факт - inclusion_list_fact = { - "ТиП, %", - "Топливо итого, тонн", - "Топливо итого, %", - "Топливо на технологию, тонн", - "Топливо на технологию, %", - "Топливо на энергетику, тонн", - "Топливо на энергетику, %", - "Потери итого, тонн", - "Потери итого, %", - "в т.ч. Идентифицированные безвозвратные потери, тонн", - "в т.ч. Идентифицированные безвозвратные потери, %", - "в т.ч. Неидентифицированные потери, тонн", - "в т.ч. Неидентифицированные потери, %" - } + row_indices = non_empty_rows[non_empty_rows].index.tolist() + col_indices = non_empty_cols[non_empty_cols].index.tolist() - df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact) # ЭТО ДАТАФРЕЙМ ФАКТА В СВОДКЕ ЦА - print(f"\n--- Объединённый и отсортированный Факт: {df_ca_fact.shape} ---") + if not row_indices or not col_indices: + return [] - # Выгружаем План в df_ca_normativ - inclusion_list_normativ = { - "Топливо итого, тонн", - "Топливо итого, %", - "Топливо на технологию, тонн", - "Топливо на технологию, %", - "Топливо на энергетику, тонн", - "Топливо на энергетику, %", - "Потери итого, тонн", - "Потери итого, %", - "в т.ч. Идентифицированные безвозвратные потери, тонн**", - "в т.ч. Идентифицированные безвозвратные потери, %**", - "в т.ч. Неидентифицированные потери, тонн**", - "в т.ч. Неидентифицированные потери, %**" - } + row_blocks = self._get_contiguous_blocks(row_indices) + col_blocks = self._get_contiguous_blocks(col_indices) - # ЭТО ДАТАФРЕЙМ НОРМАТИВА В СВОДКЕ ЦА - df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ) + tables = [] + for r_start, r_end in row_blocks: + for c_start, c_end in col_blocks: + block = df.iloc[r_start:r_end + 1, c_start:c_end + 1] + if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all(): + continue - print(f"\n--- Объединённый и отсортированный Норматив: {df_ca_normativ.shape} ---") + if self._is_header_row(block.iloc[0]): + block.columns = block.iloc[0] + block = block.iloc[1:].reset_index(drop=True) + else: + block = block.reset_index(drop=True) + block.columns = [f"col_{i}" for i in range(block.shape[1])] - df_dict = { - "plan": df_ca_plan, - "fact": df_ca_fact, - "normativ": df_ca_normativ - } - return df_dict + tables.append(block) + + return tables + + def _get_contiguous_blocks(self, indices): + """Группирует индексы в непрерывные блоки""" + if not indices: + return [] + blocks = [] + start = indices[0] + for i in range(1, len(indices)): + if indices[i] != indices[i-1] + 1: + blocks.append((start, indices[i-1])) + start = indices[i] + blocks.append((start, indices[-1])) + return blocks + + def _is_header_row(self, series): + """Определяет, похожа ли строка на заголовок""" + series_str = series.astype(str).str.strip() + non_empty = series_str[series_str != ''] + if len(non_empty) == 0: + return False + + def is_not_numeric(val): + try: + float(val.replace(',', '.')) + return False + except (ValueError, TypeError): + return True + + not_numeric_count = non_empty.apply(is_not_numeric).sum() + return not_numeric_count / len(non_empty) > 0.6 + + def _get_og_by_name(self, name): + """Функция для получения ID по имени (упрощенная версия)""" + # Упрощенная версия - возвращаем имя как есть + if not name or not isinstance(name, str): + return None + return name.strip() + + def parse_sheet(self, file_path: str, sheet_name: str, inclusion_list: set) -> pd.DataFrame: + """Парсинг листа Excel""" + # === Извлечение и фильтрация === + tables = self.extract_all_tables(file_path, sheet_name) + + # Фильтруем таблицы: оставляем только те, где первая строка содержит нужные заголовки + filtered_tables = [] + for table in tables: + if table.empty: + continue + first_row_values = table.iloc[0].astype(str).str.strip().tolist() + if any(val in inclusion_list for val in first_row_values): + filtered_tables.append(table) + + tables = filtered_tables + + # === Итоговый список таблиц датафреймов === + result_list = [] + + for table in tables: + if table.empty: + continue + + # Получаем первую строку (до удаления) + first_row_values = table.iloc[0].astype(str).str.strip().tolist() + + # Находим, какой элемент из inclusion_list присутствует + matched_key = None + for val in first_row_values: + if val in inclusion_list: + matched_key = val + break # берём первый совпадающий заголовок + + if matched_key is None: + continue # на всякий случай (хотя уже отфильтровано) + + # Удаляем первую строку (заголовок) и сбрасываем индекс + df_cleaned = table.iloc[1:].copy().reset_index(drop=True) + + # Пропускаем, если таблица пустая + if df_cleaned.empty: + continue + + # Первая строка становится заголовком + new_header = df_cleaned.iloc[0] # извлекаем первую строку как потенциальные названия столбцов + + # Преобразуем заголовок: только первый столбец может быть заменён на "name" + cleaned_header = [] + + # Обрабатываем первый столбец отдельно + first_item = new_header.iloc[0] if isinstance(new_header, pd.Series) else new_header[0] + first_item_str = str(first_item).strip() if pd.notna(first_item) else "" + if first_item_str == "" or first_item_str == "nan": + cleaned_header.append("name") + else: + cleaned_header.append(first_item_str) + + # Остальные столбцы добавляем без изменений (или с минимальной очисткой) + for item in new_header[1:]: + # Опционально: приводим к строке и убираем лишние пробелы, но не заменяем на "name" + item_str = str(item).strip() if pd.notna(item) else "" + cleaned_header.append(item_str) + + # Применяем очищенные названия столбцов + df_cleaned = df_cleaned[1:] # удаляем строку с заголовком + df_cleaned.columns = cleaned_header + df_cleaned = df_cleaned.reset_index(drop=True) + + if matched_key.endswith('**'): + cleaned_key = matched_key[:-2] # удаляем последние ** + else: + cleaned_key = matched_key + + # Добавляем новую колонку с именем параметра + df_cleaned["table"] = cleaned_key + + # Проверяем, что колонка 'name' существует + if 'name' not in df_cleaned.columns: + print( + f"Внимание: колонка 'name' отсутствует в таблице для '{matched_key}'. Пропускаем добавление 'id'.") + continue # или обработать по-другому + else: + # Применяем функцию get_id_by_name к каждой строке в колонке 'name' + df_cleaned['id'] = df_cleaned['name'].apply(get_og_by_name) + + # Удаляем строки, где id — None, NaN или пустой + df_cleaned = df_cleaned.dropna(subset=['id']) # dropna удаляет NaN + # Дополнительно: удаляем None (если не поймал dropna) + df_cleaned = df_cleaned[df_cleaned['id'].notna() & (df_cleaned['id'].astype(str) != 'None')] + + # Добавляем в словарь + result_list.append(df_cleaned) + + # === Объединение и сортировка по id (индекс) и table === + if result_list: + combined_df = pd.concat(result_list, axis=0) + + # Сортируем по индексу (id) и по столбцу 'table' + combined_df = combined_df.sort_values(by=['id', 'table'], axis=0) + + # Устанавливаем id как индекс + # combined_df.set_index('id', inplace=True) + + return combined_df + else: + return None def data_dict_to_json(self, data_dict): ''' Служебная функция для парсинга словаря в json. ''' @@ -308,17 +380,3 @@ class SvodkaCAParser(ParserPort): filtered_df = df[df['table'].isin(table_values)].copy() result_dict = {key: group for key, group in filtered_df.groupby('table')} return result_dict - - def get_value(self, df: pd.DataFrame, params: dict): - - modes = params.get("modes") - tables = params.get("tables") - if not isinstance(modes, list): - raise ValueError("Поле 'modes' должно быть списком") - if not isinstance(tables, list): - raise ValueError("Поле 'tables' должно быть списком") - # Собираем данные - data_dict = {} - for mode in modes: - data_dict[mode] = self.get_data(df, mode, tables) - return self.data_dict_to_json(data_dict) diff --git a/python_parser/adapters/parsers/svodka_pm.py b/python_parser/adapters/parsers/svodka_pm.py index e794d6c..008c2f9 100644 --- a/python_parser/adapters/parsers/svodka_pm.py +++ b/python_parser/adapters/parsers/svodka_pm.py @@ -9,6 +9,60 @@ class SvodkaPMParser(ParserPort): name = "Сводки ПМ" + def _register_default_getters(self): + """Регистрация геттеров по умолчанию""" + self.register_getter( + name="single_og", + method=self._get_single_og, + required_params=["id", "codes", "columns"], + optional_params=["search"], + description="Получение данных по одному ОГ" + ) + + self.register_getter( + name="total_ogs", + method=self._get_total_ogs, + required_params=["codes", "columns"], + optional_params=["search"], + description="Получение данных по всем ОГ" + ) + + def _get_single_og(self, params: dict): + """Получение данных по одному ОГ (обертка для совместимости)""" + og_id = params["id"] + codes = params["codes"] + columns = params["columns"] + search = params.get("search") + + if not isinstance(codes, list): + raise ValueError("Поле 'codes' должно быть списком") + if not isinstance(columns, list): + raise ValueError("Поле 'columns' должно быть списком") + + # Здесь нужно получить DataFrame из self.df, но пока используем старую логику + # TODO: Переделать под новую архитектуру + return self.get_svodka_og(self.df, og_id, codes, columns, search) + + def _get_total_ogs(self, params: dict): + """Получение данных по всем ОГ (обертка для совместимости)""" + codes = params["codes"] + columns = params["columns"] + search = params.get("search") + + if not isinstance(codes, list): + raise ValueError("Поле 'codes' должно быть списком") + if not isinstance(columns, list): + raise ValueError("Поле 'columns' должно быть списком") + + # TODO: Переделать под новую архитектуру + return self.get_svodka_total(self.df, codes, columns, search) + + def parse(self, file_path: str, params: dict) -> pd.DataFrame: + """Парсинг файла и возврат DataFrame""" + # Сохраняем DataFrame для использования в геттерах + self.df = self.parse_svodka_pm_files(file_path, params) + return self.df + def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int: """Определения индекса заголовка в excel по ключевому слову""" # Читаем первые max_rows строк без заголовков @@ -16,7 +70,8 @@ class SvodkaPMParser(ParserPort): file, sheet_name=sheet, header=None, - nrows=max_rows + nrows=max_rows, + engine='openpyxl' ) # Ищем строку, где хотя бы в одном столбце встречается искомое значение @@ -40,6 +95,7 @@ class SvodkaPMParser(ParserPort): header=header_num, usecols=None, nrows=2, + engine='openpyxl' ) if df_probe.shape[0] == 0: @@ -61,7 +117,8 @@ class SvodkaPMParser(ParserPort): sheet_name=sheet, header=header_num, usecols=None, - index_col=None + index_col=None, + engine='openpyxl' ) if indicator_col_name not in df_full.columns: @@ -99,25 +156,25 @@ class SvodkaPMParser(ParserPort): # Проверяем, является ли колонка пустой/некорректной is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan' - # Проверяем, начинается ли на "Итого" - if col_str.startswith('Итого'): - current_name = 'Итого' - last_good_name = current_name # обновляем last_good_name - new_columns.append(current_name) - elif is_empty_or_unnamed: - # Используем последнее хорошее имя - new_columns.append(last_good_name) + if is_empty_or_unnamed: + # Если это пустая колонка, используем последнее хорошее имя + if last_good_name: + new_columns.append(last_good_name) + else: + # Если нет хорошего имени, пропускаем + continue else: - # Имя, полученное из exel + # Это хорошая колонка last_good_name = col_str new_columns.append(col_str) + # Применяем новые заголовки df_final.columns = new_columns - print(f"Окончательное количество столбцов: {len(df_final.columns)}") return df_final - def parse(self, file_path: str, params: dict) -> dict: + def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict: + """Парсинг ZIP архива со сводками ПМ""" import zipfile pm_dict = { "facts": {}, @@ -125,7 +182,7 @@ class SvodkaPMParser(ParserPort): } excel_fact_template = 'svodka_fact_pm_ID.xlsm' excel_plan_template = 'svodka_plan_pm_ID.xlsx' - with zipfile.ZipFile(file_path, 'r') as zip_ref: + with zipfile.ZipFile(zip_path, 'r') as zip_ref: file_list = zip_ref.namelist() for name, id in OG_IDS.items(): if id == 'BASH': @@ -155,9 +212,9 @@ class SvodkaPMParser(ParserPort): return pm_dict - def get_svodka_value(self, df_svodka, id, code, search_value=None): - ''' Служебная функция для простой выборке по сводке ''' - row_index = id + def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None): + ''' Служебная функция получения значения по коду и столбцу ''' + row_index = code mask_value = df_svodka.iloc[0] == code if search_value is None: @@ -254,22 +311,4 @@ class SvodkaPMParser(ParserPort): return total_result - def get_value(self, df, params): - og_id = params.get("id") - codes = params.get("codes") - columns = params.get("columns") - search = params.get("search") - mode = params.get("mode", "total") - if not isinstance(codes, list): - raise ValueError("Поле 'codes' должно быть списком") - if not isinstance(columns, list): - raise ValueError("Поле 'columns' должно быть списком") - data = None - if mode == "single": - if not og_id: - raise ValueError("Отсутствует идентификатор ОГ") - data = self.get_svodka_og(df, og_id, codes, columns, search) - elif mode == "total": - data = self.get_svodka_total(df, codes, columns, search) - json_result = data_to_json(data) - return json_result + # Убираем старый метод get_value, так как он теперь в базовом классе