This commit is contained in:
2025-09-01 12:08:16 +03:00
parent 908fb330f4
commit ce228d9756
9 changed files with 622 additions and 257 deletions

90
.gitignore vendored Normal file
View File

@@ -0,0 +1,90 @@
data/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# VS Code
.vscode/
# PyCharm
.idea/
# Local envs
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# MacOS
.DS_Store
# Windows
Thumbs.db
ehthumbs.db
Desktop.ini
# MinIO test data
minio_data/
minio_test/
minio/
# Logs
*.log
# Streamlit cache
.streamlit/

20
python_parser/Dockerfile_ Normal file
View File

@@ -0,0 +1,20 @@
FROM repo-dev.predix.rosneft.ru/python:3.11-slim
WORKDIR /app
# RUN pip install kafka-python==2.0.2
# RUN pip freeze > /app/requirements.txt
# ADD . /app
COPY requirements.txt .
RUN mkdir -p vendor
RUN pip download -r /app/requirements.txt --no-binary=:none: -d /app/vendor
# ADD . /app
# ENV KAFKA_BROKER=10.234.160.10:9093,10.234.160.10:9094,10.234.160.10:9095
# ENV KAFKA_UPDATE_ALGORITHM_RULES_TOPIC=algorithm-rule-update
# ENV KAFKA_CLIENT_USERNAME=cf-service
# CMD ["python", "/app/run_dev.py"]

View File

@@ -1,9 +1,9 @@
import pandas as pd
import re
from typing import Dict
import zipfile
from typing import Dict, Tuple
from core.ports import ParserPort
from adapters.pconfig import data_to_json, get_object_by_name
from adapters.pconfig import data_to_json
class MonitoringFuelParser(ParserPort):
@@ -11,6 +11,82 @@ class MonitoringFuelParser(ParserPort):
name = "Мониторинг топлива"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
self.register_getter(
name="total_by_columns",
method=self._get_total_by_columns,
required_params=["columns"],
optional_params=[],
description="Агрегация данных по колонкам"
)
self.register_getter(
name="month_by_code",
method=self._get_month_by_code,
required_params=["month"],
optional_params=[],
description="Получение данных за конкретный месяц"
)
def _get_total_by_columns(self, params: dict):
"""Агрегация по колонкам (обертка для совместимости)"""
columns = params["columns"]
if not columns:
raise ValueError("Отсутствуют идентификаторы столбцов")
# TODO: Переделать под новую архитектуру
df_means, _ = self.aggregate_by_columns(self.df, columns)
return df_means.to_dict(orient='index')
def _get_month_by_code(self, params: dict):
"""Получение данных за месяц (обертка для совместимости)"""
month = params["month"]
if not month:
raise ValueError("Отсутствует идентификатор месяца")
# TODO: Переделать под новую архитектуру
df_month = self.get_month(self.df, month)
return df_month.to_dict(orient='index')
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_monitoring_fuel_files(file_path, params)
return self.df
def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]:
"""Парсинг ZIP архива с файлами мониторинга топлива"""
df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for month in range(1, 13):
mm = f"{month:02d}"
file_temp = f'monitoring_SNPZ_{mm}.xlsm'
candidates = [f for f in file_list if file_temp in f]
if len(candidates) == 1:
file = candidates[0]
print(f'Загрузка {file}')
with zip_ref.open(file) as excel_file:
try:
df = self.parse_single(excel_file, 'Мониторинг потребления')
df_monitorings[mm] = df
print(f"✅ Данные за месяц {mm} загружены")
except Exception as e:
print(f"Ошибка при загрузке файла {file_temp}: {e}")
else:
print(f"⚠️ Файл не найден: {file_temp}")
return df_monitorings
def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int:
"""Определение индекса заголовка в Excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
@@ -64,46 +140,15 @@ class MonitoringFuelParser(ParserPort):
# Проверяем, что колонка 'name' существует
if 'name' in df_full.columns:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_full['id'] = df_full['name'].apply(get_object_by_name)
# df_full['id'] = df_full['name'].apply(get_object_by_name) # This line was removed as per new_code
pass # Placeholder for new_code
# Устанавливаем id как индекс
df_full.set_index('id', inplace=True)
print(f"Окончательное количество столбцов: {len(df_full.columns)}")
return df_full
def parse(self, file_path: str, params: dict) -> dict:
import zipfile
df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ
with zipfile.ZipFile(file_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for month in range(1, 13):
mm = f"{month:02d}"
file_temp = f'monitoring_SNPZ_{mm}.xlsm'
candidates = [f for f in file_list if file_temp in f]
if len(candidates) == 1:
file = candidates[0]
print(f'Загрузка {file}')
with zip_ref.open(file) as excel_file:
try:
df = self.parse_single(excel_file, 'Мониторинг потребления')
df_monitorings[mm] = df
print(f"✅ Данные за месяц {mm} загружены")
except Exception as e:
print(f"Ошибка при загрузке файла {file_temp}: {e}")
else:
print(f"⚠️ Файл не найден: {file_temp}")
return df_monitorings
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns):
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns: list) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
''' Служебная функция. Агрегация данных по среднему по определенным колонкам. '''
all_data = {} # Для хранения полных данных (месяцы) по каждой колонке
means = {} # Для хранения средних
@@ -185,22 +230,3 @@ class MonitoringFuelParser(ParserPort):
total.name = 'mean'
return total, df_combined
def get_value(self, df, params):
mode = params.get("mode", "total")
columns = params.get("columns", None)
month = params.get("month", None)
data = None
if mode == "total":
if not columns:
raise ValueError("Отсутствуют идентификаторы столбцов")
df_means, _ = self.aggregate_by_columns(df, columns)
data = df_means.to_dict(orient='index')
elif mode == "month":
if not month:
raise ValueError("Отсутствуют идентификатор месяца")
df_month = self.get_month(df, month)
data = df_month.to_dict(orient='index')
json_result = data_to_json(data)
return json_result

View File

@@ -6,85 +6,44 @@ from adapters.pconfig import get_og_by_name
class SvodkaCAParser(ParserPort):
"""Парсер для сводки СА"""
"""Парсер для сводок СА"""
name = "Сводка СА"
name = "Сводки СА"
def extract_all_tables(self, file_path, sheet_name=0):
"""Извлекает все таблицы из Excel файла"""
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
df_filled = df.fillna('')
df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True)
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
self.register_getter(
name="get_data",
method=self._get_data_wrapper,
required_params=["modes", "tables"],
optional_params=[],
description="Получение данных по режимам и таблицам"
)
non_empty_rows = ~(df_clean.eq('').all(axis=1))
non_empty_cols = ~(df_clean.eq('').all(axis=0))
def _get_data_wrapper(self, params: dict):
"""Обертка для получения данных (для совместимости)"""
modes = params["modes"]
tables = params["tables"]
row_indices = non_empty_rows[non_empty_rows].index.tolist()
col_indices = non_empty_cols[non_empty_cols].index.tolist()
if not isinstance(modes, list):
raise ValueError("Поле 'modes' должно быть списком")
if not isinstance(tables, list):
raise ValueError("Поле 'tables' должно быть списком")
if not row_indices or not col_indices:
return []
# TODO: Переделать под новую архитектуру
data_dict = {}
for mode in modes:
data_dict[mode] = self.get_data(self.df, mode, tables)
return self.data_dict_to_json(data_dict)
row_blocks = self._get_contiguous_blocks(row_indices)
col_blocks = self._get_contiguous_blocks(col_indices)
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_ca(file_path, params)
return self.df
tables = []
for r_start, r_end in row_blocks:
for c_start, c_end in col_blocks:
block = df.iloc[r_start:r_end + 1, c_start:c_end + 1]
if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all():
continue
if self._is_header_row(block.iloc[0]):
block.columns = block.iloc[0]
block = block.iloc[1:].reset_index(drop=True)
else:
block = block.reset_index(drop=True)
block.columns = [f"col_{i}" for i in range(block.shape[1])]
tables.append(block)
return tables
def _get_contiguous_blocks(self, indices):
"""Группирует индексы в непрерывные блоки"""
if not indices:
return []
blocks = []
start = indices[0]
for i in range(1, len(indices)):
if indices[i] != indices[i-1] + 1:
blocks.append((start, indices[i-1]))
start = indices[i]
blocks.append((start, indices[-1]))
return blocks
def _is_header_row(self, series):
"""Определяет, похожа ли строка на заголовок"""
series_str = series.astype(str).str.strip()
non_empty = series_str[series_str != '']
if len(non_empty) == 0:
return False
def is_not_numeric(val):
try:
float(val.replace(',', '.'))
return False
except (ValueError, TypeError):
return True
not_numeric_count = non_empty.apply(is_not_numeric).sum()
return not_numeric_count / len(non_empty) > 0.6
def _get_og_by_name(self, name):
"""Функция для получения ID по имени (упрощенная версия)"""
# Упрощенная версия - возвращаем имя как есть
if not name or not isinstance(name, str):
return None
return name.strip()
def parse_sheet(self, file_path, sheet_name, inclusion_list):
"""Собственно функция парсинга отчета СА"""
def parse_svodka_ca(self, file_path: str, params: dict) -> dict:
"""Парсинг сводки СА"""
# === Извлечение и фильтрация ===
tables = self.extract_all_tables(file_path, sheet_name)
@@ -190,76 +149,185 @@ class SvodkaCAParser(ParserPort):
else:
return None
def parse(self, file_path: str, params: dict) -> dict:
"""Парсинг файла сводки СА"""
# === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив ===
# Выгружаем План в df_ca_plan
inclusion_list_plan = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
def extract_all_tables(self, file_path, sheet_name=0):
"""Извлекает все таблицы из Excel файла"""
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
df_filled = df.fillna('')
df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True)
df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan) # ЭТО ДАТАФРЕЙМ ПЛАНА В СВОДКЕ ЦА
print(f"\n--- Объединённый и отсортированный План: {df_ca_plan.shape} ---")
non_empty_rows = ~(df_clean.eq('').all(axis=1))
non_empty_cols = ~(df_clean.eq('').all(axis=0))
# Выгружаем Факт
inclusion_list_fact = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн",
"в т.ч. Идентифицированные безвозвратные потери, %",
"в т.ч. Неидентифицированные потери, тонн",
"в т.ч. Неидентифицированные потери, %"
}
row_indices = non_empty_rows[non_empty_rows].index.tolist()
col_indices = non_empty_cols[non_empty_cols].index.tolist()
df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact) # ЭТО ДАТАФРЕЙМ ФАКТА В СВОДКЕ ЦА
print(f"\n--- Объединённый и отсортированный Факт: {df_ca_fact.shape} ---")
if not row_indices or not col_indices:
return []
# Выгружаем План в df_ca_normativ
inclusion_list_normativ = {
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
row_blocks = self._get_contiguous_blocks(row_indices)
col_blocks = self._get_contiguous_blocks(col_indices)
# ЭТО ДАТАФРЕЙМ НОРМАТИВА В СВОДКЕ ЦА
df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ)
tables = []
for r_start, r_end in row_blocks:
for c_start, c_end in col_blocks:
block = df.iloc[r_start:r_end + 1, c_start:c_end + 1]
if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all():
continue
print(f"\n--- Объединённый и отсортированный Норматив: {df_ca_normativ.shape} ---")
if self._is_header_row(block.iloc[0]):
block.columns = block.iloc[0]
block = block.iloc[1:].reset_index(drop=True)
else:
block = block.reset_index(drop=True)
block.columns = [f"col_{i}" for i in range(block.shape[1])]
df_dict = {
"plan": df_ca_plan,
"fact": df_ca_fact,
"normativ": df_ca_normativ
}
return df_dict
tables.append(block)
return tables
def _get_contiguous_blocks(self, indices):
"""Группирует индексы в непрерывные блоки"""
if not indices:
return []
blocks = []
start = indices[0]
for i in range(1, len(indices)):
if indices[i] != indices[i-1] + 1:
blocks.append((start, indices[i-1]))
start = indices[i]
blocks.append((start, indices[-1]))
return blocks
def _is_header_row(self, series):
"""Определяет, похожа ли строка на заголовок"""
series_str = series.astype(str).str.strip()
non_empty = series_str[series_str != '']
if len(non_empty) == 0:
return False
def is_not_numeric(val):
try:
float(val.replace(',', '.'))
return False
except (ValueError, TypeError):
return True
not_numeric_count = non_empty.apply(is_not_numeric).sum()
return not_numeric_count / len(non_empty) > 0.6
def _get_og_by_name(self, name):
"""Функция для получения ID по имени (упрощенная версия)"""
# Упрощенная версия - возвращаем имя как есть
if not name or not isinstance(name, str):
return None
return name.strip()
def parse_sheet(self, file_path: str, sheet_name: str, inclusion_list: set) -> pd.DataFrame:
"""Парсинг листа Excel"""
# === Извлечение и фильтрация ===
tables = self.extract_all_tables(file_path, sheet_name)
# Фильтруем таблицы: оставляем только те, где первая строка содержит нужные заголовки
filtered_tables = []
for table in tables:
if table.empty:
continue
first_row_values = table.iloc[0].astype(str).str.strip().tolist()
if any(val in inclusion_list for val in first_row_values):
filtered_tables.append(table)
tables = filtered_tables
# === Итоговый список таблиц датафреймов ===
result_list = []
for table in tables:
if table.empty:
continue
# Получаем первую строку (до удаления)
first_row_values = table.iloc[0].astype(str).str.strip().tolist()
# Находим, какой элемент из inclusion_list присутствует
matched_key = None
for val in first_row_values:
if val in inclusion_list:
matched_key = val
break # берём первый совпадающий заголовок
if matched_key is None:
continue # на всякий случай (хотя уже отфильтровано)
# Удаляем первую строку (заголовок) и сбрасываем индекс
df_cleaned = table.iloc[1:].copy().reset_index(drop=True)
# Пропускаем, если таблица пустая
if df_cleaned.empty:
continue
# Первая строка становится заголовком
new_header = df_cleaned.iloc[0] # извлекаем первую строку как потенциальные названия столбцов
# Преобразуем заголовок: только первый столбец может быть заменён на "name"
cleaned_header = []
# Обрабатываем первый столбец отдельно
first_item = new_header.iloc[0] if isinstance(new_header, pd.Series) else new_header[0]
first_item_str = str(first_item).strip() if pd.notna(first_item) else ""
if first_item_str == "" or first_item_str == "nan":
cleaned_header.append("name")
else:
cleaned_header.append(first_item_str)
# Остальные столбцы добавляем без изменений (или с минимальной очисткой)
for item in new_header[1:]:
# Опционально: приводим к строке и убираем лишние пробелы, но не заменяем на "name"
item_str = str(item).strip() if pd.notna(item) else ""
cleaned_header.append(item_str)
# Применяем очищенные названия столбцов
df_cleaned = df_cleaned[1:] # удаляем строку с заголовком
df_cleaned.columns = cleaned_header
df_cleaned = df_cleaned.reset_index(drop=True)
if matched_key.endswith('**'):
cleaned_key = matched_key[:-2] # удаляем последние **
else:
cleaned_key = matched_key
# Добавляем новую колонку с именем параметра
df_cleaned["table"] = cleaned_key
# Проверяем, что колонка 'name' существует
if 'name' not in df_cleaned.columns:
print(
f"Внимание: колонка 'name' отсутствует в таблице для '{matched_key}'. Пропускаем добавление 'id'.")
continue # или обработать по-другому
else:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_cleaned['id'] = df_cleaned['name'].apply(get_og_by_name)
# Удаляем строки, где id — None, NaN или пустой
df_cleaned = df_cleaned.dropna(subset=['id']) # dropna удаляет NaN
# Дополнительно: удаляем None (если не поймал dropna)
df_cleaned = df_cleaned[df_cleaned['id'].notna() & (df_cleaned['id'].astype(str) != 'None')]
# Добавляем в словарь
result_list.append(df_cleaned)
# === Объединение и сортировка по id (индекс) и table ===
if result_list:
combined_df = pd.concat(result_list, axis=0)
# Сортируем по индексу (id) и по столбцу 'table'
combined_df = combined_df.sort_values(by=['id', 'table'], axis=0)
# Устанавливаем id как индекс
# combined_df.set_index('id', inplace=True)
return combined_df
else:
return None
def data_dict_to_json(self, data_dict):
''' Служебная функция для парсинга словаря в json. '''
@@ -308,17 +376,3 @@ class SvodkaCAParser(ParserPort):
filtered_df = df[df['table'].isin(table_values)].copy()
result_dict = {key: group for key, group in filtered_df.groupby('table')}
return result_dict
def get_value(self, df: pd.DataFrame, params: dict):
modes = params.get("modes")
tables = params.get("tables")
if not isinstance(modes, list):
raise ValueError("Поле 'modes' должно быть списком")
if not isinstance(tables, list):
raise ValueError("Поле 'tables' должно быть списком")
# Собираем данные
data_dict = {}
for mode in modes:
data_dict[mode] = self.get_data(df, mode, tables)
return self.data_dict_to_json(data_dict)

View File

@@ -9,6 +9,60 @@ class SvodkaPMParser(ParserPort):
name = "Сводки ПМ"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
self.register_getter(
name="single_og",
method=self._get_single_og,
required_params=["id", "codes", "columns"],
optional_params=["search"],
description="Получение данных по одному ОГ"
)
self.register_getter(
name="total_ogs",
method=self._get_total_ogs,
required_params=["codes", "columns"],
optional_params=["search"],
description="Получение данных по всем ОГ"
)
def _get_single_og(self, params: dict):
"""Получение данных по одному ОГ (обертка для совместимости)"""
og_id = params["id"]
codes = params["codes"]
columns = params["columns"]
search = params.get("search")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
# Здесь нужно получить DataFrame из self.df, но пока используем старую логику
# TODO: Переделать под новую архитектуру
return self.get_svodka_og(self.df, og_id, codes, columns, search)
def _get_total_ogs(self, params: dict):
"""Получение данных по всем ОГ (обертка для совместимости)"""
codes = params["codes"]
columns = params["columns"]
search = params.get("search")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
# TODO: Переделать под новую архитектуру
return self.get_svodka_total(self.df, codes, columns, search)
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_pm_files(file_path, params)
return self.df
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
"""Определения индекса заголовка в excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
@@ -99,25 +153,25 @@ class SvodkaPMParser(ParserPort):
# Проверяем, является ли колонка пустой/некорректной
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
# Проверяем, начинается ли на "Итого"
if col_str.startswith('Итого'):
current_name = 'Итого'
last_good_name = current_name # обновляем last_good_name
new_columns.append(current_name)
elif is_empty_or_unnamed:
# Используем последнее хорошее имя
if is_empty_or_unnamed:
# Если это пустая колонка, используем последнее хорошее имя
if last_good_name:
new_columns.append(last_good_name)
else:
# Имя, полученное из exel
# Если нет хорошего имени, пропускаем
continue
else:
# Это хорошая колонка
last_good_name = col_str
new_columns.append(col_str)
# Применяем новые заголовки
df_final.columns = new_columns
print(f"Окончательное количество столбцов: {len(df_final.columns)}")
return df_final
def parse(self, file_path: str, params: dict) -> dict:
def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict:
"""Парсинг ZIP архива со сводками ПМ"""
import zipfile
pm_dict = {
"facts": {},
@@ -125,7 +179,7 @@ class SvodkaPMParser(ParserPort):
}
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(file_path, 'r') as zip_ref:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for name, id in OG_IDS.items():
if id == 'BASH':
@@ -155,9 +209,9 @@ class SvodkaPMParser(ParserPort):
return pm_dict
def get_svodka_value(self, df_svodka, id, code, search_value=None):
''' Служебная функция для простой выборке по сводке '''
row_index = id
def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
''' Служебная функция получения значения по коду и столбцу '''
row_index = code
mask_value = df_svodka.iloc[0] == code
if search_value is None:
@@ -254,22 +308,4 @@ class SvodkaPMParser(ParserPort):
return total_result
def get_value(self, df, params):
og_id = params.get("id")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
mode = params.get("mode", "total")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
data = None
if mode == "single":
if not og_id:
raise ValueError("Отсутствует идентификатор ОГ")
data = self.get_svodka_og(df, og_id, codes, columns, search)
elif mode == "total":
data = self.get_svodka_total(df, codes, columns, search)
json_result = data_to_json(data)
return json_result
# Убираем старый метод get_value, так как он теперь в базовом классе

View File

@@ -96,6 +96,54 @@ async def get_available_parsers():
return {"parsers": parsers}
@app.get("/parsers/{parser_name}/getters", tags=["Общее"],
summary="Информация о геттерах парсера",
description="Возвращает информацию о доступных геттерах для указанного парсера",
responses={
200: {
"content": {
"application/json": {
"example": {
"parser": "svodka_pm",
"getters": {
"single_og": {
"required_params": ["id", "codes", "columns"],
"optional_params": ["search"],
"description": "Получение данных по одному ОГ"
},
"total_ogs": {
"required_params": ["codes", "columns"],
"optional_params": ["search"],
"description": "Получение данных по всем ОГ"
}
}
}
}
}
},
404: {
"description": "Парсер не найден"
}
})
async def get_parser_getters(parser_name: str):
"""Получение информации о геттерах парсера"""
if parser_name not in PARSERS:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Парсер '{parser_name}' не найден"
)
parser_class = PARSERS[parser_name]
parser_instance = parser_class()
getters_info = parser_instance.get_available_getters()
return {
"parser": parser_name,
"getters": getters_info
}
@app.get("/server-info", tags=["Общее"],
summary="Информация о сервере",
response_model=ServerInfoResponse,)

View File

@@ -2,28 +2,93 @@
Порты (интерфейсы) для hexagonal architecture
"""
from abc import ABC, abstractmethod
from typing import Optional
from typing import Optional, Dict, List, Any, Callable
import pandas as pd
class ParserPort(ABC):
"""Интерфейс для парсеров"""
"""Интерфейс для парсеров с поддержкой множественных геттеров"""
def __init__(self):
"""Инициализация с пустым словарем геттеров"""
self.getters: Dict[str, Dict[str, Any]] = {}
self._register_default_getters()
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию - переопределяется в наследниках"""
pass
def register_getter(self, name: str, method: Callable, required_params: List[str],
optional_params: List[str] = None, description: str = ""):
"""
Регистрация нового геттера
Args:
name: Имя геттера
method: Метод для выполнения
required_params: Список обязательных параметров
optional_params: Список необязательных параметров
description: Описание геттера
"""
if optional_params is None:
optional_params = []
self.getters[name] = {
"method": method,
"required_params": required_params,
"optional_params": optional_params,
"description": description
}
def get_available_getters(self) -> Dict[str, Dict[str, Any]]:
"""Получение списка доступных геттеров с их описанием"""
return {
name: {
"required_params": info["required_params"],
"optional_params": info["optional_params"],
"description": info["description"]
}
for name, info in self.getters.items()
}
# Добавить схему
def get_value(self, getter_name: str, params: Dict[str, Any]):
"""
Получение значения через указанный геттер
Args:
getter_name: Имя геттера
params: Параметры для геттера
Returns:
Результат выполнения геттера
Raises:
ValueError: Если геттер не найден или параметры неверны
"""
if getter_name not in self.getters:
available = list(self.getters.keys())
raise ValueError(f"Геттер '{getter_name}' не найден. Доступные: {available}")
getter_info = self.getters[getter_name]
required = getter_info["required_params"]
# Проверка обязательных параметров
missing = [p for p in required if p not in params]
if missing:
raise ValueError(f"Отсутствуют обязательные параметры для геттера '{getter_name}': {missing}")
# Вызов метода геттера
try:
return getter_info["method"](params)
except Exception as e:
raise ValueError(f"Ошибка выполнения геттера '{getter_name}': {str(e)}")
@abstractmethod
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
pass
@abstractmethod
def get_value(self, df: pd.DataFrame, params: dict):
"""Получение значения из DataFrame по параметрам"""
pass
# @abstractmethod
# def get_schema(self) -> dict:
# """Возвращает схему входных параметров для парсера"""
# pass
class StoragePort(ABC):
"""Интерфейс для хранилища данных"""

View File

@@ -100,8 +100,34 @@ class ReportService:
# Получаем парсер
parser = get_parser(request.report_type)
# Получаем значение
value = parser.get_value(df, request.get_params)
# Устанавливаем DataFrame в парсер для использования в геттерах
parser.df = df
# Получаем параметры запроса
get_params = request.get_params or {}
# Определяем имя геттера (по умолчанию используем первый доступный)
getter_name = get_params.pop("getter", None)
if not getter_name:
# Если геттер не указан, берем первый доступный
available_getters = list(parser.getters.keys())
if available_getters:
getter_name = available_getters[0]
print(f"⚠️ Геттер не указан, используем первый доступный: {getter_name}")
else:
return DataResult(
success=False,
message="Парсер не имеет доступных геттеров"
)
# Получаем значение через указанный геттер
try:
value = parser.get_value(getter_name, get_params)
except ValueError as e:
return DataResult(
success=False,
message=f"Ошибка параметров: {str(e)}"
)
# Формируем результат
if value is not None:

View File

@@ -254,8 +254,8 @@ def main():
modes = st.multiselect(
"Выберите режимы",
["План", "Факт", "Норматив"],
default=["План", "Факт"],
["plan", "fact", "normativ"],
default=["plan", "fact"],
key="ca_modes"
)