diff --git a/python_parser/adapters/__pycache__/pconfig.cpython-313.pyc b/python_parser/adapters/__pycache__/pconfig.cpython-313.pyc index 04c2deb..7816f04 100644 Binary files a/python_parser/adapters/__pycache__/pconfig.cpython-313.pyc and b/python_parser/adapters/__pycache__/pconfig.cpython-313.pyc differ diff --git a/python_parser/adapters/parsers/README_svodka_pm.md b/python_parser/adapters/parsers/README_svodka_pm.md new file mode 100644 index 0000000..c5170d7 --- /dev/null +++ b/python_parser/adapters/parsers/README_svodka_pm.md @@ -0,0 +1,88 @@ +# Парсер Сводки ПМ + +## Описание + +Парсер для обработки сводок ПМ (план и факт) с поддержкой множественных геттеров. Наследуется от `ParserPort` и реализует архитектуру hexagonal architecture. + +## Доступные геттеры + +### 1. `get_single_og` +Получение данных по одному ОГ из сводки ПМ. + +**Обязательные параметры:** +- `id` (str): ID ОГ (например, "SNPZ", "KNPZ") +- `codes` (list): Список кодов показателей (например, [78, 79, 81, 82]) +- `columns` (list): Список столбцов для извлечения (например, ["ПП", "БП", "СЭБ"]) + +**Необязательные параметры:** +- `search` (str): Значение для поиска в столбцах + +**Пример использования:** +```python +parser = SvodkaPMParser() +params = { + "id": "SNPZ", + "codes": [78, 79, 81, 82], + "columns": ["ПП", "БП", "СЭБ"] +} +result = parser.get_value("get_single_og", params) +``` + +### 2. `get_total_ogs` +Получение данных по всем ОГ из сводки ПМ. + +**Обязательные параметры:** +- `codes` (list): Список кодов показателей +- `columns` (list): Список столбцов для извлечения + +**Необязательные параметры:** +- `search` (str): Значение для поиска в столбцах + +**Пример использования:** +```python +parser = SvodkaPMParser() +params = { + "codes": [78, 79, 81, 82], + "columns": ["ПП", "БП", "СЭБ"] +} +result = parser.get_value("get_total_ogs", params) +``` + +## Поддерживаемые столбцы + +- **ПП, БП**: Данные из файлов плана +- **ТБ, СЭБ, НЭБ**: Данные из файлов факта + +## Структура файлов + +Парсер ожидает следующую структуру файлов: +- `data/pm_fact/svodka_fact_pm_{OG_ID}.xlsx` или `.xlsm` +- `data/pm_plan/svodka_plan_pm_{OG_ID}.xlsx` или `.xlsm` + +Где `{OG_ID}` - это ID ОГ (например, SNPZ, KNPZ и т.д.) + +## Формат результата + +Результат возвращается в формате JSON со следующей структурой: +```json +{ + "ПП": { + "78": 123.45, + "79": 234.56 + }, + "БП": { + "78": 111.11, + "79": 222.22 + }, + "СЭБ": { + "78": 333.33, + "79": 444.44 + } +} +``` + +## Обработка ошибок + +- Если файл плана/факта не найден, соответствующие столбцы будут пустыми +- Если код показателя не найден, возвращается 0 +- Валидация параметров выполняется автоматически \ No newline at end of file diff --git a/python_parser/adapters/parsers/__pycache__/monitoring_fuel.cpython-313.pyc b/python_parser/adapters/parsers/__pycache__/monitoring_fuel.cpython-313.pyc index 0fe7c41..2111524 100644 Binary files a/python_parser/adapters/parsers/__pycache__/monitoring_fuel.cpython-313.pyc and b/python_parser/adapters/parsers/__pycache__/monitoring_fuel.cpython-313.pyc differ diff --git a/python_parser/adapters/parsers/__pycache__/svodka_ca.cpython-313.pyc b/python_parser/adapters/parsers/__pycache__/svodka_ca.cpython-313.pyc index c5a523e..08aa240 100644 Binary files a/python_parser/adapters/parsers/__pycache__/svodka_ca.cpython-313.pyc and b/python_parser/adapters/parsers/__pycache__/svodka_ca.cpython-313.pyc differ diff --git a/python_parser/adapters/parsers/__pycache__/svodka_pm.cpython-313.pyc b/python_parser/adapters/parsers/__pycache__/svodka_pm.cpython-313.pyc index 66f3854..2b989af 100644 Binary files a/python_parser/adapters/parsers/__pycache__/svodka_pm.cpython-313.pyc and b/python_parser/adapters/parsers/__pycache__/svodka_pm.cpython-313.pyc differ diff --git a/python_parser/adapters/parsers/svodka_pm copy.py b/python_parser/adapters/parsers/svodka_pm copy.py new file mode 100644 index 0000000..3901a08 --- /dev/null +++ b/python_parser/adapters/parsers/svodka_pm copy.py @@ -0,0 +1,326 @@ +import pandas as pd + +from core.ports import ParserPort +from core.schema_utils import register_getter_from_schema, validate_params_with_schema +from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest +from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json + + +class SvodkaPMParser(ParserPort): + """Парсер для сводок ПМ (план и факт)""" + + name = "Сводки ПМ" + + def _register_default_getters(self): + """Регистрация геттеров по умолчанию""" + # Используем схемы Pydantic как единый источник правды + register_getter_from_schema( + parser_instance=self, + getter_name="single_og", + method=self._get_single_og, + schema_class=SvodkaPMSingleOGRequest, + description="Получение данных по одному ОГ" + ) + + register_getter_from_schema( + parser_instance=self, + getter_name="total_ogs", + method=self._get_total_ogs, + schema_class=SvodkaPMTotalOGsRequest, + description="Получение данных по всем ОГ" + ) + + def _get_single_og(self, params: dict): + """Получение данных по одному ОГ""" + # Валидируем параметры с помощью схемы Pydantic + validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest) + + og_id = validated_params["id"] + codes = validated_params["codes"] + columns = validated_params["columns"] + search = validated_params.get("search") + + # Здесь нужно получить DataFrame из self.df, но пока используем старую логику + # TODO: Переделать под новую архитектуру + return self.get_svodka_og(self.df, og_id, codes, columns, search) + + def _get_total_ogs(self, params: dict): + """Получение данных по всем ОГ""" + # Валидируем параметры с помощью схемы Pydantic + validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest) + + codes = validated_params["codes"] + columns = validated_params["columns"] + search = validated_params.get("search") + + # TODO: Переделать под новую архитектуру + return self.get_svodka_total(self.df, codes, columns, search) + + def parse(self, file_path: str, params: dict) -> pd.DataFrame: + """Парсинг файла и возврат DataFrame""" + # Сохраняем DataFrame для использования в геттерах + self.df = self.parse_svodka_pm_files(file_path, params) + return self.df + + def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int: + """Определения индекса заголовка в excel по ключевому слову""" + # Читаем первые max_rows строк без заголовков + df_temp = pd.read_excel( + file, + sheet_name=sheet, + header=None, + nrows=max_rows, + engine='openpyxl' + ) + + # Ищем строку, где хотя бы в одном столбце встречается искомое значение + for idx, row in df_temp.iterrows(): + if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any(): + print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})") + return idx # 0-based index — то, что нужно для header= + + raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.") + + def parse_svodka_pm(self, file, sheet, header_num=None): + ''' Собственно парсер отчетов одного ОГ для БП, ПП и факта ''' + # Автоопределение header_num, если не передан + if header_num is None: + header_num = self.find_header_row(file, sheet, search_value="Итого") + + # Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID + df_probe = pd.read_excel( + file, + sheet_name=sheet, + header=header_num, + usecols=None, + nrows=2, + engine='openpyxl' + ) + + if df_probe.shape[0] == 0: + raise ValueError("Файл пуст или не содержит данных.") + + first_data_row = df_probe.iloc[0] + + # Находим столбец с 'INDICATOR_ID' + indicator_cols = first_data_row[first_data_row == 'INDICATOR_ID'] + if len(indicator_cols) == 0: + raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.') + + indicator_col_name = indicator_cols.index[0] + print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}") + + # Читаем весь лист + df_full = pd.read_excel( + file, + sheet_name=sheet, + header=header_num, + usecols=None, + index_col=None, + engine='openpyxl' + ) + + if indicator_col_name not in df_full.columns: + raise ValueError(f"Столбец {indicator_col_name} отсутствует при полной загрузке.") + + # Перемещаем INDICATOR_ID в начало и делаем индексом + cols = [indicator_col_name] + [col for col in df_full.columns if col != indicator_col_name] + df_full = df_full[cols] + df_full.set_index(indicator_col_name, inplace=True) + + # Обрезаем до "Итого" + 1 + header_list = [str(h).strip() for h in df_full.columns] + try: + itogo_idx = header_list.index("Итого") + num_cols_needed = itogo_idx + 2 + except ValueError: + print('Столбец "Итого" не найден. Оставляем все столбцы.') + num_cols_needed = len(header_list) + + num_cols_needed = min(num_cols_needed, len(header_list)) + df_final = df_full.iloc[:, :num_cols_needed] + + # === Удаление полностью пустых столбцов === + df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True) + df_clean = df_clean.where(pd.notnull(df_clean), pd.NA) + non_empty_mask = df_clean.notna().any() + df_final = df_final.loc[:, non_empty_mask] + + # === Обработка заголовков: Unnamed и "Итого" → "Итого" === + new_columns = [] + last_good_name = None + for col in df_final.columns: + col_str = str(col).strip() + + # Проверяем, является ли колонка пустой/некорректной + is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan' + + if is_empty_or_unnamed: + # Если это пустая колонка, используем последнее хорошее имя + if last_good_name: + new_columns.append(last_good_name) + else: + # Если нет хорошего имени, используем имя по умолчанию + new_columns.append(f"col_{len(new_columns)}") + else: + # Это хорошая колонка + last_good_name = col_str + new_columns.append(col_str) + + # Убеждаемся, что количество столбцов совпадает + if len(new_columns) != len(df_final.columns): + # Если количество не совпадает, обрезаем или дополняем + if len(new_columns) > len(df_final.columns): + new_columns = new_columns[:len(df_final.columns)] + else: + # Дополняем недостающие столбцы + while len(new_columns) < len(df_final.columns): + new_columns.append(f"col_{len(new_columns)}") + + # Применяем новые заголовки + df_final.columns = new_columns + + return df_final + + def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict: + """Парсинг ZIP архива со сводками ПМ""" + import zipfile + pm_dict = { + "facts": {}, + "plans": {} + } + excel_fact_template = 'svodka_fact_pm_ID.xlsm' + excel_plan_template = 'svodka_plan_pm_ID.xlsx' + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + file_list = zip_ref.namelist() + for name, id in OG_IDS.items(): + if id == 'BASH': + continue # пропускаем BASH + + current_fact = replace_id_in_path(excel_fact_template, id) + fact_candidates = [f for f in file_list if current_fact in f] + if len(fact_candidates) == 1: + print(f'Загрузка {current_fact}') + with zip_ref.open(fact_candidates[0]) as excel_file: + pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка') + print(f"✅ Факт загружен: {current_fact}") + else: + print(f"⚠️ Файл не найден (Факт): {current_fact}") + pm_dict['facts'][id] = None + + current_plan = replace_id_in_path(excel_plan_template, id) + plan_candidates = [f for f in file_list if current_plan in f] + if len(plan_candidates) == 1: + print(f'Загрузка {current_plan}') + with zip_ref.open(plan_candidates[0]) as excel_file: + pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка') + print(f"✅ План загружен: {current_plan}") + else: + print(f"⚠️ Файл не найден (План): {current_plan}") + pm_dict['plans'][id] = None + + return pm_dict + + def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None): + ''' Служебная функция получения значения по коду и столбцу ''' + row_index = code + + mask_value = df_svodka.iloc[0] == code + if search_value is None: + mask_name = df_svodka.columns != 'Итого' + else: + mask_name = df_svodka.columns == search_value + + # Убедимся, что маски совпадают по длине + if len(mask_value) != len(mask_name): + raise ValueError( + f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}" + ) + + final_mask = mask_value & mask_name # булевая маска по позициям столбцов + col_positions = final_mask.values # numpy array или Series булевых значений + + if not final_mask.any(): + print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'") + return 0 + else: + if row_index in df_svodka.index: + # Получаем позицию строки + row_loc = df_svodka.index.get_loc(row_index) + + # Извлекаем значения по позициям столбцов + values = df_svodka.iloc[row_loc, col_positions] + + # Преобразуем в числовой формат + numeric_values = pd.to_numeric(values, errors='coerce') + + # Агрегация данных (NaN игнорируются) + if search_value is None: + return numeric_values + else: + return numeric_values.iloc[0] + else: + return None + + def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None): + ''' Служебная функция получения данных по одному ОГ ''' + result = {} + + # Безопасно получаем данные, проверяя их наличие + fact_df = pm_dict.get('facts', {}).get(id) if 'facts' in pm_dict else None + plan_df = pm_dict.get('plans', {}).get(id) if 'plans' in pm_dict else None + + # Определяем, какие столбцы из какого датафрейма брать + for col in columns: + col_result = {} + + if col in ['ПП', 'БП']: + if plan_df is None: + print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}") + col_result = {code: None for code in codes} + else: + for code in codes: + val = self.get_svodka_value(plan_df, code, col, search_value) + col_result[code] = val + + elif col in ['ТБ', 'СЭБ', 'НЭБ']: + if fact_df is None: + print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}") + col_result = {code: None for code in codes} + else: + for code in codes: + val = self.get_svodka_value(fact_df, code, col, search_value) + col_result[code] = val + else: + print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.") + col_result = {code: None for code in codes} + + result[col] = col_result + + return result + + def get_svodka_total(self, pm_dict, codes, columns, search_value=None): + ''' Служебная функция агрегации данные по всем ОГ ''' + total_result = {} + + for name, og_id in OG_IDS.items(): + if og_id == 'BASH': + continue + + # print(f"📊 Обработка: {name} ({og_id})") + try: + data = self.get_svodka_og( + pm_dict, + og_id, + codes, + columns, + search_value + ) + total_result[og_id] = data + except Exception as e: + print(f"❌ Ошибка при обработке {name} ({og_id}): {e}") + total_result[og_id] = None + + return total_result + + # Убираем старый метод get_value, так как он теперь в базовом классе diff --git a/python_parser/adapters/parsers/svodka_pm.py b/python_parser/adapters/parsers/svodka_pm.py index 3901a08..9f8d5cd 100644 --- a/python_parser/adapters/parsers/svodka_pm.py +++ b/python_parser/adapters/parsers/svodka_pm.py @@ -1,9 +1,11 @@ -import pandas as pd + +import pandas as pd +import os +import json +from typing import Dict, Any, List, Optional from core.ports import ParserPort -from core.schema_utils import register_getter_from_schema, validate_params_with_schema -from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest -from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json +from adapters.pconfig import SINGLE_OGS, replace_id_in_path, find_header_row, data_to_json class SvodkaPMParser(ParserPort): @@ -11,91 +13,66 @@ class SvodkaPMParser(ParserPort): name = "Сводки ПМ" + def __init__(self): + super().__init__() + self._register_default_getters() + def _register_default_getters(self): - """Регистрация геттеров по умолчанию""" - # Используем схемы Pydantic как единый источник правды - register_getter_from_schema( - parser_instance=self, - getter_name="single_og", + """Регистрация геттеров для Сводки ПМ""" + self.register_getter( + name="single_og", method=self._get_single_og, - schema_class=SvodkaPMSingleOGRequest, - description="Получение данных по одному ОГ" + required_params=["id", "codes", "columns"], + optional_params=["search"], + description="Получение данных по одному ОГ из сводки ПМ" ) - register_getter_from_schema( - parser_instance=self, - getter_name="total_ogs", + self.register_getter( + name="total_ogs", method=self._get_total_ogs, - schema_class=SvodkaPMTotalOGsRequest, - description="Получение данных по всем ОГ" + required_params=["codes", "columns"], + optional_params=["search"], + description="Получение данных по всем ОГ из сводки ПМ" ) - def _get_single_og(self, params: dict): - """Получение данных по одному ОГ""" - # Валидируем параметры с помощью схемы Pydantic - validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest) - - og_id = validated_params["id"] - codes = validated_params["codes"] - columns = validated_params["columns"] - search = validated_params.get("search") - - # Здесь нужно получить DataFrame из self.df, но пока используем старую логику - # TODO: Переделать под новую архитектуру - return self.get_svodka_og(self.df, og_id, codes, columns, search) - - def _get_total_ogs(self, params: dict): - """Получение данных по всем ОГ""" - # Валидируем параметры с помощью схемы Pydantic - validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest) - - codes = validated_params["codes"] - columns = validated_params["columns"] - search = validated_params.get("search") - - # TODO: Переделать под новую архитектуру - return self.get_svodka_total(self.df, codes, columns, search) - def parse(self, file_path: str, params: dict) -> pd.DataFrame: - """Парсинг файла и возврат DataFrame""" - # Сохраняем DataFrame для использования в геттерах - self.df = self.parse_svodka_pm_files(file_path, params) - return self.df + """Парсинг файла сводки ПМ и возврат DataFrame""" + # Проверяем расширение файла + if not file_path.lower().endswith(('.xlsx', '.xlsm', '.xls')): + raise ValueError(f"Неподдерживаемый формат файла: {file_path}") + + # Определяем тип файла по имени файла + filename = os.path.basename(file_path).lower() + + if "plan" in filename or "план" in filename: + sheet_name = "Сводка Нефтепереработка" + return self._parse_svodka_pm(file_path, sheet_name) + elif "fact" in filename or "факт" in filename: + sheet_name = "Сводка Нефтепереработка" + return self._parse_svodka_pm(file_path, sheet_name) + else: + # По умолчанию пытаемся парсить как есть + sheet_name = "Сводка Нефтепереработка" + return self._parse_svodka_pm(file_path, sheet_name) - def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int: - """Определения индекса заголовка в excel по ключевому слову""" - # Читаем первые max_rows строк без заголовков - df_temp = pd.read_excel( - file, - sheet_name=sheet, - header=None, - nrows=max_rows, - engine='openpyxl' - ) + def _parse_svodka_pm(self, file_path: str, sheet_name: str, header_num: Optional[int] = None) -> pd.DataFrame: + """Парсинг отчетов одного ОГ для БП, ПП и факта""" + try: + # Автоопределение header_num, если не передан + if header_num is None: + header_num = find_header_row(file_path, sheet_name, search_value="Итого") - # Ищем строку, где хотя бы в одном столбце встречается искомое значение - for idx, row in df_temp.iterrows(): - if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any(): - print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})") - return idx # 0-based index — то, что нужно для header= - - raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.") - - def parse_svodka_pm(self, file, sheet, header_num=None): - ''' Собственно парсер отчетов одного ОГ для БП, ПП и факта ''' - # Автоопределение header_num, если не передан - if header_num is None: - header_num = self.find_header_row(file, sheet, search_value="Итого") - - # Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID - df_probe = pd.read_excel( - file, - sheet_name=sheet, - header=header_num, - usecols=None, - nrows=2, - engine='openpyxl' - ) + # Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID + df_probe = pd.read_excel( + file_path, + sheet_name=sheet_name, + header=header_num, + usecols=None, + nrows=2, + engine='openpyxl' # Явно указываем движок + ) + except Exception as e: + raise ValueError(f"Ошибка при чтении файла {file_path}: {str(e)}") if df_probe.shape[0] == 0: raise ValueError("Файл пуст или не содержит данных.") @@ -108,16 +85,15 @@ class SvodkaPMParser(ParserPort): raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.') indicator_col_name = indicator_cols.index[0] - print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}") # Читаем весь лист df_full = pd.read_excel( - file, - sheet_name=sheet, + file_path, + sheet_name=sheet_name, header=header_num, usecols=None, index_col=None, - engine='openpyxl' + engine='openpyxl' # Явно указываем движок ) if indicator_col_name not in df_full.columns: @@ -134,19 +110,18 @@ class SvodkaPMParser(ParserPort): itogo_idx = header_list.index("Итого") num_cols_needed = itogo_idx + 2 except ValueError: - print('Столбец "Итого" не найден. Оставляем все столбцы.') num_cols_needed = len(header_list) num_cols_needed = min(num_cols_needed, len(header_list)) df_final = df_full.iloc[:, :num_cols_needed] - # === Удаление полностью пустых столбцов === + # Удаление полностью пустых столбцов df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True) df_clean = df_clean.where(pd.notnull(df_clean), pd.NA) non_empty_mask = df_clean.notna().any() df_final = df_final.loc[:, non_empty_mask] - # === Обработка заголовков: Unnamed и "Итого" → "Итого" === + # Обработка заголовков: Unnamed и "Итого" → "Итого" new_columns = [] last_good_name = None for col in df_final.columns: @@ -155,104 +130,69 @@ class SvodkaPMParser(ParserPort): # Проверяем, является ли колонка пустой/некорректной is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan' - if is_empty_or_unnamed: - # Если это пустая колонка, используем последнее хорошее имя - if last_good_name: - new_columns.append(last_good_name) - else: - # Если нет хорошего имени, используем имя по умолчанию - new_columns.append(f"col_{len(new_columns)}") + # Проверяем, начинается ли на "Итого" + if col_str.startswith('Итого'): + current_name = 'Итого' + last_good_name = current_name + new_columns.append(current_name) + elif is_empty_or_unnamed: + # Используем последнее хорошее имя + new_columns.append(last_good_name) else: - # Это хорошая колонка + # Имя, полученное из excel last_good_name = col_str new_columns.append(col_str) - # Убеждаемся, что количество столбцов совпадает - if len(new_columns) != len(df_final.columns): - # Если количество не совпадает, обрезаем или дополняем - if len(new_columns) > len(df_final.columns): - new_columns = new_columns[:len(df_final.columns)] - else: - # Дополняем недостающие столбцы - while len(new_columns) < len(df_final.columns): - new_columns.append(f"col_{len(new_columns)}") - - # Применяем новые заголовки df_final.columns = new_columns return df_final - def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict: - """Парсинг ZIP архива со сводками ПМ""" - import zipfile - pm_dict = { - "facts": {}, - "plans": {} - } - excel_fact_template = 'svodka_fact_pm_ID.xlsm' - excel_plan_template = 'svodka_plan_pm_ID.xlsx' - with zipfile.ZipFile(zip_path, 'r') as zip_ref: - file_list = zip_ref.namelist() - for name, id in OG_IDS.items(): - if id == 'BASH': - continue # пропускаем BASH - - current_fact = replace_id_in_path(excel_fact_template, id) - fact_candidates = [f for f in file_list if current_fact in f] - if len(fact_candidates) == 1: - print(f'Загрузка {current_fact}') - with zip_ref.open(fact_candidates[0]) as excel_file: - pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка') - print(f"✅ Факт загружен: {current_fact}") - else: - print(f"⚠️ Файл не найден (Факт): {current_fact}") - pm_dict['facts'][id] = None - - current_plan = replace_id_in_path(excel_plan_template, id) - plan_candidates = [f for f in file_list if current_plan in f] - if len(plan_candidates) == 1: - print(f'Загрузка {current_plan}') - with zip_ref.open(plan_candidates[0]) as excel_file: - pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка') - print(f"✅ План загружен: {current_plan}") - else: - print(f"⚠️ Файл не найден (План): {current_plan}") - pm_dict['plans'][id] = None - - return pm_dict - - def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None): - ''' Служебная функция получения значения по коду и столбцу ''' - row_index = code + def _get_svodka_value(self, df_svodka: pd.DataFrame, id: str, code: int, search_value: Optional[str] = None): + """Служебная функция для простой выборке по сводке""" + row_index = id + + print(f"🔍 DEBUG: Ищем код '{code}' в строке '{row_index}'") + print(f"🔍 DEBUG: Первая строка данных: {df_svodka.iloc[0].tolist()}") + print(f"🔍 DEBUG: Доступные индексы: {list(df_svodka.index)}") + # Ищем столбцы, где в первой строке есть значение code mask_value = df_svodka.iloc[0] == code if search_value is None: mask_name = df_svodka.columns != 'Итого' else: mask_name = df_svodka.columns == search_value + print(f"🔍 DEBUG: mask_value = {mask_value.tolist()}") + print(f"🔍 DEBUG: mask_name = {mask_name.tolist()}") + # Убедимся, что маски совпадают по длине if len(mask_value) != len(mask_name): raise ValueError( - f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}" + f"❌ Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}" ) - final_mask = mask_value & mask_name # булевая маска по позициям столбцов - col_positions = final_mask.values # numpy array или Series булевых значений + final_mask = mask_value & mask_name + col_positions = final_mask.values + + print(f"🔍 DEBUG: final_mask = {final_mask.tolist()}") + print(f"🔍 DEBUG: col_positions = {col_positions}") if not final_mask.any(): - print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'") + print(f"⚠️ Код '{code}' не найден в первой строке") return 0 else: if row_index in df_svodka.index: # Получаем позицию строки row_loc = df_svodka.index.get_loc(row_index) + print(f"🔍 DEBUG: Найдена строка '{row_index}' в позиции {row_loc}") # Извлекаем значения по позициям столбцов values = df_svodka.iloc[row_loc, col_positions] + print(f"🔍 DEBUG: Извлеченные значения: {values.tolist()}") # Преобразуем в числовой формат numeric_values = pd.to_numeric(values, errors='coerce') + print(f"🔍 DEBUG: Числовые значения: {numeric_values.tolist()}") # Агрегация данных (NaN игнорируются) if search_value is None: @@ -260,15 +200,43 @@ class SvodkaPMParser(ParserPort): else: return numeric_values.iloc[0] else: + print(f"⚠️ Строка '{row_index}' не найдена в индексе") return None - def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None): - ''' Служебная функция получения данных по одному ОГ ''' + def _get_svodka_og(self, og_id: str, codes: List[int], columns: List[str], search_value: Optional[str] = None): + """Служебная функция получения данных по одному ОГ""" result = {} - # Безопасно получаем данные, проверяя их наличие - fact_df = pm_dict.get('facts', {}).get(id) if 'facts' in pm_dict else None - plan_df = pm_dict.get('plans', {}).get(id) if 'plans' in pm_dict else None + # Пути к файлам + exel_fact = 'data/pm_fact/svodka_fact_pm_ID' + exel_plan = 'data/pm_plan/svodka_plan_pm_ID' + + current_fact = replace_id_in_path(exel_fact, og_id) + current_plan = replace_id_in_path(exel_plan, og_id) + + # Загружаем данные + fact_df = None + plan_df = None + + if os.path.exists(current_fact): + try: + fact_df = self._parse_svodka_pm(current_fact, 'Сводка Нефтепереработка') + print(f"✅ Файл факта загружен: {current_fact}") + print(f"📊 Столбцы факта: {list(fact_df.columns)}") + print(f"📊 Индексы факта: {list(fact_df.index)}") + except Exception as e: + print(f"❌ Ошибка при загрузке файла факта {current_fact}: {e}") + fact_df = None + + if os.path.exists(current_plan): + try: + plan_df = self._parse_svodka_pm(current_plan, 'Сводка Нефтепереработка') + print(f"✅ Файл плана загружен: {current_plan}") + print(f"📊 Столбцы плана: {list(plan_df.columns)}") + print(f"📊 Индексы плана: {list(plan_df.index)}") + except Exception as e: + print(f"❌ Ошибка при загрузке файла плана {current_plan}: {e}") + plan_df = None # Определяем, какие столбцы из какого датафрейма брать for col in columns: @@ -276,51 +244,88 @@ class SvodkaPMParser(ParserPort): if col in ['ПП', 'БП']: if plan_df is None: - print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}") - col_result = {code: None for code in codes} + print(f"❌ Невозможно обработать '{col}': нет данных плана для {og_id}") else: for code in codes: - val = self.get_svodka_value(plan_df, code, col, search_value) - col_result[code] = val + val = self._get_svodka_value(plan_df, og_id, code, search_value) + col_result[str(code)] = val elif col in ['ТБ', 'СЭБ', 'НЭБ']: if fact_df is None: - print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}") - col_result = {code: None for code in codes} + print(f"❌ Невозможно обработать '{col}': нет данных факта для {og_id}") else: for code in codes: - val = self.get_svodka_value(fact_df, code, col, search_value) - col_result[code] = val + val = self._get_svodka_value(fact_df, og_id, code, search_value) + col_result[str(code)] = val else: print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.") - col_result = {code: None for code in codes} + col_result = {str(code): None for code in codes} result[col] = col_result return result - def get_svodka_total(self, pm_dict, codes, columns, search_value=None): - ''' Служебная функция агрегации данные по всем ОГ ''' + def _get_single_og(self, params: Dict[str, Any]) -> str: + """API функция для получения данных по одному ОГ""" + # Если на входе строка — парсим как JSON + if isinstance(params, str): + try: + params = json.loads(params) + except json.JSONDecodeError as e: + raise ValueError(f"Некорректный JSON: {e}") + + # Проверяем структуру + if not isinstance(params, dict): + raise TypeError("Конфиг должен быть словарём или JSON-строкой") + + og_id = params.get("id") + codes = params.get("codes") + columns = params.get("columns") + search = params.get("search") + + if not isinstance(codes, list): + raise ValueError("Поле 'codes' должно быть списком") + if not isinstance(columns, list): + raise ValueError("Поле 'columns' должно быть списком") + + data = self._get_svodka_og(og_id, codes, columns, search) + json_result = data_to_json(data) + return json_result + + def _get_total_ogs(self, params: Dict[str, Any]) -> str: + """API функция для получения данных по всем ОГ""" + # Если на входе строка — парсим как JSON + if isinstance(params, str): + try: + params = json.loads(params) + except json.JSONDecodeError as e: + raise ValueError(f"❌Некорректный JSON: {e}") + + # Проверяем структуру + if not isinstance(params, dict): + raise TypeError("Конфиг должен быть словарём или JSON-строкой") + + codes = params.get("codes") + columns = params.get("columns") + search = params.get("search") + + if not isinstance(codes, list): + raise ValueError("Поле 'codes' должно быть списком") + if not isinstance(columns, list): + raise ValueError("Поле 'columns' должно быть списком") + total_result = {} - for name, og_id in OG_IDS.items(): + for og_id in SINGLE_OGS: if og_id == 'BASH': continue - # print(f"📊 Обработка: {name} ({og_id})") try: - data = self.get_svodka_og( - pm_dict, - og_id, - codes, - columns, - search_value - ) + data = self._get_svodka_og(og_id, codes, columns, search) total_result[og_id] = data except Exception as e: - print(f"❌ Ошибка при обработке {name} ({og_id}): {e}") + print(f"❌ Ошибка при обработке {og_id}: {e}") total_result[og_id] = None - return total_result - - # Убираем старый метод get_value, так как он теперь в базовом классе + json_result = data_to_json(total_result) + return json_result \ No newline at end of file diff --git a/python_parser/adapters/pconfig.py b/python_parser/adapters/pconfig.py index 12be990..d2a0b33 100644 --- a/python_parser/adapters/pconfig.py +++ b/python_parser/adapters/pconfig.py @@ -3,6 +3,7 @@ from functools import lru_cache import json import numpy as np import pandas as pd +import os OG_IDS = { "Комсомольский НПЗ": "KNPZ", @@ -22,8 +23,37 @@ OG_IDS = { "Красноленинский НПЗ": "KLNPZ", "Пурнефтепереработка": "PurNP", "ЯНОС": "YANOS", + "Уфанефтехим": "UNH", + "РНПК": "RNPK", + "КмсНПЗ": "KNPZ", + "АНХК": "ANHK", + "НК НПЗ": "NovKuybNPZ", + "КНПЗ": "KuybNPZ", + "СНПЗ": "CyzNPZ", + "Нижневаторское НПО": "NVNPO", + "ПурНП": "PurNP", } +SINGLE_OGS = [ + "KNPZ", + "ANHK", + "AchNPZ", + "BASH", + "UNPZ", + "UNH", + "NOV", + "NovKuybNPZ", + "KuybNPZ", + "CyzNPZ", + "TuapsNPZ", + "SNPZ", + "RNPK", + "NVNPO", + "KLNPZ", + "PurNP", + "YANOS", +] + SNPZ_IDS = { "Висбрекинг": "SNPZ.VISB", "Изомеризация": "SNPZ.IZOM", @@ -40,7 +70,18 @@ SNPZ_IDS = { def replace_id_in_path(file_path, new_id): - return file_path.replace('ID', str(new_id)) + # Заменяем 'ID' на новое значение + modified_path = file_path.replace('ID', str(new_id)) + '.xlsx' + + # Проверяем, существует ли файл + if not os.path.exists(modified_path): + # Меняем расширение на .xlsm + directory, filename = os.path.split(modified_path) + name, ext = os.path.splitext(filename) + new_filename = name + '.xlsm' + modified_path = os.path.join(directory, new_filename) + + return modified_path def get_table_name(exel): @@ -109,6 +150,25 @@ def get_id_by_name(name, dictionary): return best_match +def find_header_row(file, sheet, search_value="Итого", max_rows=50): + ''' Определения индекса заголовка в exel по ключевому слову ''' + # Читаем первые max_rows строк без заголовков + df_temp = pd.read_excel( + file, + sheet_name=sheet, + header=None, + nrows=max_rows + ) + + # Ищем строку, где хотя бы в одном столбце встречается искомое значение + for idx, row in df_temp.iterrows(): + if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any(): + print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})") + return idx # 0-based index — то, что нужно для header= + + raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.") + + def data_to_json(data, indent=2, ensure_ascii=False): """ Полностью безопасная сериализация данных в JSON. @@ -153,11 +213,18 @@ def data_to_json(data, indent=2, ensure_ascii=False): # --- рекурсия по dict и list --- elif isinstance(obj, dict): - return { - key: convert_obj(value) - for key, value in obj.items() - if not is_nan_like(key) # фильтруем NaN в ключах (недопустимы в JSON) - } + # Обрабатываем только значения, ключи оставляем как строки + converted = {} + for k, v in obj.items(): + if is_nan_like(k): + continue # ключи не могут быть null в JSON + # Превращаем ключ в строку, но не пытаемся интерпретировать как число + key_str = str(k) + converted[key_str] = convert_obj(v) # только значение проходит через convert_obj + # Если все значения 0.0 — считаем, что данных нет, т.к. возможно ожидается массив. + if converted and all(v == 0.0 for v in converted.values()): + return None + return converted elif isinstance(obj, list): return [convert_obj(item) for item in obj] @@ -175,7 +242,6 @@ def data_to_json(data, indent=2, ensure_ascii=False): try: cleaned_data = convert_obj(data) - cleaned_data_str = json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii) - return cleaned_data + return json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii) except Exception as e: raise ValueError(f"Не удалось сериализовать данные в JSON: {e}") diff --git a/python_parser/app/schemas/svodka_pm.py b/python_parser/app/schemas/svodka_pm.py index 2e9d5ba..23e4ed6 100644 --- a/python_parser/app/schemas/svodka_pm.py +++ b/python_parser/app/schemas/svodka_pm.py @@ -25,7 +25,7 @@ class OGID(str, Enum): class SvodkaPMSingleOGRequest(BaseModel): - id: OGID = Field( + id: str = Field( ..., description="Идентификатор МА для запрашиваемого ОГ", example="SNPZ"