Compare commits
2 Commits
fc0b4356da
...
79ab91c700
| Author | SHA1 | Date | |
|---|---|---|---|
| 79ab91c700 | |||
| b98be22359 |
18
.gitignore
vendored
18
.gitignore
vendored
@@ -1,5 +1,21 @@
|
|||||||
# Python
|
# Python
|
||||||
|
__pycache__
|
||||||
__pycache__/
|
__pycache__/
|
||||||
|
python_parser/__pycache__/
|
||||||
|
python_parser/core/__pycache__/
|
||||||
|
python_parser/adapters/__pycache__/
|
||||||
|
python_parser/tests/__pycache__/
|
||||||
|
python_parser/tests/test_core/__pycache__/
|
||||||
|
python_parser/tests/test_adapters/__pycache__/
|
||||||
|
python_parser/tests/test_app/__pycache__/
|
||||||
|
python_parser/app/__pycache__/
|
||||||
|
python_parser/app/schemas/__pycache__/
|
||||||
|
python_parser/app/schemas/test_schemas/__pycache__/
|
||||||
|
python_parser/app/schemas/test_schemas/test_core/__pycache__/
|
||||||
|
python_parser/app/schemas/test_schemas/test_adapters/__pycache__/
|
||||||
|
python_parser/app/schemas/test_schemas/test_app/__pycache__/
|
||||||
|
|
||||||
|
|
||||||
*.py[cod]
|
*.py[cod]
|
||||||
*$py.class
|
*$py.class
|
||||||
*.so
|
*.so
|
||||||
@@ -153,3 +169,5 @@ node_modules/
|
|||||||
npm-debug.log*
|
npm-debug.log*
|
||||||
yarn-debug.log*
|
yarn-debug.log*
|
||||||
yarn-error.log*
|
yarn-error.log*
|
||||||
|
|
||||||
|
__pycache__/
|
||||||
|
|||||||
135
python_parser/SCHEMA_INTEGRATION.md
Normal file
135
python_parser/SCHEMA_INTEGRATION.md
Normal file
@@ -0,0 +1,135 @@
|
|||||||
|
# Интеграция схем Pydantic с парсерами
|
||||||
|
|
||||||
|
## Обзор
|
||||||
|
|
||||||
|
Этот документ описывает решение для устранения дублирования логики между схемами Pydantic и парсерами. Теперь схемы Pydantic являются единым источником правды для определения параметров парсеров.
|
||||||
|
|
||||||
|
## Проблема
|
||||||
|
|
||||||
|
Ранее в парсерах дублировалась информация о параметрах:
|
||||||
|
|
||||||
|
```python
|
||||||
|
# В парсере
|
||||||
|
self.register_getter(
|
||||||
|
name="single_og",
|
||||||
|
method=self._get_single_og,
|
||||||
|
required_params=["id", "codes", "columns"], # Дублирование
|
||||||
|
optional_params=["search"], # Дублирование
|
||||||
|
description="Получение данных по одному ОГ"
|
||||||
|
)
|
||||||
|
|
||||||
|
# В схеме
|
||||||
|
class SvodkaPMSingleOGRequest(BaseModel):
|
||||||
|
id: OGID = Field(...) # Обязательное поле
|
||||||
|
codes: List[int] = Field(...) # Обязательное поле
|
||||||
|
columns: List[str] = Field(...) # Обязательное поле
|
||||||
|
search: Optional[str] = Field(None) # Необязательное поле
|
||||||
|
```
|
||||||
|
|
||||||
|
## Решение
|
||||||
|
|
||||||
|
### 1. Утилиты для работы со схемами
|
||||||
|
|
||||||
|
Создан модуль `core/schema_utils.py` с функциями:
|
||||||
|
|
||||||
|
- `get_required_fields_from_schema()` - извлекает обязательные поля
|
||||||
|
- `get_optional_fields_from_schema()` - извлекает необязательные поля
|
||||||
|
- `register_getter_from_schema()` - регистрирует геттер с использованием схемы
|
||||||
|
- `validate_params_with_schema()` - валидирует параметры с помощью схемы
|
||||||
|
|
||||||
|
### 2. Обновленные парсеры
|
||||||
|
|
||||||
|
Теперь парсеры используют схемы как единый источник правды:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def _register_default_getters(self):
|
||||||
|
"""Регистрация геттеров по умолчанию"""
|
||||||
|
# Используем схемы Pydantic как единый источник правды
|
||||||
|
register_getter_from_schema(
|
||||||
|
parser_instance=self,
|
||||||
|
getter_name="single_og",
|
||||||
|
method=self._get_single_og,
|
||||||
|
schema_class=SvodkaPMSingleOGRequest,
|
||||||
|
description="Получение данных по одному ОГ"
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
### 3. Валидация параметров
|
||||||
|
|
||||||
|
Методы геттеров теперь автоматически валидируют параметры:
|
||||||
|
|
||||||
|
```python
|
||||||
|
def _get_single_og(self, params: dict):
|
||||||
|
"""Получение данных по одному ОГ"""
|
||||||
|
# Валидируем параметры с помощью схемы Pydantic
|
||||||
|
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
|
||||||
|
|
||||||
|
og_id = validated_params["id"]
|
||||||
|
codes = validated_params["codes"]
|
||||||
|
columns = validated_params["columns"]
|
||||||
|
search = validated_params.get("search")
|
||||||
|
|
||||||
|
# ... остальная логика
|
||||||
|
```
|
||||||
|
|
||||||
|
## Преимущества
|
||||||
|
|
||||||
|
1. **Единый источник правды** - информация о параметрах хранится только в схемах Pydantic
|
||||||
|
2. **Автоматическая валидация** - параметры автоматически валидируются с помощью Pydantic
|
||||||
|
3. **Синхронизация** - изменения в схемах автоматически отражаются в парсерах
|
||||||
|
4. **Типобезопасность** - использование типов Pydantic обеспечивает типобезопасность
|
||||||
|
5. **Документация** - Swagger документация автоматически генерируется из схем
|
||||||
|
|
||||||
|
## Совместимость
|
||||||
|
|
||||||
|
Решение работает с:
|
||||||
|
- Pydantic v1 (через `__fields__`)
|
||||||
|
- Pydantic v2 (через `model_fields` и `is_required()`)
|
||||||
|
|
||||||
|
## Использование
|
||||||
|
|
||||||
|
### Для новых парсеров
|
||||||
|
|
||||||
|
1. Создайте схему Pydantic с нужными полями
|
||||||
|
2. Используйте `register_getter_from_schema()` для регистрации геттера
|
||||||
|
3. Используйте `validate_params_with_schema()` в методах геттеров
|
||||||
|
|
||||||
|
### Для существующих парсеров
|
||||||
|
|
||||||
|
1. Убедитесь, что у вас есть соответствующая схема Pydantic
|
||||||
|
2. Замените ручную регистрацию геттеров на `register_getter_from_schema()`
|
||||||
|
3. Добавьте валидацию параметров в методы геттеров
|
||||||
|
|
||||||
|
## Примеры
|
||||||
|
|
||||||
|
### Схема с обязательными и необязательными полями
|
||||||
|
|
||||||
|
```python
|
||||||
|
class ExampleRequest(BaseModel):
|
||||||
|
required_field: str = Field(..., description="Обязательное поле")
|
||||||
|
optional_field: Optional[str] = Field(None, description="Необязательное поле")
|
||||||
|
```
|
||||||
|
|
||||||
|
### Регистрация геттера
|
||||||
|
|
||||||
|
```python
|
||||||
|
register_getter_from_schema(
|
||||||
|
parser_instance=self,
|
||||||
|
getter_name="example_getter",
|
||||||
|
method=self._example_method,
|
||||||
|
schema_class=ExampleRequest,
|
||||||
|
description="Пример геттера"
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Валидация в методе
|
||||||
|
|
||||||
|
```python
|
||||||
|
def _example_method(self, params: dict):
|
||||||
|
validated_params = validate_params_with_schema(params, ExampleRequest)
|
||||||
|
# validated_params содержит валидированные данные
|
||||||
|
```
|
||||||
|
|
||||||
|
## Заключение
|
||||||
|
|
||||||
|
Это решение устраняет дублирование кода и обеспечивает единообразие между API схемами и парсерами. Теперь изменения в схемах автоматически отражаются в парсерах, что упрощает поддержку и развитие системы.
|
||||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,9 +1,11 @@
|
|||||||
import pandas as pd
|
import pandas as pd
|
||||||
import re
|
import re
|
||||||
from typing import Dict
|
import zipfile
|
||||||
|
from typing import Dict, Tuple
|
||||||
from core.ports import ParserPort
|
from core.ports import ParserPort
|
||||||
from adapters.pconfig import data_to_json, get_object_by_name
|
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
|
||||||
|
from app.schemas.monitoring_fuel import MonitoringFuelTotalRequest, MonitoringFuelMonthRequest
|
||||||
|
from adapters.pconfig import data_to_json
|
||||||
|
|
||||||
|
|
||||||
class MonitoringFuelParser(ParserPort):
|
class MonitoringFuelParser(ParserPort):
|
||||||
@@ -11,71 +13,58 @@ class MonitoringFuelParser(ParserPort):
|
|||||||
|
|
||||||
name = "Мониторинг топлива"
|
name = "Мониторинг топлива"
|
||||||
|
|
||||||
def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int:
|
def _register_default_getters(self):
|
||||||
"""Определение индекса заголовка в Excel по ключевому слову"""
|
"""Регистрация геттеров по умолчанию"""
|
||||||
# Читаем первые max_rows строк без заголовков
|
# Используем схемы Pydantic как единый источник правды
|
||||||
df_temp = pd.read_excel(
|
register_getter_from_schema(
|
||||||
file_path,
|
parser_instance=self,
|
||||||
sheet_name=sheet,
|
getter_name="total_by_columns",
|
||||||
header=None,
|
method=self._get_total_by_columns,
|
||||||
nrows=max_rows
|
schema_class=MonitoringFuelTotalRequest,
|
||||||
|
description="Агрегация данных по колонкам"
|
||||||
)
|
)
|
||||||
|
|
||||||
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
|
register_getter_from_schema(
|
||||||
for idx, row in df_temp.iterrows():
|
parser_instance=self,
|
||||||
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
|
getter_name="month_by_code",
|
||||||
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
|
method=self._get_month_by_code,
|
||||||
return idx + 1 # возвращаем индекс строки (0-based)
|
schema_class=MonitoringFuelMonthRequest,
|
||||||
|
description="Получение данных за конкретный месяц"
|
||||||
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
|
|
||||||
|
|
||||||
def parse_single(self, file, sheet, header_num=None):
|
|
||||||
''' Собственно парсер отчетов одного объекта'''
|
|
||||||
# Автоопределение header_num, если не передан
|
|
||||||
if header_num is None:
|
|
||||||
header_num = self.find_header_row(file, sheet, search_value="Установка")
|
|
||||||
# Читаем весь лист, начиная с найденной строки как заголовок
|
|
||||||
df_full = pd.read_excel(
|
|
||||||
file,
|
|
||||||
sheet_name=sheet,
|
|
||||||
header=header_num,
|
|
||||||
usecols=None,
|
|
||||||
index_col=None
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# === Удаление полностью пустых столбцов ===
|
def _get_total_by_columns(self, params: dict):
|
||||||
df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA
|
"""Агрегация данных по колонкам"""
|
||||||
df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA
|
# Валидируем параметры с помощью схемы Pydantic
|
||||||
df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы
|
validated_params = validate_params_with_schema(params, MonitoringFuelTotalRequest)
|
||||||
|
|
||||||
# === Переименовываем нужные столбцы по позициям ===
|
columns = validated_params["columns"]
|
||||||
if len(df_full.columns) < 2:
|
|
||||||
raise ValueError("DataFrame должен содержать как минимум 2 столбца.")
|
|
||||||
|
|
||||||
new_columns = df_full.columns.tolist()
|
# TODO: Переделать под новую архитектуру
|
||||||
|
df_means, _ = self.aggregate_by_columns(self.df, columns)
|
||||||
|
return df_means.to_dict(orient='index')
|
||||||
|
|
||||||
new_columns[0] = 'name'
|
def _get_month_by_code(self, params: dict):
|
||||||
new_columns[1] = 'normativ'
|
"""Получение данных за конкретный месяц"""
|
||||||
new_columns[-2] = 'total'
|
# Валидируем параметры с помощью схемы Pydantic
|
||||||
new_columns[-1] = 'total_1'
|
validated_params = validate_params_with_schema(params, MonitoringFuelMonthRequest)
|
||||||
|
|
||||||
df_full.columns = new_columns
|
month = validated_params["month"]
|
||||||
|
|
||||||
# Проверяем, что колонка 'name' существует
|
# TODO: Переделать под новую архитектуру
|
||||||
if 'name' in df_full.columns:
|
df_month = self.get_month(self.df, month)
|
||||||
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
|
return df_month.to_dict(orient='index')
|
||||||
df_full['id'] = df_full['name'].apply(get_object_by_name)
|
|
||||||
|
|
||||||
# Устанавливаем id как индекс
|
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
|
||||||
df_full.set_index('id', inplace=True)
|
"""Парсинг файла и возврат DataFrame"""
|
||||||
print(f"Окончательное количество столбцов: {len(df_full.columns)}")
|
# Сохраняем DataFrame для использования в геттерах
|
||||||
return df_full
|
self.df = self.parse_monitoring_fuel_files(file_path, params)
|
||||||
|
return self.df
|
||||||
|
|
||||||
def parse(self, file_path: str, params: dict) -> dict:
|
def parse_monitoring_fuel_files(self, zip_path: str, params: dict) -> Dict[str, pd.DataFrame]:
|
||||||
import zipfile
|
"""Парсинг ZIP архива с файлами мониторинга топлива"""
|
||||||
df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ
|
df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ
|
||||||
|
|
||||||
with zipfile.ZipFile(file_path, 'r') as zip_ref:
|
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
||||||
|
|
||||||
file_list = zip_ref.namelist()
|
file_list = zip_ref.namelist()
|
||||||
for month in range(1, 13):
|
for month in range(1, 13):
|
||||||
@@ -103,7 +92,70 @@ class MonitoringFuelParser(ParserPort):
|
|||||||
|
|
||||||
return df_monitorings
|
return df_monitorings
|
||||||
|
|
||||||
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns):
|
def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int:
|
||||||
|
"""Определение индекса заголовка в Excel по ключевому слову"""
|
||||||
|
# Читаем первые max_rows строк без заголовков
|
||||||
|
df_temp = pd.read_excel(
|
||||||
|
file_path,
|
||||||
|
sheet_name=sheet,
|
||||||
|
header=None,
|
||||||
|
nrows=max_rows,
|
||||||
|
engine='openpyxl'
|
||||||
|
)
|
||||||
|
|
||||||
|
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
|
||||||
|
for idx, row in df_temp.iterrows():
|
||||||
|
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
|
||||||
|
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
|
||||||
|
return idx + 1 # возвращаем индекс строки (0-based)
|
||||||
|
|
||||||
|
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
|
||||||
|
|
||||||
|
def parse_single(self, file, sheet, header_num=None):
|
||||||
|
''' Собственно парсер отчетов одного объекта'''
|
||||||
|
# Автоопределение header_num, если не передан
|
||||||
|
if header_num is None:
|
||||||
|
header_num = self.find_header_row(file, sheet, search_value="Установка")
|
||||||
|
# Читаем весь лист, начиная с найденной строки как заголовок
|
||||||
|
df_full = pd.read_excel(
|
||||||
|
file,
|
||||||
|
sheet_name=sheet,
|
||||||
|
header=header_num,
|
||||||
|
usecols=None,
|
||||||
|
index_col=None,
|
||||||
|
engine='openpyxl'
|
||||||
|
)
|
||||||
|
|
||||||
|
# === Удаление полностью пустых столбцов ===
|
||||||
|
df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA
|
||||||
|
df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA
|
||||||
|
df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы
|
||||||
|
|
||||||
|
# === Переименовываем нужные столбцы по позициям ===
|
||||||
|
if len(df_full.columns) < 2:
|
||||||
|
raise ValueError("DataFrame должен содержать как минимум 2 столбца.")
|
||||||
|
|
||||||
|
new_columns = df_full.columns.tolist()
|
||||||
|
|
||||||
|
new_columns[0] = 'name'
|
||||||
|
new_columns[1] = 'normativ'
|
||||||
|
new_columns[-2] = 'total'
|
||||||
|
new_columns[-1] = 'total_1'
|
||||||
|
|
||||||
|
df_full.columns = new_columns
|
||||||
|
|
||||||
|
# Проверяем, что колонка 'name' существует
|
||||||
|
if 'name' in df_full.columns:
|
||||||
|
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
|
||||||
|
# df_full['id'] = df_full['name'].apply(get_object_by_name) # This line was removed as per new_code
|
||||||
|
pass # Placeholder for new_code
|
||||||
|
|
||||||
|
# Устанавливаем id как индекс
|
||||||
|
df_full.set_index('id', inplace=True)
|
||||||
|
print(f"Окончательное количество столбцов: {len(df_full.columns)}")
|
||||||
|
return df_full
|
||||||
|
|
||||||
|
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns: list) -> Tuple[pd.DataFrame, Dict[str, pd.DataFrame]]:
|
||||||
''' Служебная функция. Агрегация данных по среднему по определенным колонкам. '''
|
''' Служебная функция. Агрегация данных по среднему по определенным колонкам. '''
|
||||||
all_data = {} # Для хранения полных данных (месяцы) по каждой колонке
|
all_data = {} # Для хранения полных данных (месяцы) по каждой колонке
|
||||||
means = {} # Для хранения средних
|
means = {} # Для хранения средних
|
||||||
@@ -185,22 +237,3 @@ class MonitoringFuelParser(ParserPort):
|
|||||||
total.name = 'mean'
|
total.name = 'mean'
|
||||||
|
|
||||||
return total, df_combined
|
return total, df_combined
|
||||||
|
|
||||||
def get_value(self, df, params):
|
|
||||||
mode = params.get("mode", "total")
|
|
||||||
columns = params.get("columns", None)
|
|
||||||
month = params.get("month", None)
|
|
||||||
data = None
|
|
||||||
if mode == "total":
|
|
||||||
if not columns:
|
|
||||||
raise ValueError("Отсутствуют идентификаторы столбцов")
|
|
||||||
df_means, _ = self.aggregate_by_columns(df, columns)
|
|
||||||
data = df_means.to_dict(orient='index')
|
|
||||||
elif mode == "month":
|
|
||||||
if not month:
|
|
||||||
raise ValueError("Отсутствуют идентификатор месяца")
|
|
||||||
df_month = self.get_month(df, month)
|
|
||||||
data = df_month.to_dict(orient='index')
|
|
||||||
|
|
||||||
json_result = data_to_json(data)
|
|
||||||
return json_result
|
|
||||||
|
|||||||
@@ -2,89 +2,53 @@ import pandas as pd
|
|||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
from core.ports import ParserPort
|
from core.ports import ParserPort
|
||||||
|
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
|
||||||
|
from app.schemas.svodka_ca import SvodkaCARequest
|
||||||
from adapters.pconfig import get_og_by_name
|
from adapters.pconfig import get_og_by_name
|
||||||
|
|
||||||
|
|
||||||
class SvodkaCAParser(ParserPort):
|
class SvodkaCAParser(ParserPort):
|
||||||
"""Парсер для сводки СА"""
|
"""Парсер для сводок СА"""
|
||||||
|
|
||||||
name = "Сводка СА"
|
name = "Сводки СА"
|
||||||
|
|
||||||
def extract_all_tables(self, file_path, sheet_name=0):
|
def _register_default_getters(self):
|
||||||
"""Извлекает все таблицы из Excel файла"""
|
"""Регистрация геттеров по умолчанию"""
|
||||||
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
|
# Используем схемы Pydantic как единый источник правды
|
||||||
df_filled = df.fillna('')
|
register_getter_from_schema(
|
||||||
df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True)
|
parser_instance=self,
|
||||||
|
getter_name="get_data",
|
||||||
|
method=self._get_data_wrapper,
|
||||||
|
schema_class=SvodkaCARequest,
|
||||||
|
description="Получение данных по режимам и таблицам"
|
||||||
|
)
|
||||||
|
|
||||||
non_empty_rows = ~(df_clean.eq('').all(axis=1))
|
def _get_data_wrapper(self, params: dict):
|
||||||
non_empty_cols = ~(df_clean.eq('').all(axis=0))
|
"""Получение данных по режимам и таблицам"""
|
||||||
|
# Валидируем параметры с помощью схемы Pydantic
|
||||||
|
validated_params = validate_params_with_schema(params, SvodkaCARequest)
|
||||||
|
|
||||||
row_indices = non_empty_rows[non_empty_rows].index.tolist()
|
modes = validated_params["modes"]
|
||||||
col_indices = non_empty_cols[non_empty_cols].index.tolist()
|
tables = validated_params["tables"]
|
||||||
|
|
||||||
if not row_indices or not col_indices:
|
# TODO: Переделать под новую архитектуру
|
||||||
return []
|
data_dict = {}
|
||||||
|
for mode in modes:
|
||||||
|
data_dict[mode] = self.get_data(self.df, mode, tables)
|
||||||
|
return self.data_dict_to_json(data_dict)
|
||||||
|
|
||||||
row_blocks = self._get_contiguous_blocks(row_indices)
|
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
|
||||||
col_blocks = self._get_contiguous_blocks(col_indices)
|
"""Парсинг файла и возврат DataFrame"""
|
||||||
|
# Сохраняем DataFrame для использования в геттерах
|
||||||
|
self.df = self.parse_svodka_ca(file_path, params)
|
||||||
|
return self.df
|
||||||
|
|
||||||
tables = []
|
def parse_svodka_ca(self, file_path: str, params: dict) -> dict:
|
||||||
for r_start, r_end in row_blocks:
|
"""Парсинг сводки СА"""
|
||||||
for c_start, c_end in col_blocks:
|
# Получаем параметры из params
|
||||||
block = df.iloc[r_start:r_end + 1, c_start:c_end + 1]
|
sheet_name = params.get('sheet_name', 0) # По умолчанию первый лист
|
||||||
if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all():
|
inclusion_list = params.get('inclusion_list', {'ТиП', 'Топливо', 'Потери'})
|
||||||
continue
|
|
||||||
|
|
||||||
if self._is_header_row(block.iloc[0]):
|
|
||||||
block.columns = block.iloc[0]
|
|
||||||
block = block.iloc[1:].reset_index(drop=True)
|
|
||||||
else:
|
|
||||||
block = block.reset_index(drop=True)
|
|
||||||
block.columns = [f"col_{i}" for i in range(block.shape[1])]
|
|
||||||
|
|
||||||
tables.append(block)
|
|
||||||
|
|
||||||
return tables
|
|
||||||
|
|
||||||
def _get_contiguous_blocks(self, indices):
|
|
||||||
"""Группирует индексы в непрерывные блоки"""
|
|
||||||
if not indices:
|
|
||||||
return []
|
|
||||||
blocks = []
|
|
||||||
start = indices[0]
|
|
||||||
for i in range(1, len(indices)):
|
|
||||||
if indices[i] != indices[i-1] + 1:
|
|
||||||
blocks.append((start, indices[i-1]))
|
|
||||||
start = indices[i]
|
|
||||||
blocks.append((start, indices[-1]))
|
|
||||||
return blocks
|
|
||||||
|
|
||||||
def _is_header_row(self, series):
|
|
||||||
"""Определяет, похожа ли строка на заголовок"""
|
|
||||||
series_str = series.astype(str).str.strip()
|
|
||||||
non_empty = series_str[series_str != '']
|
|
||||||
if len(non_empty) == 0:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def is_not_numeric(val):
|
|
||||||
try:
|
|
||||||
float(val.replace(',', '.'))
|
|
||||||
return False
|
|
||||||
except (ValueError, TypeError):
|
|
||||||
return True
|
|
||||||
|
|
||||||
not_numeric_count = non_empty.apply(is_not_numeric).sum()
|
|
||||||
return not_numeric_count / len(non_empty) > 0.6
|
|
||||||
|
|
||||||
def _get_og_by_name(self, name):
|
|
||||||
"""Функция для получения ID по имени (упрощенная версия)"""
|
|
||||||
# Упрощенная версия - возвращаем имя как есть
|
|
||||||
if not name or not isinstance(name, str):
|
|
||||||
return None
|
|
||||||
return name.strip()
|
|
||||||
|
|
||||||
def parse_sheet(self, file_path, sheet_name, inclusion_list):
|
|
||||||
"""Собственно функция парсинга отчета СА"""
|
|
||||||
# === Извлечение и фильтрация ===
|
# === Извлечение и фильтрация ===
|
||||||
tables = self.extract_all_tables(file_path, sheet_name)
|
tables = self.extract_all_tables(file_path, sheet_name)
|
||||||
|
|
||||||
@@ -190,76 +154,185 @@ class SvodkaCAParser(ParserPort):
|
|||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def parse(self, file_path: str, params: dict) -> dict:
|
def extract_all_tables(self, file_path, sheet_name=0):
|
||||||
"""Парсинг файла сводки СА"""
|
"""Извлечение всех таблиц из Excel файла"""
|
||||||
# === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив ===
|
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None, engine='openpyxl')
|
||||||
# Выгружаем План в df_ca_plan
|
df_filled = df.fillna('')
|
||||||
inclusion_list_plan = {
|
df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True)
|
||||||
"ТиП, %",
|
|
||||||
"Топливо итого, тонн",
|
|
||||||
"Топливо итого, %",
|
|
||||||
"Топливо на технологию, тонн",
|
|
||||||
"Топливо на технологию, %",
|
|
||||||
"Топливо на энергетику, тонн",
|
|
||||||
"Топливо на энергетику, %",
|
|
||||||
"Потери итого, тонн",
|
|
||||||
"Потери итого, %",
|
|
||||||
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
|
|
||||||
"в т.ч. Идентифицированные безвозвратные потери, %**",
|
|
||||||
"в т.ч. Неидентифицированные потери, тонн**",
|
|
||||||
"в т.ч. Неидентифицированные потери, %**"
|
|
||||||
}
|
|
||||||
|
|
||||||
df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan) # ЭТО ДАТАФРЕЙМ ПЛАНА В СВОДКЕ ЦА
|
non_empty_rows = ~(df_clean.eq('').all(axis=1))
|
||||||
print(f"\n--- Объединённый и отсортированный План: {df_ca_plan.shape} ---")
|
non_empty_cols = ~(df_clean.eq('').all(axis=0))
|
||||||
|
|
||||||
# Выгружаем Факт
|
row_indices = non_empty_rows[non_empty_rows].index.tolist()
|
||||||
inclusion_list_fact = {
|
col_indices = non_empty_cols[non_empty_cols].index.tolist()
|
||||||
"ТиП, %",
|
|
||||||
"Топливо итого, тонн",
|
|
||||||
"Топливо итого, %",
|
|
||||||
"Топливо на технологию, тонн",
|
|
||||||
"Топливо на технологию, %",
|
|
||||||
"Топливо на энергетику, тонн",
|
|
||||||
"Топливо на энергетику, %",
|
|
||||||
"Потери итого, тонн",
|
|
||||||
"Потери итого, %",
|
|
||||||
"в т.ч. Идентифицированные безвозвратные потери, тонн",
|
|
||||||
"в т.ч. Идентифицированные безвозвратные потери, %",
|
|
||||||
"в т.ч. Неидентифицированные потери, тонн",
|
|
||||||
"в т.ч. Неидентифицированные потери, %"
|
|
||||||
}
|
|
||||||
|
|
||||||
df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact) # ЭТО ДАТАФРЕЙМ ФАКТА В СВОДКЕ ЦА
|
if not row_indices or not col_indices:
|
||||||
print(f"\n--- Объединённый и отсортированный Факт: {df_ca_fact.shape} ---")
|
return []
|
||||||
|
|
||||||
# Выгружаем План в df_ca_normativ
|
row_blocks = self._get_contiguous_blocks(row_indices)
|
||||||
inclusion_list_normativ = {
|
col_blocks = self._get_contiguous_blocks(col_indices)
|
||||||
"Топливо итого, тонн",
|
|
||||||
"Топливо итого, %",
|
|
||||||
"Топливо на технологию, тонн",
|
|
||||||
"Топливо на технологию, %",
|
|
||||||
"Топливо на энергетику, тонн",
|
|
||||||
"Топливо на энергетику, %",
|
|
||||||
"Потери итого, тонн",
|
|
||||||
"Потери итого, %",
|
|
||||||
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
|
|
||||||
"в т.ч. Идентифицированные безвозвратные потери, %**",
|
|
||||||
"в т.ч. Неидентифицированные потери, тонн**",
|
|
||||||
"в т.ч. Неидентифицированные потери, %**"
|
|
||||||
}
|
|
||||||
|
|
||||||
# ЭТО ДАТАФРЕЙМ НОРМАТИВА В СВОДКЕ ЦА
|
tables = []
|
||||||
df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ)
|
for r_start, r_end in row_blocks:
|
||||||
|
for c_start, c_end in col_blocks:
|
||||||
|
block = df.iloc[r_start:r_end + 1, c_start:c_end + 1]
|
||||||
|
if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all():
|
||||||
|
continue
|
||||||
|
|
||||||
print(f"\n--- Объединённый и отсортированный Норматив: {df_ca_normativ.shape} ---")
|
if self._is_header_row(block.iloc[0]):
|
||||||
|
block.columns = block.iloc[0]
|
||||||
|
block = block.iloc[1:].reset_index(drop=True)
|
||||||
|
else:
|
||||||
|
block = block.reset_index(drop=True)
|
||||||
|
block.columns = [f"col_{i}" for i in range(block.shape[1])]
|
||||||
|
|
||||||
df_dict = {
|
tables.append(block)
|
||||||
"plan": df_ca_plan,
|
|
||||||
"fact": df_ca_fact,
|
return tables
|
||||||
"normativ": df_ca_normativ
|
|
||||||
}
|
def _get_contiguous_blocks(self, indices):
|
||||||
return df_dict
|
"""Группирует индексы в непрерывные блоки"""
|
||||||
|
if not indices:
|
||||||
|
return []
|
||||||
|
blocks = []
|
||||||
|
start = indices[0]
|
||||||
|
for i in range(1, len(indices)):
|
||||||
|
if indices[i] != indices[i-1] + 1:
|
||||||
|
blocks.append((start, indices[i-1]))
|
||||||
|
start = indices[i]
|
||||||
|
blocks.append((start, indices[-1]))
|
||||||
|
return blocks
|
||||||
|
|
||||||
|
def _is_header_row(self, series):
|
||||||
|
"""Определяет, похожа ли строка на заголовок"""
|
||||||
|
series_str = series.astype(str).str.strip()
|
||||||
|
non_empty = series_str[series_str != '']
|
||||||
|
if len(non_empty) == 0:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def is_not_numeric(val):
|
||||||
|
try:
|
||||||
|
float(val.replace(',', '.'))
|
||||||
|
return False
|
||||||
|
except (ValueError, TypeError):
|
||||||
|
return True
|
||||||
|
|
||||||
|
not_numeric_count = non_empty.apply(is_not_numeric).sum()
|
||||||
|
return not_numeric_count / len(non_empty) > 0.6
|
||||||
|
|
||||||
|
def _get_og_by_name(self, name):
|
||||||
|
"""Функция для получения ID по имени (упрощенная версия)"""
|
||||||
|
# Упрощенная версия - возвращаем имя как есть
|
||||||
|
if not name or not isinstance(name, str):
|
||||||
|
return None
|
||||||
|
return name.strip()
|
||||||
|
|
||||||
|
def parse_sheet(self, file_path: str, sheet_name: str, inclusion_list: set) -> pd.DataFrame:
|
||||||
|
"""Парсинг листа Excel"""
|
||||||
|
# === Извлечение и фильтрация ===
|
||||||
|
tables = self.extract_all_tables(file_path, sheet_name)
|
||||||
|
|
||||||
|
# Фильтруем таблицы: оставляем только те, где первая строка содержит нужные заголовки
|
||||||
|
filtered_tables = []
|
||||||
|
for table in tables:
|
||||||
|
if table.empty:
|
||||||
|
continue
|
||||||
|
first_row_values = table.iloc[0].astype(str).str.strip().tolist()
|
||||||
|
if any(val in inclusion_list for val in first_row_values):
|
||||||
|
filtered_tables.append(table)
|
||||||
|
|
||||||
|
tables = filtered_tables
|
||||||
|
|
||||||
|
# === Итоговый список таблиц датафреймов ===
|
||||||
|
result_list = []
|
||||||
|
|
||||||
|
for table in tables:
|
||||||
|
if table.empty:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Получаем первую строку (до удаления)
|
||||||
|
first_row_values = table.iloc[0].astype(str).str.strip().tolist()
|
||||||
|
|
||||||
|
# Находим, какой элемент из inclusion_list присутствует
|
||||||
|
matched_key = None
|
||||||
|
for val in first_row_values:
|
||||||
|
if val in inclusion_list:
|
||||||
|
matched_key = val
|
||||||
|
break # берём первый совпадающий заголовок
|
||||||
|
|
||||||
|
if matched_key is None:
|
||||||
|
continue # на всякий случай (хотя уже отфильтровано)
|
||||||
|
|
||||||
|
# Удаляем первую строку (заголовок) и сбрасываем индекс
|
||||||
|
df_cleaned = table.iloc[1:].copy().reset_index(drop=True)
|
||||||
|
|
||||||
|
# Пропускаем, если таблица пустая
|
||||||
|
if df_cleaned.empty:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Первая строка становится заголовком
|
||||||
|
new_header = df_cleaned.iloc[0] # извлекаем первую строку как потенциальные названия столбцов
|
||||||
|
|
||||||
|
# Преобразуем заголовок: только первый столбец может быть заменён на "name"
|
||||||
|
cleaned_header = []
|
||||||
|
|
||||||
|
# Обрабатываем первый столбец отдельно
|
||||||
|
first_item = new_header.iloc[0] if isinstance(new_header, pd.Series) else new_header[0]
|
||||||
|
first_item_str = str(first_item).strip() if pd.notna(first_item) else ""
|
||||||
|
if first_item_str == "" or first_item_str == "nan":
|
||||||
|
cleaned_header.append("name")
|
||||||
|
else:
|
||||||
|
cleaned_header.append(first_item_str)
|
||||||
|
|
||||||
|
# Остальные столбцы добавляем без изменений (или с минимальной очисткой)
|
||||||
|
for item in new_header[1:]:
|
||||||
|
# Опционально: приводим к строке и убираем лишние пробелы, но не заменяем на "name"
|
||||||
|
item_str = str(item).strip() if pd.notna(item) else ""
|
||||||
|
cleaned_header.append(item_str)
|
||||||
|
|
||||||
|
# Применяем очищенные названия столбцов
|
||||||
|
df_cleaned = df_cleaned[1:] # удаляем строку с заголовком
|
||||||
|
df_cleaned.columns = cleaned_header
|
||||||
|
df_cleaned = df_cleaned.reset_index(drop=True)
|
||||||
|
|
||||||
|
if matched_key.endswith('**'):
|
||||||
|
cleaned_key = matched_key[:-2] # удаляем последние **
|
||||||
|
else:
|
||||||
|
cleaned_key = matched_key
|
||||||
|
|
||||||
|
# Добавляем новую колонку с именем параметра
|
||||||
|
df_cleaned["table"] = cleaned_key
|
||||||
|
|
||||||
|
# Проверяем, что колонка 'name' существует
|
||||||
|
if 'name' not in df_cleaned.columns:
|
||||||
|
print(
|
||||||
|
f"Внимание: колонка 'name' отсутствует в таблице для '{matched_key}'. Пропускаем добавление 'id'.")
|
||||||
|
continue # или обработать по-другому
|
||||||
|
else:
|
||||||
|
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
|
||||||
|
df_cleaned['id'] = df_cleaned['name'].apply(get_og_by_name)
|
||||||
|
|
||||||
|
# Удаляем строки, где id — None, NaN или пустой
|
||||||
|
df_cleaned = df_cleaned.dropna(subset=['id']) # dropna удаляет NaN
|
||||||
|
# Дополнительно: удаляем None (если не поймал dropna)
|
||||||
|
df_cleaned = df_cleaned[df_cleaned['id'].notna() & (df_cleaned['id'].astype(str) != 'None')]
|
||||||
|
|
||||||
|
# Добавляем в словарь
|
||||||
|
result_list.append(df_cleaned)
|
||||||
|
|
||||||
|
# === Объединение и сортировка по id (индекс) и table ===
|
||||||
|
if result_list:
|
||||||
|
combined_df = pd.concat(result_list, axis=0)
|
||||||
|
|
||||||
|
# Сортируем по индексу (id) и по столбцу 'table'
|
||||||
|
combined_df = combined_df.sort_values(by=['id', 'table'], axis=0)
|
||||||
|
|
||||||
|
# Устанавливаем id как индекс
|
||||||
|
# combined_df.set_index('id', inplace=True)
|
||||||
|
|
||||||
|
return combined_df
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def data_dict_to_json(self, data_dict):
|
def data_dict_to_json(self, data_dict):
|
||||||
''' Служебная функция для парсинга словаря в json. '''
|
''' Служебная функция для парсинга словаря в json. '''
|
||||||
@@ -308,17 +381,3 @@ class SvodkaCAParser(ParserPort):
|
|||||||
filtered_df = df[df['table'].isin(table_values)].copy()
|
filtered_df = df[df['table'].isin(table_values)].copy()
|
||||||
result_dict = {key: group for key, group in filtered_df.groupby('table')}
|
result_dict = {key: group for key, group in filtered_df.groupby('table')}
|
||||||
return result_dict
|
return result_dict
|
||||||
|
|
||||||
def get_value(self, df: pd.DataFrame, params: dict):
|
|
||||||
|
|
||||||
modes = params.get("modes")
|
|
||||||
tables = params.get("tables")
|
|
||||||
if not isinstance(modes, list):
|
|
||||||
raise ValueError("Поле 'modes' должно быть списком")
|
|
||||||
if not isinstance(tables, list):
|
|
||||||
raise ValueError("Поле 'tables' должно быть списком")
|
|
||||||
# Собираем данные
|
|
||||||
data_dict = {}
|
|
||||||
for mode in modes:
|
|
||||||
data_dict[mode] = self.get_data(df, mode, tables)
|
|
||||||
return self.data_dict_to_json(data_dict)
|
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import pandas as pd
|
import pandas as pd
|
||||||
|
|
||||||
from core.ports import ParserPort
|
from core.ports import ParserPort
|
||||||
|
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
|
||||||
|
from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
|
||||||
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
|
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
|
||||||
|
|
||||||
|
|
||||||
@@ -9,6 +11,57 @@ class SvodkaPMParser(ParserPort):
|
|||||||
|
|
||||||
name = "Сводки ПМ"
|
name = "Сводки ПМ"
|
||||||
|
|
||||||
|
def _register_default_getters(self):
|
||||||
|
"""Регистрация геттеров по умолчанию"""
|
||||||
|
# Используем схемы Pydantic как единый источник правды
|
||||||
|
register_getter_from_schema(
|
||||||
|
parser_instance=self,
|
||||||
|
getter_name="single_og",
|
||||||
|
method=self._get_single_og,
|
||||||
|
schema_class=SvodkaPMSingleOGRequest,
|
||||||
|
description="Получение данных по одному ОГ"
|
||||||
|
)
|
||||||
|
|
||||||
|
register_getter_from_schema(
|
||||||
|
parser_instance=self,
|
||||||
|
getter_name="total_ogs",
|
||||||
|
method=self._get_total_ogs,
|
||||||
|
schema_class=SvodkaPMTotalOGsRequest,
|
||||||
|
description="Получение данных по всем ОГ"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _get_single_og(self, params: dict):
|
||||||
|
"""Получение данных по одному ОГ"""
|
||||||
|
# Валидируем параметры с помощью схемы Pydantic
|
||||||
|
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
|
||||||
|
|
||||||
|
og_id = validated_params["id"]
|
||||||
|
codes = validated_params["codes"]
|
||||||
|
columns = validated_params["columns"]
|
||||||
|
search = validated_params.get("search")
|
||||||
|
|
||||||
|
# Здесь нужно получить DataFrame из self.df, но пока используем старую логику
|
||||||
|
# TODO: Переделать под новую архитектуру
|
||||||
|
return self.get_svodka_og(self.df, og_id, codes, columns, search)
|
||||||
|
|
||||||
|
def _get_total_ogs(self, params: dict):
|
||||||
|
"""Получение данных по всем ОГ"""
|
||||||
|
# Валидируем параметры с помощью схемы Pydantic
|
||||||
|
validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest)
|
||||||
|
|
||||||
|
codes = validated_params["codes"]
|
||||||
|
columns = validated_params["columns"]
|
||||||
|
search = validated_params.get("search")
|
||||||
|
|
||||||
|
# TODO: Переделать под новую архитектуру
|
||||||
|
return self.get_svodka_total(self.df, codes, columns, search)
|
||||||
|
|
||||||
|
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
|
||||||
|
"""Парсинг файла и возврат DataFrame"""
|
||||||
|
# Сохраняем DataFrame для использования в геттерах
|
||||||
|
self.df = self.parse_svodka_pm_files(file_path, params)
|
||||||
|
return self.df
|
||||||
|
|
||||||
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
|
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
|
||||||
"""Определения индекса заголовка в excel по ключевому слову"""
|
"""Определения индекса заголовка в excel по ключевому слову"""
|
||||||
# Читаем первые max_rows строк без заголовков
|
# Читаем первые max_rows строк без заголовков
|
||||||
@@ -16,7 +69,8 @@ class SvodkaPMParser(ParserPort):
|
|||||||
file,
|
file,
|
||||||
sheet_name=sheet,
|
sheet_name=sheet,
|
||||||
header=None,
|
header=None,
|
||||||
nrows=max_rows
|
nrows=max_rows,
|
||||||
|
engine='openpyxl'
|
||||||
)
|
)
|
||||||
|
|
||||||
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
|
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
|
||||||
@@ -40,6 +94,7 @@ class SvodkaPMParser(ParserPort):
|
|||||||
header=header_num,
|
header=header_num,
|
||||||
usecols=None,
|
usecols=None,
|
||||||
nrows=2,
|
nrows=2,
|
||||||
|
engine='openpyxl'
|
||||||
)
|
)
|
||||||
|
|
||||||
if df_probe.shape[0] == 0:
|
if df_probe.shape[0] == 0:
|
||||||
@@ -61,7 +116,8 @@ class SvodkaPMParser(ParserPort):
|
|||||||
sheet_name=sheet,
|
sheet_name=sheet,
|
||||||
header=header_num,
|
header=header_num,
|
||||||
usecols=None,
|
usecols=None,
|
||||||
index_col=None
|
index_col=None,
|
||||||
|
engine='openpyxl'
|
||||||
)
|
)
|
||||||
|
|
||||||
if indicator_col_name not in df_full.columns:
|
if indicator_col_name not in df_full.columns:
|
||||||
@@ -99,25 +155,25 @@ class SvodkaPMParser(ParserPort):
|
|||||||
# Проверяем, является ли колонка пустой/некорректной
|
# Проверяем, является ли колонка пустой/некорректной
|
||||||
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
|
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
|
||||||
|
|
||||||
# Проверяем, начинается ли на "Итого"
|
if is_empty_or_unnamed:
|
||||||
if col_str.startswith('Итого'):
|
# Если это пустая колонка, используем последнее хорошее имя
|
||||||
current_name = 'Итого'
|
if last_good_name:
|
||||||
last_good_name = current_name # обновляем last_good_name
|
new_columns.append(last_good_name)
|
||||||
new_columns.append(current_name)
|
else:
|
||||||
elif is_empty_or_unnamed:
|
# Если нет хорошего имени, пропускаем
|
||||||
# Используем последнее хорошее имя
|
continue
|
||||||
new_columns.append(last_good_name)
|
|
||||||
else:
|
else:
|
||||||
# Имя, полученное из exel
|
# Это хорошая колонка
|
||||||
last_good_name = col_str
|
last_good_name = col_str
|
||||||
new_columns.append(col_str)
|
new_columns.append(col_str)
|
||||||
|
|
||||||
|
# Применяем новые заголовки
|
||||||
df_final.columns = new_columns
|
df_final.columns = new_columns
|
||||||
|
|
||||||
print(f"Окончательное количество столбцов: {len(df_final.columns)}")
|
|
||||||
return df_final
|
return df_final
|
||||||
|
|
||||||
def parse(self, file_path: str, params: dict) -> dict:
|
def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict:
|
||||||
|
"""Парсинг ZIP архива со сводками ПМ"""
|
||||||
import zipfile
|
import zipfile
|
||||||
pm_dict = {
|
pm_dict = {
|
||||||
"facts": {},
|
"facts": {},
|
||||||
@@ -125,7 +181,7 @@ class SvodkaPMParser(ParserPort):
|
|||||||
}
|
}
|
||||||
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
|
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
|
||||||
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
|
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
|
||||||
with zipfile.ZipFile(file_path, 'r') as zip_ref:
|
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
||||||
file_list = zip_ref.namelist()
|
file_list = zip_ref.namelist()
|
||||||
for name, id in OG_IDS.items():
|
for name, id in OG_IDS.items():
|
||||||
if id == 'BASH':
|
if id == 'BASH':
|
||||||
@@ -155,9 +211,9 @@ class SvodkaPMParser(ParserPort):
|
|||||||
|
|
||||||
return pm_dict
|
return pm_dict
|
||||||
|
|
||||||
def get_svodka_value(self, df_svodka, id, code, search_value=None):
|
def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
|
||||||
''' Служебная функция для простой выборке по сводке '''
|
''' Служебная функция получения значения по коду и столбцу '''
|
||||||
row_index = id
|
row_index = code
|
||||||
|
|
||||||
mask_value = df_svodka.iloc[0] == code
|
mask_value = df_svodka.iloc[0] == code
|
||||||
if search_value is None:
|
if search_value is None:
|
||||||
@@ -254,22 +310,4 @@ class SvodkaPMParser(ParserPort):
|
|||||||
|
|
||||||
return total_result
|
return total_result
|
||||||
|
|
||||||
def get_value(self, df, params):
|
# Убираем старый метод get_value, так как он теперь в базовом классе
|
||||||
og_id = params.get("id")
|
|
||||||
codes = params.get("codes")
|
|
||||||
columns = params.get("columns")
|
|
||||||
search = params.get("search")
|
|
||||||
mode = params.get("mode", "total")
|
|
||||||
if not isinstance(codes, list):
|
|
||||||
raise ValueError("Поле 'codes' должно быть списком")
|
|
||||||
if not isinstance(columns, list):
|
|
||||||
raise ValueError("Поле 'columns' должно быть списком")
|
|
||||||
data = None
|
|
||||||
if mode == "single":
|
|
||||||
if not og_id:
|
|
||||||
raise ValueError("Отсутствует идентификатор ОГ")
|
|
||||||
data = self.get_svodka_og(df, og_id, codes, columns, search)
|
|
||||||
elif mode == "total":
|
|
||||||
data = self.get_svodka_total(df, codes, columns, search)
|
|
||||||
json_result = data_to_json(data)
|
|
||||||
return json_result
|
|
||||||
|
|||||||
Binary file not shown.
140
python_parser/core/schema_utils.py
Normal file
140
python_parser/core/schema_utils.py
Normal file
@@ -0,0 +1,140 @@
|
|||||||
|
"""
|
||||||
|
Упрощенные утилиты для работы со схемами Pydantic
|
||||||
|
"""
|
||||||
|
from typing import List, Dict, Any, Type
|
||||||
|
from pydantic import BaseModel
|
||||||
|
import inspect
|
||||||
|
|
||||||
|
|
||||||
|
def get_required_fields_from_schema(schema_class: Type[BaseModel]) -> List[str]:
|
||||||
|
"""
|
||||||
|
Извлекает список обязательных полей из схемы Pydantic
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema_class: Класс схемы Pydantic
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Список имен обязательных полей
|
||||||
|
"""
|
||||||
|
required_fields = []
|
||||||
|
|
||||||
|
# Используем model_fields для Pydantic v2 или __fields__ для v1
|
||||||
|
if hasattr(schema_class, 'model_fields'):
|
||||||
|
fields = schema_class.model_fields
|
||||||
|
else:
|
||||||
|
fields = schema_class.__fields__
|
||||||
|
|
||||||
|
for field_name, field_info in fields.items():
|
||||||
|
# В Pydantic v2 есть метод is_required()
|
||||||
|
if hasattr(field_info, 'is_required'):
|
||||||
|
if field_info.is_required():
|
||||||
|
required_fields.append(field_name)
|
||||||
|
elif hasattr(field_info, 'required'):
|
||||||
|
if field_info.required:
|
||||||
|
required_fields.append(field_name)
|
||||||
|
else:
|
||||||
|
# Fallback для старых версий - проверяем наличие default
|
||||||
|
has_default = False
|
||||||
|
|
||||||
|
if hasattr(field_info, 'default'):
|
||||||
|
has_default = field_info.default is not ...
|
||||||
|
elif hasattr(field_info, 'default_factory'):
|
||||||
|
has_default = field_info.default_factory is not None
|
||||||
|
|
||||||
|
if not has_default:
|
||||||
|
required_fields.append(field_name)
|
||||||
|
|
||||||
|
return required_fields
|
||||||
|
|
||||||
|
|
||||||
|
def get_optional_fields_from_schema(schema_class: Type[BaseModel]) -> List[str]:
|
||||||
|
"""
|
||||||
|
Извлекает список необязательных полей из схемы Pydantic
|
||||||
|
|
||||||
|
Args:
|
||||||
|
schema_class: Класс схемы Pydantic
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Список имен необязательных полей
|
||||||
|
"""
|
||||||
|
optional_fields = []
|
||||||
|
|
||||||
|
# Используем model_fields для Pydantic v2 или __fields__ для v1
|
||||||
|
if hasattr(schema_class, 'model_fields'):
|
||||||
|
fields = schema_class.model_fields
|
||||||
|
else:
|
||||||
|
fields = schema_class.__fields__
|
||||||
|
|
||||||
|
for field_name, field_info in fields.items():
|
||||||
|
# В Pydantic v2 есть метод is_required()
|
||||||
|
if hasattr(field_info, 'is_required'):
|
||||||
|
if not field_info.is_required():
|
||||||
|
optional_fields.append(field_name)
|
||||||
|
elif hasattr(field_info, 'required'):
|
||||||
|
if not field_info.required:
|
||||||
|
optional_fields.append(field_name)
|
||||||
|
else:
|
||||||
|
# Fallback для старых версий - проверяем наличие default
|
||||||
|
has_default = False
|
||||||
|
|
||||||
|
if hasattr(field_info, 'default'):
|
||||||
|
has_default = field_info.default is not ...
|
||||||
|
elif hasattr(field_info, 'default_factory'):
|
||||||
|
has_default = field_info.default_factory is not None
|
||||||
|
|
||||||
|
if has_default:
|
||||||
|
optional_fields.append(field_name)
|
||||||
|
|
||||||
|
return optional_fields
|
||||||
|
|
||||||
|
|
||||||
|
def register_getter_from_schema(parser_instance, getter_name: str, method: callable,
|
||||||
|
schema_class: Type[BaseModel], description: str = ""):
|
||||||
|
"""
|
||||||
|
Регистрирует геттер в парсере, используя схему Pydantic для определения параметров
|
||||||
|
|
||||||
|
Args:
|
||||||
|
parser_instance: Экземпляр парсера
|
||||||
|
getter_name: Имя геттера
|
||||||
|
method: Метод для выполнения
|
||||||
|
schema_class: Класс схемы Pydantic
|
||||||
|
description: Описание геттера (если не указано, берется из docstring метода)
|
||||||
|
"""
|
||||||
|
# Извлекаем параметры из схемы
|
||||||
|
required_params = get_required_fields_from_schema(schema_class)
|
||||||
|
optional_params = get_optional_fields_from_schema(schema_class)
|
||||||
|
|
||||||
|
# Если описание не указано, берем из docstring метода
|
||||||
|
if not description:
|
||||||
|
description = inspect.getdoc(method) or ""
|
||||||
|
|
||||||
|
# Регистрируем геттер
|
||||||
|
parser_instance.register_getter(
|
||||||
|
name=getter_name,
|
||||||
|
method=method,
|
||||||
|
required_params=required_params,
|
||||||
|
optional_params=optional_params,
|
||||||
|
description=description
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_params_with_schema(params: Dict[str, Any], schema_class: Type[BaseModel]) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Валидирует параметры с помощью схемы Pydantic
|
||||||
|
|
||||||
|
Args:
|
||||||
|
params: Словарь параметров
|
||||||
|
schema_class: Класс схемы Pydantic
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Валидированные параметры
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ValidationError: Если параметры не прошли валидацию
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Создаем экземпляр схемы для валидации
|
||||||
|
validated_data = schema_class(**params)
|
||||||
|
return validated_data.dict()
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError(f"Ошибка валидации параметров: {str(e)}")
|
||||||
Reference in New Issue
Block a user