import pandas as pd import numpy as np import logging from core.ports import ParserPort from core.schema_utils import register_getter_from_schema, validate_params_with_schema from app.schemas.svodka_ca import SvodkaCARequest from adapters.pconfig import get_og_by_name # Настройка логгера для модуля logger = logging.getLogger(__name__) class SvodkaCAParser(ParserPort): """Парсер для сводок СА""" name = "Сводки СА" def _register_default_getters(self): """Регистрация геттеров по умолчанию""" # Используем схемы Pydantic как единый источник правды register_getter_from_schema( parser_instance=self, getter_name="get_ca_data", method=self._get_data_wrapper, schema_class=SvodkaCARequest, description="Получение данных по режимам и таблицам" ) def _get_data_wrapper(self, params: dict): """Получение данных по режимам и таблицам""" logger.debug(f"🔍 _get_data_wrapper вызван с параметрами: {params}") # Валидируем параметры с помощью схемы Pydantic validated_params = validate_params_with_schema(params, SvodkaCARequest) modes = validated_params["modes"] tables = validated_params["tables"] logger.debug(f"🔍 Запрошенные режимы: {modes}") logger.debug(f"🔍 Запрошенные таблицы: {tables}") # Проверяем, есть ли данные в data_dict (из парсинга) или в df (из загрузки) if hasattr(self, 'data_dict') and self.data_dict is not None: # Данные из парсинга data_source = self.data_dict logger.debug(f"🔍 Используем data_dict с режимами: {list(data_source.keys())}") elif hasattr(self, 'df') and self.df is not None and not self.df.empty: # Данные из загрузки - преобразуем DataFrame обратно в словарь data_source = self._df_to_data_dict() logger.debug(f"🔍 Используем df, преобразованный в data_dict с режимами: {list(data_source.keys())}") else: logger.warning(f"🔍 Нет данных! data_dict={getattr(self, 'data_dict', 'None')}, df={getattr(self, 'df', 'None')}") return {} # Фильтруем данные по запрошенным режимам и таблицам result_data = {} for mode in modes: if mode in data_source: result_data[mode] = {} available_tables = list(data_source[mode].keys()) logger.debug(f"🔍 Режим '{mode}' содержит таблицы: {available_tables}") for table_name, table_data in data_source[mode].items(): # Ищем таблицы по частичному совпадению for requested_table in tables: if requested_table in table_name: result_data[mode][table_name] = table_data logger.debug(f"🔍 Добавлена таблица '{table_name}' (совпадение с '{requested_table}') с {len(table_data)} записями") break # Найдено совпадение, переходим к следующей таблице else: logger.warning(f"🔍 Режим '{mode}' не найден в data_source") logger.debug(f"🔍 Итоговый результат содержит режимы: {list(result_data.keys())}") return result_data def _df_to_data_dict(self): """Преобразование DataFrame обратно в словарь данных""" if not hasattr(self, 'df') or self.df is None or self.df.empty: return {} data_dict = {} # Группируем данные по режимам и таблицам for _, row in self.df.iterrows(): mode = row.get('mode') table = row.get('table') data = row.get('data') if mode and table and data is not None: if mode not in data_dict: data_dict[mode] = {} data_dict[mode][table] = data return data_dict def parse(self, file_path: str, params: dict) -> pd.DataFrame: """Парсинг файла и возврат DataFrame""" logger.debug(f"🔍 SvodkaCAParser.parse вызван с файлом: {file_path}") # Парсим данные и сохраняем словарь для использования в геттерах self.data_dict = self.parse_svodka_ca(file_path, params) # Преобразуем словарь в DataFrame для совместимости с services.py # Создаем простой DataFrame с информацией о загруженных данных if self.data_dict: # Создаем DataFrame с информацией о режимах и таблицах data_rows = [] for mode, tables in self.data_dict.items(): for table_name, table_data in tables.items(): if table_data: data_rows.append({ 'mode': mode, 'table': table_name, 'rows_count': len(table_data), 'data': table_data }) if data_rows: df = pd.DataFrame(data_rows) self.df = df logger.debug(f"🔍 Создан DataFrame с {len(data_rows)} записями") return df # Если данных нет, возвращаем пустой DataFrame self.df = pd.DataFrame() logger.debug(f"🔍 Возвращаем пустой DataFrame") return self.df def parse_svodka_ca(self, file_path: str, params: dict) -> dict: """Парсинг сводки СА - работает с тремя листами: План, Факт, Норматив""" logger.debug(f"🔍 Начинаем парсинг сводки СА из файла: {file_path}") # === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив === # Выгружаем План inclusion_list_plan = { "ТиП, %", "Топливо итого, тонн", "Топливо итого, %", "Топливо на технологию, тонн", "Топливо на технологию, %", "Топливо на энергетику, тонн", "Топливо на энергетику, %", "Потери итого, тонн", "Потери итого, %", "в т.ч. Идентифицированные безвозвратные потери, тонн**", "в т.ч. Идентифицированные безвозвратные потери, %**", "в т.ч. Неидентифицированные потери, тонн**", "в т.ч. Неидентифицированные потери, %**" } df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan) logger.debug(f"🔍 Объединённый и отсортированный План: {df_ca_plan.shape if df_ca_plan is not None else 'None'}") # Выгружаем Факт inclusion_list_fact = { "ТиП, %", "Топливо итого, тонн", "Топливо итого, %", "Топливо на технологию, тонн", "Топливо на технологию, %", "Топливо на энергетику, тонн", "Топливо на энергетику, %", "Потери итого, тонн", "Потери итого, %", "в т.ч. Идентифицированные безвозвратные потери, тонн", "в т.ч. Идентифицированные безвозвратные потери, %", "в т.ч. Неидентифицированные потери, тонн", "в т.ч. Неидентифицированные потери, %" } df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact) logger.debug(f"🔍 Объединённый и отсортированный Факт: {df_ca_fact.shape if df_ca_fact is not None else 'None'}") # Выгружаем Норматив inclusion_list_normativ = { "Топливо итого, тонн", "Топливо итого, %", "Топливо на технологию, тонн", "Топливо на технологию, %", "Топливо на энергетику, тонн", "Топливо на энергетику, %", "Потери итого, тонн", "Потери итого, %", "в т.ч. Идентифицированные безвозвратные потери, тонн**", "в т.ч. Идентифицированные безвозвратные потери, %**", "в т.ч. Неидентифицированные потери, тонн**", "в т.ч. Неидентифицированные потери, %**" } df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ) logger.debug(f"🔍 Объединённый и отсортированный Норматив: {df_ca_normativ.shape if df_ca_normativ is not None else 'None'}") # Преобразуем DataFrame в словарь по режимам и таблицам data_dict = {} # Обрабатываем План if df_ca_plan is not None and not df_ca_plan.empty: data_dict['plan'] = {} for table_name, group_df in df_ca_plan.groupby('table'): table_data = group_df.drop('table', axis=1) data_dict['plan'][table_name] = table_data.to_dict('records') # Обрабатываем Факт if df_ca_fact is not None and not df_ca_fact.empty: data_dict['fact'] = {} for table_name, group_df in df_ca_fact.groupby('table'): table_data = group_df.drop('table', axis=1) data_dict['fact'][table_name] = table_data.to_dict('records') # Обрабатываем Норматив if df_ca_normativ is not None and not df_ca_normativ.empty: data_dict['normativ'] = {} for table_name, group_df in df_ca_normativ.groupby('table'): table_data = group_df.drop('table', axis=1) data_dict['normativ'][table_name] = table_data.to_dict('records') logger.debug(f"🔍 Итоговый data_dict содержит режимы: {list(data_dict.keys())}") for mode, tables in data_dict.items(): logger.debug(f"🔍 Режим '{mode}' содержит таблицы: {list(tables.keys())}") return data_dict def extract_all_tables(self, file_path, sheet_name=0): """Извлечение всех таблиц из Excel файла""" df = pd.read_excel(file_path, sheet_name=sheet_name, header=None, engine='openpyxl') df_filled = df.fillna('') df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True) non_empty_rows = ~(df_clean.eq('').all(axis=1)) non_empty_cols = ~(df_clean.eq('').all(axis=0)) row_indices = non_empty_rows[non_empty_rows].index.tolist() col_indices = non_empty_cols[non_empty_cols].index.tolist() if not row_indices or not col_indices: return [] row_blocks = self._get_contiguous_blocks(row_indices) col_blocks = self._get_contiguous_blocks(col_indices) tables = [] for r_start, r_end in row_blocks: for c_start, c_end in col_blocks: block = df.iloc[r_start:r_end + 1, c_start:c_end + 1] if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all(): continue if self._is_header_row(block.iloc[0]): block.columns = block.iloc[0] block = block.iloc[1:].reset_index(drop=True) else: block = block.reset_index(drop=True) block.columns = [f"col_{i}" for i in range(block.shape[1])] tables.append(block) return tables def _get_contiguous_blocks(self, indices): """Группирует индексы в непрерывные блоки""" if not indices: return [] blocks = [] start = indices[0] for i in range(1, len(indices)): if indices[i] != indices[i-1] + 1: blocks.append((start, indices[i-1])) start = indices[i] blocks.append((start, indices[-1])) return blocks def _is_header_row(self, series): """Определяет, похожа ли строка на заголовок""" series_str = series.astype(str).str.strip() non_empty = series_str[series_str != ''] if len(non_empty) == 0: return False def is_not_numeric(val): try: float(val.replace(',', '.')) return False except (ValueError, TypeError): return True not_numeric_count = non_empty.apply(is_not_numeric).sum() return not_numeric_count / len(non_empty) > 0.6 def _get_og_by_name(self, name): """Функция для получения ID по имени (упрощенная версия)""" # Упрощенная версия - возвращаем имя как есть if not name or not isinstance(name, str): return None return name.strip() def parse_sheet(self, file_path: str, sheet_name: str, inclusion_list: set) -> pd.DataFrame: """Парсинг листа Excel""" # === Извлечение и фильтрация === tables = self.extract_all_tables(file_path, sheet_name) # Фильтруем таблицы: оставляем только те, где первая строка содержит нужные заголовки filtered_tables = [] for table in tables: if table.empty: continue first_row_values = table.iloc[0].astype(str).str.strip().tolist() if any(val in inclusion_list for val in first_row_values): filtered_tables.append(table) tables = filtered_tables # === Итоговый список таблиц датафреймов === result_list = [] for table in tables: if table.empty: continue # Получаем первую строку (до удаления) first_row_values = table.iloc[0].astype(str).str.strip().tolist() # Находим, какой элемент из inclusion_list присутствует matched_key = None for val in first_row_values: if val in inclusion_list: matched_key = val break # берём первый совпадающий заголовок if matched_key is None: continue # на всякий случай (хотя уже отфильтровано) # Удаляем первую строку (заголовок) и сбрасываем индекс df_cleaned = table.iloc[1:].copy().reset_index(drop=True) # Пропускаем, если таблица пустая if df_cleaned.empty: continue # Первая строка становится заголовком new_header = df_cleaned.iloc[0] # извлекаем первую строку как потенциальные названия столбцов # Преобразуем заголовок: только первый столбец может быть заменён на "name" cleaned_header = [] # Обрабатываем первый столбец отдельно first_item = new_header.iloc[0] if isinstance(new_header, pd.Series) else new_header[0] first_item_str = str(first_item).strip() if pd.notna(first_item) else "" if first_item_str == "" or first_item_str == "nan": cleaned_header.append("name") else: cleaned_header.append(first_item_str) # Остальные столбцы добавляем без изменений (или с минимальной очисткой) for item in new_header[1:]: # Опционально: приводим к строке и убираем лишние пробелы, но не заменяем на "name" item_str = str(item).strip() if pd.notna(item) else "" cleaned_header.append(item_str) # Применяем очищенные названия столбцов df_cleaned = df_cleaned[1:] # удаляем строку с заголовком df_cleaned.columns = cleaned_header df_cleaned = df_cleaned.reset_index(drop=True) if matched_key.endswith('**'): cleaned_key = matched_key[:-2] # удаляем последние ** else: cleaned_key = matched_key # Добавляем новую колонку с именем параметра df_cleaned["table"] = cleaned_key # Проверяем, что колонка 'name' существует if 'name' not in df_cleaned.columns: logger.debug( f"Внимание: колонка 'name' отсутствует в таблице для '{matched_key}'. Пропускаем добавление 'id'.") continue # или обработать по-другому else: # Применяем функцию get_id_by_name к каждой строке в колонке 'name' df_cleaned['id'] = df_cleaned['name'].apply(get_og_by_name) # Удаляем строки, где id — None, NaN или пустой df_cleaned = df_cleaned.dropna(subset=['id']) # dropna удаляет NaN # Дополнительно: удаляем None (если не поймал dropna) df_cleaned = df_cleaned[df_cleaned['id'].notna() & (df_cleaned['id'].astype(str) != 'None')] # Добавляем в словарь result_list.append(df_cleaned) # === Объединение и сортировка по id (индекс) и table === if result_list: combined_df = pd.concat(result_list, axis=0) # Сортируем по индексу (id) и по столбцу 'table' combined_df = combined_df.sort_values(by=['id', 'table'], axis=0) # Устанавливаем id как индекс # combined_df.set_index('id', inplace=True) return combined_df else: return None def data_dict_to_json(self, data_dict): ''' Служебная функция для парсинга словаря в json. ''' def convert_types(obj): if isinstance(obj, (np.integer, np.int64)): return int(obj) elif isinstance(obj, (np.floating, np.float64)): return float(obj) if not np.isnan(obj) else None elif isinstance(obj, (np.ndarray,)): return obj.tolist() elif pd.isna(obj): return None elif isinstance(obj, (pd.Timestamp, np.datetime64)): return obj.isoformat() else: return obj # Новый словарь для JSON serializable_dict = {} for source, table_dict in data_dict.items(): # source: 'plan', 'fact', 'normativ' serializable_dict[source] = {} for table_name, df in table_dict.items(): # table_name: 'ТиП, %' и т.п., df: DataFrame if isinstance(df, pd.DataFrame): records = df.to_dict(orient='records') cleaned_records = [] for record in records: cleaned_record = { str(k): convert_types(v) for k, v in record.items() } cleaned_records.append(cleaned_record) serializable_dict[source][table_name] = cleaned_records else: # На всякий случай, если попался не DataFrame serializable_dict[source][table_name] = None return serializable_dict def get_data(self, df_dict, df_type, table_values): ''' Служебная функция. Выборка данных из датафрейма. ''' df = df_dict.get(df_type, {}) if 'table' not in df.columns: raise KeyError("В датафрейме отсутствует колонка 'table'") filtered_df = df[df['table'].isin(table_values)].copy() result_dict = {key: group for key, group in filtered_df.groupby('table')} return result_dict