diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2cca422 --- /dev/null +++ b/.gitignore @@ -0,0 +1,90 @@ +data/ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ +.pytest_cache/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# VS Code +.vscode/ + +# PyCharm +.idea/ + +# Local envs +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# MacOS +.DS_Store + +# Windows +Thumbs.db +ehthumbs.db +Desktop.ini + +# MinIO test data +minio_data/ +minio_test/ +minio/ + +# Logs +*.log + +# Streamlit cache +.streamlit/ diff --git a/python_parser/Dockerfile_ b/python_parser/Dockerfile_ new file mode 100644 index 0000000..d1409f7 --- /dev/null +++ b/python_parser/Dockerfile_ @@ -0,0 +1,20 @@ +FROM repo-dev.predix.rosneft.ru/python:3.11-slim + +WORKDIR /app + +# RUN pip install kafka-python==2.0.2 +# RUN pip freeze > /app/requirements.txt + +# ADD . /app +COPY requirements.txt . + +RUN mkdir -p vendor +RUN pip download -r /app/requirements.txt --no-binary=:none: -d /app/vendor + +# ADD . /app + +# ENV KAFKA_BROKER=10.234.160.10:9093,10.234.160.10:9094,10.234.160.10:9095 +# ENV KAFKA_UPDATE_ALGORITHM_RULES_TOPIC=algorithm-rule-update +# ENV KAFKA_CLIENT_USERNAME=cf-service + +# CMD ["python", "/app/run_dev.py"] \ No newline at end of file diff --git a/python_parser/adapters/parsers/monitoring_fuel.py b/python_parser/adapters/parsers/monitoring_fuel.py index 8453d2e..4536b98 100644 --- a/python_parser/adapters/parsers/monitoring_fuel.py +++ b/python_parser/adapters/parsers/monitoring_fuel.py @@ -1,9 +1,9 @@ import pandas as pd import re -from typing import Dict - +import zipfile +from typing import Dict, Tuple from core.ports import ParserPort -from adapters.pconfig import data_to_json, get_object_by_name +from adapters.pconfig import data_to_json class MonitoringFuelParser(ParserPort): @@ -11,6 +11,82 @@ class MonitoringFuelParser(ParserPort): name = "Мониторинг топлива" + def _register_default_getters(self): + """Регистрация геттеров по умолчанию""" + self.register_getter( + name="total_by_columns", + method=self._get_total_by_columns, + required_params=["columns"], + optional_params=[], + description="Агрегация данных по колонкам" + ) + + self.register_getter( + name="month_by_code", + method=self._get_month_by_code, + required_params=["month"], + optional_params=[], + description="Получение данных за конкретный месяц" + ) + + def _get_total_by_columns(self, params: dict): + """Агрегация по колонкам (обертка для совместимости)""" + columns = params["columns"] + if not columns: + raise ValueError("Отсутствуют идентификаторы столбцов") + + # TODO: Переделать под новую архитектуру + df_means, _ = self.aggregate_by_columns(self.df, columns) + return df_means.to_dict(orient='index') + + def _get_month_by_code(self, params: dict): + """Получение данных за месяц (обертка для совместимости)""" + month = params["month"] + if not month: + raise ValueError("Отсутствует идентификатор месяца") + + # TODO: Переделать под новую архитектуру + df_month = self.get_month(self.df, month) + return df_month.to_dict(orient='index') + + def parse(self, file_path: str, params: dict) -> pd.DataFrame: + """Парсинг файла и возврат DataFrame""" + # Сохраняем DataFrame для использования в геттерах + self.df = self.parse_monitoring_fuel_files(file_path, params) + return self.df + + def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]: + """Парсинг ZIP архива с файлами мониторинга топлива""" + df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ + + with zipfile.ZipFile(zip_path, 'r') as zip_ref: + + file_list = zip_ref.namelist() + for month in range(1, 13): + + mm = f"{month:02d}" + file_temp = f'monitoring_SNPZ_{mm}.xlsm' + candidates = [f for f in file_list if file_temp in f] + + if len(candidates) == 1: + file = candidates[0] + + print(f'Загрузка {file}') + with zip_ref.open(file) as excel_file: + try: + df = self.parse_single(excel_file, 'Мониторинг потребления') + df_monitorings[mm] = df + + print(f"✅ Данные за месяц {mm} загружены") + + except Exception as e: + print(f"Ошибка при загрузке файла {file_temp}: {e}") + + else: + print(f"⚠️ Файл не найден: {file_temp}") + + return df_monitorings + def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int: """Определение индекса заголовка в Excel по ключевому слову""" # Читаем первые max_rows строк без заголовков @@ -64,46 +140,15 @@ class MonitoringFuelParser(ParserPort): # Проверяем, что колонка 'name' существует if 'name' in df_full.columns: # Применяем функцию get_id_by_name к каждой строке в колонке 'name' - df_full['id'] = df_full['name'].apply(get_object_by_name) + # df_full['id'] = df_full['name'].apply(get_object_by_name) # This line was removed as per new_code + pass # Placeholder for new_code # Устанавливаем id как индекс df_full.set_index('id', inplace=True) print(f"Окончательное количество столбцов: {len(df_full.columns)}") return df_full - def parse(self, file_path: str, params: dict) -> dict: - import zipfile - df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ - - with zipfile.ZipFile(file_path, 'r') as zip_ref: - - file_list = zip_ref.namelist() - for month in range(1, 13): - - mm = f"{month:02d}" - file_temp = f'monitoring_SNPZ_{mm}.xlsm' - candidates = [f for f in file_list if file_temp in f] - - if len(candidates) == 1: - file = candidates[0] - - print(f'Загрузка {file}') - with zip_ref.open(file) as excel_file: - try: - df = self.parse_single(excel_file, 'Мониторинг потребления') - df_monitorings[mm] = df - - print(f"✅ Данные за месяц {mm} загружены") - - except Exception as e: - print(f"Ошибка при загрузке файла {file_temp}: {e}") - - else: - print(f"⚠️ Файл не найден: {file_temp}") - - return df_monitorings - - def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns): + def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns: list) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]: ''' Служебная функция. Агрегация данных по среднему по определенным колонкам. ''' all_data = {} # Для хранения полных данных (месяцы) по каждой колонке means = {} # Для хранения средних @@ -185,22 +230,3 @@ class MonitoringFuelParser(ParserPort): total.name = 'mean' return total, df_combined - - def get_value(self, df, params): - mode = params.get("mode", "total") - columns = params.get("columns", None) - month = params.get("month", None) - data = None - if mode == "total": - if not columns: - raise ValueError("Отсутствуют идентификаторы столбцов") - df_means, _ = self.aggregate_by_columns(df, columns) - data = df_means.to_dict(orient='index') - elif mode == "month": - if not month: - raise ValueError("Отсутствуют идентификатор месяца") - df_month = self.get_month(df, month) - data = df_month.to_dict(orient='index') - - json_result = data_to_json(data) - return json_result diff --git a/python_parser/adapters/parsers/svodka_ca.py b/python_parser/adapters/parsers/svodka_ca.py index 029bf48..e557542 100644 --- a/python_parser/adapters/parsers/svodka_ca.py +++ b/python_parser/adapters/parsers/svodka_ca.py @@ -6,85 +6,44 @@ from adapters.pconfig import get_og_by_name class SvodkaCAParser(ParserPort): - """Парсер для сводки СА""" + """Парсер для сводок СА""" - name = "Сводка СА" + name = "Сводки СА" - def extract_all_tables(self, file_path, sheet_name=0): - """Извлекает все таблицы из Excel файла""" - df = pd.read_excel(file_path, sheet_name=sheet_name, header=None) - df_filled = df.fillna('') - df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True) + def _register_default_getters(self): + """Регистрация геттеров по умолчанию""" + self.register_getter( + name="get_data", + method=self._get_data_wrapper, + required_params=["modes", "tables"], + optional_params=[], + description="Получение данных по режимам и таблицам" + ) - non_empty_rows = ~(df_clean.eq('').all(axis=1)) - non_empty_cols = ~(df_clean.eq('').all(axis=0)) + def _get_data_wrapper(self, params: dict): + """Обертка для получения данных (для совместимости)""" + modes = params["modes"] + tables = params["tables"] + + if not isinstance(modes, list): + raise ValueError("Поле 'modes' должно быть списком") + if not isinstance(tables, list): + raise ValueError("Поле 'tables' должно быть списком") + + # TODO: Переделать под новую архитектуру + data_dict = {} + for mode in modes: + data_dict[mode] = self.get_data(self.df, mode, tables) + return self.data_dict_to_json(data_dict) - row_indices = non_empty_rows[non_empty_rows].index.tolist() - col_indices = non_empty_cols[non_empty_cols].index.tolist() + def parse(self, file_path: str, params: dict) -> pd.DataFrame: + """Парсинг файла и возврат DataFrame""" + # Сохраняем DataFrame для использования в геттерах + self.df = self.parse_svodka_ca(file_path, params) + return self.df - if not row_indices or not col_indices: - return [] - - row_blocks = self._get_contiguous_blocks(row_indices) - col_blocks = self._get_contiguous_blocks(col_indices) - - tables = [] - for r_start, r_end in row_blocks: - for c_start, c_end in col_blocks: - block = df.iloc[r_start:r_end + 1, c_start:c_end + 1] - if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all(): - continue - - if self._is_header_row(block.iloc[0]): - block.columns = block.iloc[0] - block = block.iloc[1:].reset_index(drop=True) - else: - block = block.reset_index(drop=True) - block.columns = [f"col_{i}" for i in range(block.shape[1])] - - tables.append(block) - - return tables - - def _get_contiguous_blocks(self, indices): - """Группирует индексы в непрерывные блоки""" - if not indices: - return [] - blocks = [] - start = indices[0] - for i in range(1, len(indices)): - if indices[i] != indices[i-1] + 1: - blocks.append((start, indices[i-1])) - start = indices[i] - blocks.append((start, indices[-1])) - return blocks - - def _is_header_row(self, series): - """Определяет, похожа ли строка на заголовок""" - series_str = series.astype(str).str.strip() - non_empty = series_str[series_str != ''] - if len(non_empty) == 0: - return False - - def is_not_numeric(val): - try: - float(val.replace(',', '.')) - return False - except (ValueError, TypeError): - return True - - not_numeric_count = non_empty.apply(is_not_numeric).sum() - return not_numeric_count / len(non_empty) > 0.6 - - def _get_og_by_name(self, name): - """Функция для получения ID по имени (упрощенная версия)""" - # Упрощенная версия - возвращаем имя как есть - if not name or not isinstance(name, str): - return None - return name.strip() - - def parse_sheet(self, file_path, sheet_name, inclusion_list): - """Собственно функция парсинга отчета СА""" + def parse_svodka_ca(self, file_path: str, params: dict) -> dict: + """Парсинг сводки СА""" # === Извлечение и фильтрация === tables = self.extract_all_tables(file_path, sheet_name) @@ -190,76 +149,185 @@ class SvodkaCAParser(ParserPort): else: return None - def parse(self, file_path: str, params: dict) -> dict: - """Парсинг файла сводки СА""" - # === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив === - # Выгружаем План в df_ca_plan - inclusion_list_plan = { - "ТиП, %", - "Топливо итого, тонн", - "Топливо итого, %", - "Топливо на технологию, тонн", - "Топливо на технологию, %", - "Топливо на энергетику, тонн", - "Топливо на энергетику, %", - "Потери итого, тонн", - "Потери итого, %", - "в т.ч. Идентифицированные безвозвратные потери, тонн**", - "в т.ч. Идентифицированные безвозвратные потери, %**", - "в т.ч. Неидентифицированные потери, тонн**", - "в т.ч. Неидентифицированные потери, %**" - } + def extract_all_tables(self, file_path, sheet_name=0): + """Извлекает все таблицы из Excel файла""" + df = pd.read_excel(file_path, sheet_name=sheet_name, header=None) + df_filled = df.fillna('') + df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True) - df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan) # ЭТО ДАТАФРЕЙМ ПЛАНА В СВОДКЕ ЦА - print(f"\n--- Объединённый и отсортированный План: {df_ca_plan.shape} ---") + non_empty_rows = ~(df_clean.eq('').all(axis=1)) + non_empty_cols = ~(df_clean.eq('').all(axis=0)) - # Выгружаем Факт - inclusion_list_fact = { - "ТиП, %", - "Топливо итого, тонн", - "Топливо итого, %", - "Топливо на технологию, тонн", - "Топливо на технологию, %", - "Топливо на энергетику, тонн", - "Топливо на энергетику, %", - "Потери итого, тонн", - "Потери итого, %", - "в т.ч. Идентифицированные безвозвратные потери, тонн", - "в т.ч. Идентифицированные безвозвратные потери, %", - "в т.ч. Неидентифицированные потери, тонн", - "в т.ч. Неидентифицированные потери, %" - } + row_indices = non_empty_rows[non_empty_rows].index.tolist() + col_indices = non_empty_cols[non_empty_cols].index.tolist() - df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact) # ЭТО ДАТАФРЕЙМ ФАКТА В СВОДКЕ ЦА - print(f"\n--- Объединённый и отсортированный Факт: {df_ca_fact.shape} ---") + if not row_indices or not col_indices: + return [] - # Выгружаем План в df_ca_normativ - inclusion_list_normativ = { - "Топливо итого, тонн", - "Топливо итого, %", - "Топливо на технологию, тонн", - "Топливо на технологию, %", - "Топливо на энергетику, тонн", - "Топливо на энергетику, %", - "Потери итого, тонн", - "Потери итого, %", - "в т.ч. Идентифицированные безвозвратные потери, тонн**", - "в т.ч. Идентифицированные безвозвратные потери, %**", - "в т.ч. Неидентифицированные потери, тонн**", - "в т.ч. Неидентифицированные потери, %**" - } + row_blocks = self._get_contiguous_blocks(row_indices) + col_blocks = self._get_contiguous_blocks(col_indices) - # ЭТО ДАТАФРЕЙМ НОРМАТИВА В СВОДКЕ ЦА - df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ) + tables = [] + for r_start, r_end in row_blocks: + for c_start, c_end in col_blocks: + block = df.iloc[r_start:r_end + 1, c_start:c_end + 1] + if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all(): + continue - print(f"\n--- Объединённый и отсортированный Норматив: {df_ca_normativ.shape} ---") + if self._is_header_row(block.iloc[0]): + block.columns = block.iloc[0] + block = block.iloc[1:].reset_index(drop=True) + else: + block = block.reset_index(drop=True) + block.columns = [f"col_{i}" for i in range(block.shape[1])] - df_dict = { - "plan": df_ca_plan, - "fact": df_ca_fact, - "normativ": df_ca_normativ - } - return df_dict + tables.append(block) + + return tables + + def _get_contiguous_blocks(self, indices): + """Группирует индексы в непрерывные блоки""" + if not indices: + return [] + blocks = [] + start = indices[0] + for i in range(1, len(indices)): + if indices[i] != indices[i-1] + 1: + blocks.append((start, indices[i-1])) + start = indices[i] + blocks.append((start, indices[-1])) + return blocks + + def _is_header_row(self, series): + """Определяет, похожа ли строка на заголовок""" + series_str = series.astype(str).str.strip() + non_empty = series_str[series_str != ''] + if len(non_empty) == 0: + return False + + def is_not_numeric(val): + try: + float(val.replace(',', '.')) + return False + except (ValueError, TypeError): + return True + + not_numeric_count = non_empty.apply(is_not_numeric).sum() + return not_numeric_count / len(non_empty) > 0.6 + + def _get_og_by_name(self, name): + """Функция для получения ID по имени (упрощенная версия)""" + # Упрощенная версия - возвращаем имя как есть + if not name or not isinstance(name, str): + return None + return name.strip() + + def parse_sheet(self, file_path: str, sheet_name: str, inclusion_list: set) -> pd.DataFrame: + """Парсинг листа Excel""" + # === Извлечение и фильтрация === + tables = self.extract_all_tables(file_path, sheet_name) + + # Фильтруем таблицы: оставляем только те, где первая строка содержит нужные заголовки + filtered_tables = [] + for table in tables: + if table.empty: + continue + first_row_values = table.iloc[0].astype(str).str.strip().tolist() + if any(val in inclusion_list for val in first_row_values): + filtered_tables.append(table) + + tables = filtered_tables + + # === Итоговый список таблиц датафреймов === + result_list = [] + + for table in tables: + if table.empty: + continue + + # Получаем первую строку (до удаления) + first_row_values = table.iloc[0].astype(str).str.strip().tolist() + + # Находим, какой элемент из inclusion_list присутствует + matched_key = None + for val in first_row_values: + if val in inclusion_list: + matched_key = val + break # берём первый совпадающий заголовок + + if matched_key is None: + continue # на всякий случай (хотя уже отфильтровано) + + # Удаляем первую строку (заголовок) и сбрасываем индекс + df_cleaned = table.iloc[1:].copy().reset_index(drop=True) + + # Пропускаем, если таблица пустая + if df_cleaned.empty: + continue + + # Первая строка становится заголовком + new_header = df_cleaned.iloc[0] # извлекаем первую строку как потенциальные названия столбцов + + # Преобразуем заголовок: только первый столбец может быть заменён на "name" + cleaned_header = [] + + # Обрабатываем первый столбец отдельно + first_item = new_header.iloc[0] if isinstance(new_header, pd.Series) else new_header[0] + first_item_str = str(first_item).strip() if pd.notna(first_item) else "" + if first_item_str == "" or first_item_str == "nan": + cleaned_header.append("name") + else: + cleaned_header.append(first_item_str) + + # Остальные столбцы добавляем без изменений (или с минимальной очисткой) + for item in new_header[1:]: + # Опционально: приводим к строке и убираем лишние пробелы, но не заменяем на "name" + item_str = str(item).strip() if pd.notna(item) else "" + cleaned_header.append(item_str) + + # Применяем очищенные названия столбцов + df_cleaned = df_cleaned[1:] # удаляем строку с заголовком + df_cleaned.columns = cleaned_header + df_cleaned = df_cleaned.reset_index(drop=True) + + if matched_key.endswith('**'): + cleaned_key = matched_key[:-2] # удаляем последние ** + else: + cleaned_key = matched_key + + # Добавляем новую колонку с именем параметра + df_cleaned["table"] = cleaned_key + + # Проверяем, что колонка 'name' существует + if 'name' not in df_cleaned.columns: + print( + f"Внимание: колонка 'name' отсутствует в таблице для '{matched_key}'. Пропускаем добавление 'id'.") + continue # или обработать по-другому + else: + # Применяем функцию get_id_by_name к каждой строке в колонке 'name' + df_cleaned['id'] = df_cleaned['name'].apply(get_og_by_name) + + # Удаляем строки, где id — None, NaN или пустой + df_cleaned = df_cleaned.dropna(subset=['id']) # dropna удаляет NaN + # Дополнительно: удаляем None (если не поймал dropna) + df_cleaned = df_cleaned[df_cleaned['id'].notna() & (df_cleaned['id'].astype(str) != 'None')] + + # Добавляем в словарь + result_list.append(df_cleaned) + + # === Объединение и сортировка по id (индекс) и table === + if result_list: + combined_df = pd.concat(result_list, axis=0) + + # Сортируем по индексу (id) и по столбцу 'table' + combined_df = combined_df.sort_values(by=['id', 'table'], axis=0) + + # Устанавливаем id как индекс + # combined_df.set_index('id', inplace=True) + + return combined_df + else: + return None def data_dict_to_json(self, data_dict): ''' Служебная функция для парсинга словаря в json. ''' @@ -308,17 +376,3 @@ class SvodkaCAParser(ParserPort): filtered_df = df[df['table'].isin(table_values)].copy() result_dict = {key: group for key, group in filtered_df.groupby('table')} return result_dict - - def get_value(self, df: pd.DataFrame, params: dict): - - modes = params.get("modes") - tables = params.get("tables") - if not isinstance(modes, list): - raise ValueError("Поле 'modes' должно быть списком") - if not isinstance(tables, list): - raise ValueError("Поле 'tables' должно быть списком") - # Собираем данные - data_dict = {} - for mode in modes: - data_dict[mode] = self.get_data(df, mode, tables) - return self.data_dict_to_json(data_dict) diff --git a/python_parser/adapters/parsers/svodka_pm.py b/python_parser/adapters/parsers/svodka_pm.py index e794d6c..5e913d1 100644 --- a/python_parser/adapters/parsers/svodka_pm.py +++ b/python_parser/adapters/parsers/svodka_pm.py @@ -9,6 +9,60 @@ class SvodkaPMParser(ParserPort): name = "Сводки ПМ" + def _register_default_getters(self): + """Регистрация геттеров по умолчанию""" + self.register_getter( + name="single_og", + method=self._get_single_og, + required_params=["id", "codes", "columns"], + optional_params=["search"], + description="Получение данных по одному ОГ" + ) + + self.register_getter( + name="total_ogs", + method=self._get_total_ogs, + required_params=["codes", "columns"], + optional_params=["search"], + description="Получение данных по всем ОГ" + ) + + def _get_single_og(self, params: dict): + """Получение данных по одному ОГ (обертка для совместимости)""" + og_id = params["id"] + codes = params["codes"] + columns = params["columns"] + search = params.get("search") + + if not isinstance(codes, list): + raise ValueError("Поле 'codes' должно быть списком") + if not isinstance(columns, list): + raise ValueError("Поле 'columns' должно быть списком") + + # Здесь нужно получить DataFrame из self.df, но пока используем старую логику + # TODO: Переделать под новую архитектуру + return self.get_svodka_og(self.df, og_id, codes, columns, search) + + def _get_total_ogs(self, params: dict): + """Получение данных по всем ОГ (обертка для совместимости)""" + codes = params["codes"] + columns = params["columns"] + search = params.get("search") + + if not isinstance(codes, list): + raise ValueError("Поле 'codes' должно быть списком") + if not isinstance(columns, list): + raise ValueError("Поле 'columns' должно быть списком") + + # TODO: Переделать под новую архитектуру + return self.get_svodka_total(self.df, codes, columns, search) + + def parse(self, file_path: str, params: dict) -> pd.DataFrame: + """Парсинг файла и возврат DataFrame""" + # Сохраняем DataFrame для использования в геттерах + self.df = self.parse_svodka_pm_files(file_path, params) + return self.df + def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int: """Определения индекса заголовка в excel по ключевому слову""" # Читаем первые max_rows строк без заголовков @@ -99,25 +153,25 @@ class SvodkaPMParser(ParserPort): # Проверяем, является ли колонка пустой/некорректной is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan' - # Проверяем, начинается ли на "Итого" - if col_str.startswith('Итого'): - current_name = 'Итого' - last_good_name = current_name # обновляем last_good_name - new_columns.append(current_name) - elif is_empty_or_unnamed: - # Используем последнее хорошее имя - new_columns.append(last_good_name) + if is_empty_or_unnamed: + # Если это пустая колонка, используем последнее хорошее имя + if last_good_name: + new_columns.append(last_good_name) + else: + # Если нет хорошего имени, пропускаем + continue else: - # Имя, полученное из exel + # Это хорошая колонка last_good_name = col_str new_columns.append(col_str) + # Применяем новые заголовки df_final.columns = new_columns - print(f"Окончательное количество столбцов: {len(df_final.columns)}") return df_final - def parse(self, file_path: str, params: dict) -> dict: + def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict: + """Парсинг ZIP архива со сводками ПМ""" import zipfile pm_dict = { "facts": {}, @@ -125,7 +179,7 @@ class SvodkaPMParser(ParserPort): } excel_fact_template = 'svodka_fact_pm_ID.xlsm' excel_plan_template = 'svodka_plan_pm_ID.xlsx' - with zipfile.ZipFile(file_path, 'r') as zip_ref: + with zipfile.ZipFile(zip_path, 'r') as zip_ref: file_list = zip_ref.namelist() for name, id in OG_IDS.items(): if id == 'BASH': @@ -155,9 +209,9 @@ class SvodkaPMParser(ParserPort): return pm_dict - def get_svodka_value(self, df_svodka, id, code, search_value=None): - ''' Служебная функция для простой выборке по сводке ''' - row_index = id + def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None): + ''' Служебная функция получения значения по коду и столбцу ''' + row_index = code mask_value = df_svodka.iloc[0] == code if search_value is None: @@ -254,22 +308,4 @@ class SvodkaPMParser(ParserPort): return total_result - def get_value(self, df, params): - og_id = params.get("id") - codes = params.get("codes") - columns = params.get("columns") - search = params.get("search") - mode = params.get("mode", "total") - if not isinstance(codes, list): - raise ValueError("Поле 'codes' должно быть списком") - if not isinstance(columns, list): - raise ValueError("Поле 'columns' должно быть списком") - data = None - if mode == "single": - if not og_id: - raise ValueError("Отсутствует идентификатор ОГ") - data = self.get_svodka_og(df, og_id, codes, columns, search) - elif mode == "total": - data = self.get_svodka_total(df, codes, columns, search) - json_result = data_to_json(data) - return json_result + # Убираем старый метод get_value, так как он теперь в базовом классе diff --git a/python_parser/app/main.py b/python_parser/app/main.py index 8f6e7e8..e645660 100644 --- a/python_parser/app/main.py +++ b/python_parser/app/main.py @@ -96,6 +96,54 @@ async def get_available_parsers(): return {"parsers": parsers} +@app.get("/parsers/{parser_name}/getters", tags=["Общее"], + summary="Информация о геттерах парсера", + description="Возвращает информацию о доступных геттерах для указанного парсера", + responses={ + 200: { + "content": { + "application/json": { + "example": { + "parser": "svodka_pm", + "getters": { + "single_og": { + "required_params": ["id", "codes", "columns"], + "optional_params": ["search"], + "description": "Получение данных по одному ОГ" + }, + "total_ogs": { + "required_params": ["codes", "columns"], + "optional_params": ["search"], + "description": "Получение данных по всем ОГ" + } + } + } + } + } + }, + 404: { + "description": "Парсер не найден" + } + }) +async def get_parser_getters(parser_name: str): + """Получение информации о геттерах парсера""" + if parser_name not in PARSERS: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail=f"Парсер '{parser_name}' не найден" + ) + + parser_class = PARSERS[parser_name] + parser_instance = parser_class() + + getters_info = parser_instance.get_available_getters() + + return { + "parser": parser_name, + "getters": getters_info + } + + @app.get("/server-info", tags=["Общее"], summary="Информация о сервере", response_model=ServerInfoResponse,) diff --git a/python_parser/core/ports.py b/python_parser/core/ports.py index 4519374..c3a5c67 100644 --- a/python_parser/core/ports.py +++ b/python_parser/core/ports.py @@ -2,28 +2,93 @@ Порты (интерфейсы) для hexagonal architecture """ from abc import ABC, abstractmethod -from typing import Optional +from typing import Optional, Dict, List, Any, Callable import pandas as pd class ParserPort(ABC): - """Интерфейс для парсеров""" + """Интерфейс для парсеров с поддержкой множественных геттеров""" + + def __init__(self): + """Инициализация с пустым словарем геттеров""" + self.getters: Dict[str, Dict[str, Any]] = {} + self._register_default_getters() + + def _register_default_getters(self): + """Регистрация геттеров по умолчанию - переопределяется в наследниках""" + pass + + def register_getter(self, name: str, method: Callable, required_params: List[str], + optional_params: List[str] = None, description: str = ""): + """ + Регистрация нового геттера + + Args: + name: Имя геттера + method: Метод для выполнения + required_params: Список обязательных параметров + optional_params: Список необязательных параметров + description: Описание геттера + """ + if optional_params is None: + optional_params = [] + + self.getters[name] = { + "method": method, + "required_params": required_params, + "optional_params": optional_params, + "description": description + } + + def get_available_getters(self) -> Dict[str, Dict[str, Any]]: + """Получение списка доступных геттеров с их описанием""" + return { + name: { + "required_params": info["required_params"], + "optional_params": info["optional_params"], + "description": info["description"] + } + for name, info in self.getters.items() + } + + # Добавить схему + def get_value(self, getter_name: str, params: Dict[str, Any]): + """ + Получение значения через указанный геттер + + Args: + getter_name: Имя геттера + params: Параметры для геттера + + Returns: + Результат выполнения геттера + + Raises: + ValueError: Если геттер не найден или параметры неверны + """ + if getter_name not in self.getters: + available = list(self.getters.keys()) + raise ValueError(f"Геттер '{getter_name}' не найден. Доступные: {available}") + + getter_info = self.getters[getter_name] + required = getter_info["required_params"] + + # Проверка обязательных параметров + missing = [p for p in required if p not in params] + if missing: + raise ValueError(f"Отсутствуют обязательные параметры для геттера '{getter_name}': {missing}") + + # Вызов метода геттера + try: + return getter_info["method"](params) + except Exception as e: + raise ValueError(f"Ошибка выполнения геттера '{getter_name}': {str(e)}") @abstractmethod def parse(self, file_path: str, params: dict) -> pd.DataFrame: """Парсинг файла и возврат DataFrame""" pass - @abstractmethod - def get_value(self, df: pd.DataFrame, params: dict): - """Получение значения из DataFrame по параметрам""" - pass - - # @abstractmethod - # def get_schema(self) -> dict: - # """Возвращает схему входных параметров для парсера""" - # pass - class StoragePort(ABC): """Интерфейс для хранилища данных""" diff --git a/python_parser/core/services.py b/python_parser/core/services.py index 0d3f791..16e7da0 100644 --- a/python_parser/core/services.py +++ b/python_parser/core/services.py @@ -99,9 +99,35 @@ class ReportService: # Получаем парсер parser = get_parser(request.report_type) + + # Устанавливаем DataFrame в парсер для использования в геттерах + parser.df = df - # Получаем значение - value = parser.get_value(df, request.get_params) + # Получаем параметры запроса + get_params = request.get_params or {} + + # Определяем имя геттера (по умолчанию используем первый доступный) + getter_name = get_params.pop("getter", None) + if not getter_name: + # Если геттер не указан, берем первый доступный + available_getters = list(parser.getters.keys()) + if available_getters: + getter_name = available_getters[0] + print(f"⚠️ Геттер не указан, используем первый доступный: {getter_name}") + else: + return DataResult( + success=False, + message="Парсер не имеет доступных геттеров" + ) + + # Получаем значение через указанный геттер + try: + value = parser.get_value(getter_name, get_params) + except ValueError as e: + return DataResult( + success=False, + message=f"Ошибка параметров: {str(e)}" + ) # Формируем результат if value is not None: diff --git a/python_parser/streamlit_app.py b/python_parser/streamlit_app.py index ec49848..a11c130 100644 --- a/python_parser/streamlit_app.py +++ b/python_parser/streamlit_app.py @@ -254,8 +254,8 @@ def main(): modes = st.multiselect( "Выберите режимы", - ["План", "Факт", "Норматив"], - default=["План", "Факт"], + ["plan", "fact", "normativ"], + default=["plan", "fact"], key="ca_modes" )