Пока не работает

This commit is contained in:
2025-09-03 11:35:04 +03:00
parent 15d13870f3
commit 631e58dad7
9 changed files with 674 additions and 189 deletions

View File

@@ -0,0 +1,88 @@
# Парсер Сводки ПМ
## Описание
Парсер для обработки сводок ПМ (план и факт) с поддержкой множественных геттеров. Наследуется от `ParserPort` и реализует архитектуру hexagonal architecture.
## Доступные геттеры
### 1. `get_single_og`
Получение данных по одному ОГ из сводки ПМ.
**Обязательные параметры:**
- `id` (str): ID ОГ (например, "SNPZ", "KNPZ")
- `codes` (list): Список кодов показателей (например, [78, 79, 81, 82])
- `columns` (list): Список столбцов для извлечения (например, ["ПП", "БП", "СЭБ"])
**Необязательные параметры:**
- `search` (str): Значение для поиска в столбцах
**Пример использования:**
```python
parser = SvodkaPMParser()
params = {
"id": "SNPZ",
"codes": [78, 79, 81, 82],
"columns": ["ПП", "БП", "СЭБ"]
}
result = parser.get_value("get_single_og", params)
```
### 2. `get_total_ogs`
Получение данных по всем ОГ из сводки ПМ.
**Обязательные параметры:**
- `codes` (list): Список кодов показателей
- `columns` (list): Список столбцов для извлечения
**Необязательные параметры:**
- `search` (str): Значение для поиска в столбцах
**Пример использования:**
```python
parser = SvodkaPMParser()
params = {
"codes": [78, 79, 81, 82],
"columns": ["ПП", "БП", "СЭБ"]
}
result = parser.get_value("get_total_ogs", params)
```
## Поддерживаемые столбцы
- **ПП, БП**: Данные из файлов плана
- **ТБ, СЭБ, НЭБ**: Данные из файлов факта
## Структура файлов
Парсер ожидает следующую структуру файлов:
- `data/pm_fact/svodka_fact_pm_{OG_ID}.xlsx` или `.xlsm`
- `data/pm_plan/svodka_plan_pm_{OG_ID}.xlsx` или `.xlsm`
Где `{OG_ID}` - это ID ОГ (например, SNPZ, KNPZ и т.д.)
## Формат результата
Результат возвращается в формате JSON со следующей структурой:
```json
{
"ПП": {
"78": 123.45,
"79": 234.56
},
"БП": {
"78": 111.11,
"79": 222.22
},
"СЭБ": {
"78": 333.33,
"79": 444.44
}
}
```
## Обработка ошибок
- Если файл плана/факта не найден, соответствующие столбцы будут пустыми
- Если код показателя не найден, возвращается 0
- Валидация параметров выполняется автоматически

View File

@@ -0,0 +1,326 @@
import pandas as pd
from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
class SvodkaPMParser(ParserPort):
"""Парсер для сводок ПМ (план и факт)"""
name = "Сводки ПМ"
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="single_og",
method=self._get_single_og,
schema_class=SvodkaPMSingleOGRequest,
description="Получение данных по одному ОГ"
)
register_getter_from_schema(
parser_instance=self,
getter_name="total_ogs",
method=self._get_total_ogs,
schema_class=SvodkaPMTotalOGsRequest,
description="Получение данных по всем ОГ"
)
def _get_single_og(self, params: dict):
"""Получение данных по одному ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
og_id = validated_params["id"]
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# Здесь нужно получить DataFrame из self.df, но пока используем старую логику
# TODO: Переделать под новую архитектуру
return self.get_svodka_og(self.df, og_id, codes, columns, search)
def _get_total_ogs(self, params: dict):
"""Получение данных по всем ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest)
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# TODO: Переделать под новую архитектуру
return self.get_svodka_total(self.df, codes, columns, search)
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_pm_files(file_path, params)
return self.df
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
"""Определения индекса заголовка в excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows,
engine='openpyxl'
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_svodka_pm(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного ОГ для БП, ПП и факта '''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Итого")
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
df_probe = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
nrows=2,
engine='openpyxl'
)
if df_probe.shape[0] == 0:
raise ValueError("Файл пуст или не содержит данных.")
first_data_row = df_probe.iloc[0]
# Находим столбец с 'INDICATOR_ID'
indicator_cols = first_data_row[first_data_row == 'INDICATOR_ID']
if len(indicator_cols) == 0:
raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.')
indicator_col_name = indicator_cols.index[0]
print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}")
# Читаем весь лист
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None,
engine='openpyxl'
)
if indicator_col_name not in df_full.columns:
raise ValueError(f"Столбец {indicator_col_name} отсутствует при полной загрузке.")
# Перемещаем INDICATOR_ID в начало и делаем индексом
cols = [indicator_col_name] + [col for col in df_full.columns if col != indicator_col_name]
df_full = df_full[cols]
df_full.set_index(indicator_col_name, inplace=True)
# Обрезаем до "Итого" + 1
header_list = [str(h).strip() for h in df_full.columns]
try:
itogo_idx = header_list.index("Итого")
num_cols_needed = itogo_idx + 2
except ValueError:
print('Столбец "Итого" не найден. Оставляем все столбцы.')
num_cols_needed = len(header_list)
num_cols_needed = min(num_cols_needed, len(header_list))
df_final = df_full.iloc[:, :num_cols_needed]
# === Удаление полностью пустых столбцов ===
df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True)
df_clean = df_clean.where(pd.notnull(df_clean), pd.NA)
non_empty_mask = df_clean.notna().any()
df_final = df_final.loc[:, non_empty_mask]
# === Обработка заголовков: Unnamed и "Итого" → "Итого" ===
new_columns = []
last_good_name = None
for col in df_final.columns:
col_str = str(col).strip()
# Проверяем, является ли колонка пустой/некорректной
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
if is_empty_or_unnamed:
# Если это пустая колонка, используем последнее хорошее имя
if last_good_name:
new_columns.append(last_good_name)
else:
# Если нет хорошего имени, используем имя по умолчанию
new_columns.append(f"col_{len(new_columns)}")
else:
# Это хорошая колонка
last_good_name = col_str
new_columns.append(col_str)
# Убеждаемся, что количество столбцов совпадает
if len(new_columns) != len(df_final.columns):
# Если количество не совпадает, обрезаем или дополняем
if len(new_columns) > len(df_final.columns):
new_columns = new_columns[:len(df_final.columns)]
else:
# Дополняем недостающие столбцы
while len(new_columns) < len(df_final.columns):
new_columns.append(f"col_{len(new_columns)}")
# Применяем новые заголовки
df_final.columns = new_columns
return df_final
def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict:
"""Парсинг ZIP архива со сводками ПМ"""
import zipfile
pm_dict = {
"facts": {},
"plans": {}
}
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for name, id in OG_IDS.items():
if id == 'BASH':
continue # пропускаем BASH
current_fact = replace_id_in_path(excel_fact_template, id)
fact_candidates = [f for f in file_list if current_fact in f]
if len(fact_candidates) == 1:
print(f'Загрузка {current_fact}')
with zip_ref.open(fact_candidates[0]) as excel_file:
pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен: {current_fact}")
else:
print(f"⚠️ Файл не найден (Факт): {current_fact}")
pm_dict['facts'][id] = None
current_plan = replace_id_in_path(excel_plan_template, id)
plan_candidates = [f for f in file_list if current_plan in f]
if len(plan_candidates) == 1:
print(f'Загрузка {current_plan}')
with zip_ref.open(plan_candidates[0]) as excel_file:
pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен: {current_plan}")
else:
print(f"⚠️ Файл не найден (План): {current_plan}")
pm_dict['plans'][id] = None
return pm_dict
def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
''' Служебная функция получения значения по коду и столбцу '''
row_index = code
mask_value = df_svodka.iloc[0] == code
if search_value is None:
mask_name = df_svodka.columns != 'Итого'
else:
mask_name = df_svodka.columns == search_value
# Убедимся, что маски совпадают по длине
if len(mask_value) != len(mask_name):
raise ValueError(
f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
)
final_mask = mask_value & mask_name # булевая маска по позициям столбцов
col_positions = final_mask.values # numpy array или Series булевых значений
if not final_mask.any():
print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'")
return 0
else:
if row_index in df_svodka.index:
# Получаем позицию строки
row_loc = df_svodka.index.get_loc(row_index)
# Извлекаем значения по позициям столбцов
values = df_svodka.iloc[row_loc, col_positions]
# Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce')
# Агрегация данных (NaN игнорируются)
if search_value is None:
return numeric_values
else:
return numeric_values.iloc[0]
else:
return None
def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None):
''' Служебная функция получения данных по одному ОГ '''
result = {}
# Безопасно получаем данные, проверяя их наличие
fact_df = pm_dict.get('facts', {}).get(id) if 'facts' in pm_dict else None
plan_df = pm_dict.get('plans', {}).get(id) if 'plans' in pm_dict else None
# Определяем, какие столбцы из какого датафрейма брать
for col in columns:
col_result = {}
if col in ['ПП', 'БП']:
if plan_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}")
col_result = {code: None for code in codes}
else:
for code in codes:
val = self.get_svodka_value(plan_df, code, col, search_value)
col_result[code] = val
elif col in ['ТБ', 'СЭБ', 'НЭБ']:
if fact_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}")
col_result = {code: None for code in codes}
else:
for code in codes:
val = self.get_svodka_value(fact_df, code, col, search_value)
col_result[code] = val
else:
print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")
col_result = {code: None for code in codes}
result[col] = col_result
return result
def get_svodka_total(self, pm_dict, codes, columns, search_value=None):
''' Служебная функция агрегации данные по всем ОГ '''
total_result = {}
for name, og_id in OG_IDS.items():
if og_id == 'BASH':
continue
# print(f"📊 Обработка: {name} ({og_id})")
try:
data = self.get_svodka_og(
pm_dict,
og_id,
codes,
columns,
search_value
)
total_result[og_id] = data
except Exception as e:
print(f"❌ Ошибка при обработке {name} ({og_id}): {e}")
total_result[og_id] = None
return total_result
# Убираем старый метод get_value, так как он теперь в базовом классе

View File

@@ -1,9 +1,11 @@
import pandas as pd
import pandas as pd
import os
import json
from typing import Dict, Any, List, Optional
from core.ports import ParserPort
from core.schema_utils import register_getter_from_schema, validate_params_with_schema
from app.schemas.svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
from adapters.pconfig import SINGLE_OGS, replace_id_in_path, find_header_row, data_to_json
class SvodkaPMParser(ParserPort):
@@ -11,91 +13,66 @@ class SvodkaPMParser(ParserPort):
name = "Сводки ПМ"
def __init__(self):
super().__init__()
self._register_default_getters()
def _register_default_getters(self):
"""Регистрация геттеров по умолчанию"""
# Используем схемы Pydantic как единый источник правды
register_getter_from_schema(
parser_instance=self,
getter_name="single_og",
"""Регистрация геттеров для Сводки ПМ"""
self.register_getter(
name="single_og",
method=self._get_single_og,
schema_class=SvodkaPMSingleOGRequest,
description="Получение данных по одному ОГ"
required_params=["id", "codes", "columns"],
optional_params=["search"],
description="Получение данных по одному ОГ из сводки ПМ"
)
register_getter_from_schema(
parser_instance=self,
getter_name="total_ogs",
self.register_getter(
name="total_ogs",
method=self._get_total_ogs,
schema_class=SvodkaPMTotalOGsRequest,
description="Получение данных по всем ОГ"
required_params=["codes", "columns"],
optional_params=["search"],
description="Получение данных по всем ОГ из сводки ПМ"
)
def _get_single_og(self, params: dict):
"""Получение данных по одному ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMSingleOGRequest)
og_id = validated_params["id"]
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# Здесь нужно получить DataFrame из self.df, но пока используем старую логику
# TODO: Переделать под новую архитектуру
return self.get_svodka_og(self.df, og_id, codes, columns, search)
def _get_total_ogs(self, params: dict):
"""Получение данных по всем ОГ"""
# Валидируем параметры с помощью схемы Pydantic
validated_params = validate_params_with_schema(params, SvodkaPMTotalOGsRequest)
codes = validated_params["codes"]
columns = validated_params["columns"]
search = validated_params.get("search")
# TODO: Переделать под новую архитектуру
return self.get_svodka_total(self.df, codes, columns, search)
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
# Сохраняем DataFrame для использования в геттерах
self.df = self.parse_svodka_pm_files(file_path, params)
return self.df
"""Парсинг файла сводки ПМ и возврат DataFrame"""
# Проверяем расширение файла
if not file_path.lower().endswith(('.xlsx', '.xlsm', '.xls')):
raise ValueError(f"Неподдерживаемый формат файла: {file_path}")
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
"""Определения индекса заголовка в excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows,
engine='openpyxl'
)
# Определяем тип файла по имени файла
filename = os.path.basename(file_path).lower()
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
if "plan" in filename or "план" in filename:
sheet_name = "Сводка Нефтепереработка"
return self._parse_svodka_pm(file_path, sheet_name)
elif "fact" in filename or "факт" in filename:
sheet_name = "Сводка Нефтепереработка"
return self._parse_svodka_pm(file_path, sheet_name)
else:
# По умолчанию пытаемся парсить как есть
sheet_name = "Сводка Нефтепереработка"
return self._parse_svodka_pm(file_path, sheet_name)
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_svodka_pm(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного ОГ для БП, ПП и факта '''
def _parse_svodka_pm(self, file_path: str, sheet_name: str, header_num: Optional[int] = None) -> pd.DataFrame:
"""Парсинг отчетов одного ОГ для БП, ПП и факта"""
try:
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Итого")
header_num = find_header_row(file_path, sheet_name, search_value="Итого")
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
df_probe = pd.read_excel(
file,
sheet_name=sheet,
file_path,
sheet_name=sheet_name,
header=header_num,
usecols=None,
nrows=2,
engine='openpyxl'
engine='openpyxl' # Явно указываем движок
)
except Exception as e:
raise ValueError(f"Ошибка при чтении файла {file_path}: {str(e)}")
if df_probe.shape[0] == 0:
raise ValueError("Файл пуст или не содержит данных.")
@@ -108,16 +85,15 @@ class SvodkaPMParser(ParserPort):
raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.')
indicator_col_name = indicator_cols.index[0]
print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}")
# Читаем весь лист
df_full = pd.read_excel(
file,
sheet_name=sheet,
file_path,
sheet_name=sheet_name,
header=header_num,
usecols=None,
index_col=None,
engine='openpyxl'
engine='openpyxl' # Явно указываем движок
)
if indicator_col_name not in df_full.columns:
@@ -134,19 +110,18 @@ class SvodkaPMParser(ParserPort):
itogo_idx = header_list.index("Итого")
num_cols_needed = itogo_idx + 2
except ValueError:
print('Столбец "Итого" не найден. Оставляем все столбцы.')
num_cols_needed = len(header_list)
num_cols_needed = min(num_cols_needed, len(header_list))
df_final = df_full.iloc[:, :num_cols_needed]
# === Удаление полностью пустых столбцов ===
# Удаление полностью пустых столбцов
df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True)
df_clean = df_clean.where(pd.notnull(df_clean), pd.NA)
non_empty_mask = df_clean.notna().any()
df_final = df_final.loc[:, non_empty_mask]
# === Обработка заголовков: Unnamed и "Итого" → "Итого" ===
# Обработка заголовков: Unnamed и "Итого" → "Итого"
new_columns = []
last_good_name = None
for col in df_final.columns:
@@ -155,104 +130,69 @@ class SvodkaPMParser(ParserPort):
# Проверяем, является ли колонка пустой/некорректной
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
if is_empty_or_unnamed:
# Если это пустая колонка, используем последнее хорошее имя
if last_good_name:
# Проверяем, начинается ли на "Итого"
if col_str.startswith('Итого'):
current_name = 'Итого'
last_good_name = current_name
new_columns.append(current_name)
elif is_empty_or_unnamed:
# Используем последнее хорошее имя
new_columns.append(last_good_name)
else:
# Если нет хорошего имени, используем имя по умолчанию
new_columns.append(f"col_{len(new_columns)}")
else:
# Это хорошая колонка
# Имя, полученное из excel
last_good_name = col_str
new_columns.append(col_str)
# Убеждаемся, что количество столбцов совпадает
if len(new_columns) != len(df_final.columns):
# Если количество не совпадает, обрезаем или дополняем
if len(new_columns) > len(df_final.columns):
new_columns = new_columns[:len(df_final.columns)]
else:
# Дополняем недостающие столбцы
while len(new_columns) < len(df_final.columns):
new_columns.append(f"col_{len(new_columns)}")
# Применяем новые заголовки
df_final.columns = new_columns
return df_final
def parse_svodka_pm_files(self, zip_path: str, params: dict) -> dict:
"""Парсинг ZIP архива со сводками ПМ"""
import zipfile
pm_dict = {
"facts": {},
"plans": {}
}
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for name, id in OG_IDS.items():
if id == 'BASH':
continue # пропускаем BASH
def _get_svodka_value(self, df_svodka: pd.DataFrame, id: str, code: int, search_value: Optional[str] = None):
"""Служебная функция для простой выборке по сводке"""
row_index = id
current_fact = replace_id_in_path(excel_fact_template, id)
fact_candidates = [f for f in file_list if current_fact in f]
if len(fact_candidates) == 1:
print(f'Загрузка {current_fact}')
with zip_ref.open(fact_candidates[0]) as excel_file:
pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен: {current_fact}")
else:
print(f"⚠️ Файл не найден (Факт): {current_fact}")
pm_dict['facts'][id] = None
current_plan = replace_id_in_path(excel_plan_template, id)
plan_candidates = [f for f in file_list if current_plan in f]
if len(plan_candidates) == 1:
print(f'Загрузка {current_plan}')
with zip_ref.open(plan_candidates[0]) as excel_file:
pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен: {current_plan}")
else:
print(f"⚠️ Файл не найден (План): {current_plan}")
pm_dict['plans'][id] = None
return pm_dict
def get_svodka_value(self, df_svodka, code, search_value, search_value_filter=None):
''' Служебная функция получения значения по коду и столбцу '''
row_index = code
print(f"🔍 DEBUG: Ищем код '{code}' в строке '{row_index}'")
print(f"🔍 DEBUG: Первая строка данных: {df_svodka.iloc[0].tolist()}")
print(f"🔍 DEBUG: Доступные индексы: {list(df_svodka.index)}")
# Ищем столбцы, где в первой строке есть значение code
mask_value = df_svodka.iloc[0] == code
if search_value is None:
mask_name = df_svodka.columns != 'Итого'
else:
mask_name = df_svodka.columns == search_value
print(f"🔍 DEBUG: mask_value = {mask_value.tolist()}")
print(f"🔍 DEBUG: mask_name = {mask_name.tolist()}")
# Убедимся, что маски совпадают по длине
if len(mask_value) != len(mask_name):
raise ValueError(
f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
)
final_mask = mask_value & mask_name # булевая маска по позициям столбцов
col_positions = final_mask.values # numpy array или Series булевых значений
final_mask = mask_value & mask_name
col_positions = final_mask.values
print(f"🔍 DEBUG: final_mask = {final_mask.tolist()}")
print(f"🔍 DEBUG: col_positions = {col_positions}")
if not final_mask.any():
print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'")
print(f"⚠️ Код '{code}' не найден в первой строке")
return 0
else:
if row_index in df_svodka.index:
# Получаем позицию строки
row_loc = df_svodka.index.get_loc(row_index)
print(f"🔍 DEBUG: Найдена строка '{row_index}' в позиции {row_loc}")
# Извлекаем значения по позициям столбцов
values = df_svodka.iloc[row_loc, col_positions]
print(f"🔍 DEBUG: Извлеченные значения: {values.tolist()}")
# Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce')
print(f"🔍 DEBUG: Числовые значения: {numeric_values.tolist()}")
# Агрегация данных (NaN игнорируются)
if search_value is None:
@@ -260,15 +200,43 @@ class SvodkaPMParser(ParserPort):
else:
return numeric_values.iloc[0]
else:
print(f"⚠️ Строка '{row_index}' не найдена в индексе")
return None
def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None):
''' Служебная функция получения данных по одному ОГ '''
def _get_svodka_og(self, og_id: str, codes: List[int], columns: List[str], search_value: Optional[str] = None):
"""Служебная функция получения данных по одному ОГ"""
result = {}
# Безопасно получаем данные, проверяя их наличие
fact_df = pm_dict.get('facts', {}).get(id) if 'facts' in pm_dict else None
plan_df = pm_dict.get('plans', {}).get(id) if 'plans' in pm_dict else None
# Пути к файлам
exel_fact = 'data/pm_fact/svodka_fact_pm_ID'
exel_plan = 'data/pm_plan/svodka_plan_pm_ID'
current_fact = replace_id_in_path(exel_fact, og_id)
current_plan = replace_id_in_path(exel_plan, og_id)
# Загружаем данные
fact_df = None
plan_df = None
if os.path.exists(current_fact):
try:
fact_df = self._parse_svodka_pm(current_fact, 'Сводка Нефтепереработка')
print(f"✅ Файл факта загружен: {current_fact}")
print(f"📊 Столбцы факта: {list(fact_df.columns)}")
print(f"📊 Индексы факта: {list(fact_df.index)}")
except Exception as e:
print(f"❌ Ошибка при загрузке файла факта {current_fact}: {e}")
fact_df = None
if os.path.exists(current_plan):
try:
plan_df = self._parse_svodka_pm(current_plan, 'Сводка Нефтепереработка')
print(f"✅ Файл плана загружен: {current_plan}")
print(f"📊 Столбцы плана: {list(plan_df.columns)}")
print(f"📊 Индексы плана: {list(plan_df.index)}")
except Exception as e:
print(f"❌ Ошибка при загрузке файла плана {current_plan}: {e}")
plan_df = None
# Определяем, какие столбцы из какого датафрейма брать
for col in columns:
@@ -276,51 +244,88 @@ class SvodkaPMParser(ParserPort):
if col in ['ПП', 'БП']:
if plan_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}")
col_result = {code: None for code in codes}
print(f"❌ Невозможно обработать '{col}': нет данных плана для {og_id}")
else:
for code in codes:
val = self.get_svodka_value(plan_df, code, col, search_value)
col_result[code] = val
val = self._get_svodka_value(plan_df, og_id, code, search_value)
col_result[str(code)] = val
elif col in ['ТБ', 'СЭБ', 'НЭБ']:
if fact_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}")
col_result = {code: None for code in codes}
print(f"❌ Невозможно обработать '{col}': нет данных факта для {og_id}")
else:
for code in codes:
val = self.get_svodka_value(fact_df, code, col, search_value)
col_result[code] = val
val = self._get_svodka_value(fact_df, og_id, code, search_value)
col_result[str(code)] = val
else:
print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")
col_result = {code: None for code in codes}
col_result = {str(code): None for code in codes}
result[col] = col_result
return result
def get_svodka_total(self, pm_dict, codes, columns, search_value=None):
''' Служебная функция агрегации данные по всем ОГ '''
def _get_single_og(self, params: Dict[str, Any]) -> str:
"""API функция для получения данных по одному ОГ"""
# Если на входе строка — парсим как JSON
if isinstance(params, str):
try:
params = json.loads(params)
except json.JSONDecodeError as e:
raise ValueError(f"Некорректный JSON: {e}")
# Проверяем структуру
if not isinstance(params, dict):
raise TypeError("Конфиг должен быть словарём или JSON-строкой")
og_id = params.get("id")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
data = self._get_svodka_og(og_id, codes, columns, search)
json_result = data_to_json(data)
return json_result
def _get_total_ogs(self, params: Dict[str, Any]) -> str:
"""API функция для получения данных по всем ОГ"""
# Если на входе строка — парсим как JSON
if isinstance(params, str):
try:
params = json.loads(params)
except json.JSONDecodeError as e:
raise ValueError(f"❌Некорректный JSON: {e}")
# Проверяем структуру
if not isinstance(params, dict):
raise TypeError("Конфиг должен быть словарём или JSON-строкой")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
total_result = {}
for name, og_id in OG_IDS.items():
for og_id in SINGLE_OGS:
if og_id == 'BASH':
continue
# print(f"📊 Обработка: {name} ({og_id})")
try:
data = self.get_svodka_og(
pm_dict,
og_id,
codes,
columns,
search_value
)
data = self._get_svodka_og(og_id, codes, columns, search)
total_result[og_id] = data
except Exception as e:
print(f"❌ Ошибка при обработке {name} ({og_id}): {e}")
print(f"❌ Ошибка при обработке {og_id}: {e}")
total_result[og_id] = None
return total_result
# Убираем старый метод get_value, так как он теперь в базовом классе
json_result = data_to_json(total_result)
return json_result

View File

@@ -3,6 +3,7 @@ from functools import lru_cache
import json
import numpy as np
import pandas as pd
import os
OG_IDS = {
"Комсомольский НПЗ": "KNPZ",
@@ -22,8 +23,37 @@ OG_IDS = {
"Красноленинский НПЗ": "KLNPZ",
"Пурнефтепереработка": "PurNP",
"ЯНОС": "YANOS",
"Уфанефтехим": "UNH",
"РНПК": "RNPK",
"КмсНПЗ": "KNPZ",
"АНХК": "ANHK",
"НК НПЗ": "NovKuybNPZ",
"КНПЗ": "KuybNPZ",
"СНПЗ": "CyzNPZ",
"Нижневаторское НПО": "NVNPO",
"ПурНП": "PurNP",
}
SINGLE_OGS = [
"KNPZ",
"ANHK",
"AchNPZ",
"BASH",
"UNPZ",
"UNH",
"NOV",
"NovKuybNPZ",
"KuybNPZ",
"CyzNPZ",
"TuapsNPZ",
"SNPZ",
"RNPK",
"NVNPO",
"KLNPZ",
"PurNP",
"YANOS",
]
SNPZ_IDS = {
"Висбрекинг": "SNPZ.VISB",
"Изомеризация": "SNPZ.IZOM",
@@ -40,7 +70,18 @@ SNPZ_IDS = {
def replace_id_in_path(file_path, new_id):
return file_path.replace('ID', str(new_id))
# Заменяем 'ID' на новое значение
modified_path = file_path.replace('ID', str(new_id)) + '.xlsx'
# Проверяем, существует ли файл
if not os.path.exists(modified_path):
# Меняем расширение на .xlsm
directory, filename = os.path.split(modified_path)
name, ext = os.path.splitext(filename)
new_filename = name + '.xlsm'
modified_path = os.path.join(directory, new_filename)
return modified_path
def get_table_name(exel):
@@ -109,6 +150,25 @@ def get_id_by_name(name, dictionary):
return best_match
def find_header_row(file, sheet, search_value="Итого", max_rows=50):
''' Определения индекса заголовка в exel по ключевому слову '''
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def data_to_json(data, indent=2, ensure_ascii=False):
"""
Полностью безопасная сериализация данных в JSON.
@@ -153,11 +213,18 @@ def data_to_json(data, indent=2, ensure_ascii=False):
# --- рекурсия по dict и list ---
elif isinstance(obj, dict):
return {
key: convert_obj(value)
for key, value in obj.items()
if not is_nan_like(key) # фильтруем NaN в ключах (недопустимы в JSON)
}
# Обрабатываем только значения, ключи оставляем как строки
converted = {}
for k, v in obj.items():
if is_nan_like(k):
continue # ключи не могут быть null в JSON
# Превращаем ключ в строку, но не пытаемся интерпретировать как число
key_str = str(k)
converted[key_str] = convert_obj(v) # только значение проходит через convert_obj
# Если все значения 0.0 — считаем, что данных нет, т.к. возможно ожидается массив.
if converted and all(v == 0.0 for v in converted.values()):
return None
return converted
elif isinstance(obj, list):
return [convert_obj(item) for item in obj]
@@ -175,7 +242,6 @@ def data_to_json(data, indent=2, ensure_ascii=False):
try:
cleaned_data = convert_obj(data)
cleaned_data_str = json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii)
return cleaned_data
return json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii)
except Exception as e:
raise ValueError(f"Не удалось сериализовать данные в JSON: {e}")

View File

@@ -25,7 +25,7 @@ class OGID(str, Enum):
class SvodkaPMSingleOGRequest(BaseModel):
id: OGID = Field(
id: str = Field(
...,
description="Идентификатор МА для запрашиваемого ОГ",
example="SNPZ"