Сводка ПМ первый геттер корректен

This commit is contained in:
2025-09-03 13:38:36 +03:00
parent 631e58dad7
commit 1fcb44193d

View File

@@ -3,6 +3,9 @@
import pandas as pd
import os
import json
import zipfile
import tempfile
import shutil
from typing import Dict, Any, List, Optional
from core.ports import ParserPort
from adapters.pconfig import SINGLE_OGS, replace_id_in_path, find_header_row, data_to_json
@@ -35,25 +38,99 @@ class SvodkaPMParser(ParserPort):
description="Получение данных по всем ОГ из сводки ПМ"
)
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла сводки ПМ и возврат DataFrame"""
def parse(self, file_path: str, params: dict) -> Dict[str, pd.DataFrame]:
"""Парсинг ZIP архива со сводками ПМ и возврат словаря с DataFrame"""
# Проверяем расширение файла
if not file_path.lower().endswith(('.xlsx', '.xlsm', '.xls')):
raise ValueError(f"Неподдерживаемый формат файла: {file_path}")
if not file_path.lower().endswith('.zip'):
raise ValueError(f"Ожидается ZIP архив: {file_path}")
# Определяем тип файла по имени файла
filename = os.path.basename(file_path).lower()
# Создаем временную директорию для разархивирования
temp_dir = tempfile.mkdtemp()
if "plan" in filename or "план" in filename:
sheet_name = "Сводка Нефтепереработка"
return self._parse_svodka_pm(file_path, sheet_name)
elif "fact" in filename or "факт" in filename:
sheet_name = "Сводка Нефтепереработка"
return self._parse_svodka_pm(file_path, sheet_name)
else:
# По умолчанию пытаемся парсить как есть
sheet_name = "Сводка Нефтепереработка"
return self._parse_svodka_pm(file_path, sheet_name)
try:
# Разархивируем файл
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
print(f"📦 Архив разархивирован в: {temp_dir}")
# Посмотрим, что находится в архиве
print(f"🔍 Содержимое архива:")
for root, dirs, files in os.walk(temp_dir):
level = root.replace(temp_dir, '').count(os.sep)
indent = ' ' * 2 * level
print(f"{indent}{os.path.basename(root)}/")
subindent = ' ' * 2 * (level + 1)
for file in files:
print(f"{subindent}{file}")
# Создаем словари для хранения данных как в оригинале
df_pm_facts = {} # Словарь с данными факта, ключ - ID ОГ
df_pm_plans = {} # Словарь с данными плана, ключ - ID ОГ
# Ищем файлы в архиве (адаптируемся к реальной структуре)
fact_files = []
plan_files = []
for root, dirs, files in os.walk(temp_dir):
for file in files:
if file.lower().endswith(('.xlsx', '.xlsm')):
full_path = os.path.join(root, file)
if 'fact' in file.lower() or 'факт' in file.lower():
fact_files.append(full_path)
elif 'plan' in file.lower() or 'план' in file.lower():
plan_files.append(full_path)
print(f"📊 Найдено файлов факта: {len(fact_files)}")
print(f"📊 Найдено файлов плана: {len(plan_files)}")
# Обрабатываем найденные файлы
for fact_file in fact_files:
# Извлекаем ID ОГ из имени файла
filename = os.path.basename(fact_file)
# Ищем паттерн типа svodka_fact_pm_SNPZ.xlsm
if 'svodka_fact_pm_' in filename:
og_id = filename.replace('svodka_fact_pm_', '').replace('.xlsx', '').replace('.xlsm', '')
if og_id in SINGLE_OGS:
print(f'📊 Загрузка факта: {fact_file} (ОГ: {og_id})')
df_pm_facts[og_id] = self._parse_svodka_pm(fact_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен для {og_id}")
for plan_file in plan_files:
# Извлекаем ID ОГ из имени файла
filename = os.path.basename(plan_file)
# Ищем паттерн типа svodka_plan_pm_SNPZ.xlsm
if 'svodka_plan_pm_' in filename:
og_id = filename.replace('svodka_plan_pm_', '').replace('.xlsx', '').replace('.xlsm', '')
if og_id in SINGLE_OGS:
print(f'📊 Загрузка плана: {plan_file} (ОГ: {og_id})')
df_pm_plans[og_id] = self._parse_svodka_pm(plan_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен для {og_id}")
# Инициализируем None для ОГ, для которых файлы не найдены
for og_id in SINGLE_OGS:
if og_id == 'BASH':
continue
if og_id not in df_pm_facts:
df_pm_facts[og_id] = None
if og_id not in df_pm_plans:
df_pm_plans[og_id] = None
# Возвращаем словарь с данными (как в оригинале)
result = {
'df_pm_facts': df_pm_facts,
'df_pm_plans': df_pm_plans
}
print(f"🎯 Обработано ОГ: {len([k for k, v in df_pm_facts.items() if v is not None])} факт, {len([k for k, v in df_pm_plans.items() if v is not None])} план")
return result
finally:
# Удаляем временную директорию
shutil.rmtree(temp_dir, ignore_errors=True)
print(f"🗑️ Временная директория удалена: {temp_dir}")
def _parse_svodka_pm(self, file_path: str, sheet_name: str, header_num: Optional[int] = None) -> pd.DataFrame:
"""Парсинг отчетов одного ОГ для БП, ПП и факта"""
@@ -147,96 +224,135 @@ class SvodkaPMParser(ParserPort):
return df_final
def _get_svodka_value(self, df_svodka: pd.DataFrame, id: str, code: int, search_value: Optional[str] = None):
def _get_svodka_value(self, df_svodka: pd.DataFrame, og_id: str, code: int, search_value: Optional[str] = None):
"""Служебная функция для простой выборке по сводке"""
row_index = id
print(f"🔍 DEBUG: Ищем код '{code}' в строке '{row_index}'")
print(f"🔍 DEBUG: Ищем код '{code}' для ОГ '{og_id}' в DataFrame с {len(df_svodka)} строками")
print(f"🔍 DEBUG: Первая строка данных: {df_svodka.iloc[0].tolist()}")
print(f"🔍 DEBUG: Доступные индексы: {list(df_svodka.index)}")
print(f"🔍 DEBUG: Доступные столбцы: {list(df_svodka.columns)}")
# Ищем столбцы, где в первой строке есть значение code
mask_value = df_svodka.iloc[0] == code
if search_value is None:
mask_name = df_svodka.columns != 'Итого'
else:
mask_name = df_svodka.columns == search_value
print(f"🔍 DEBUG: mask_value = {mask_value.tolist()}")
print(f"🔍 DEBUG: mask_name = {mask_name.tolist()}")
# Убедимся, что маски совпадают по длине
if len(mask_value) != len(mask_name):
raise ValueError(
f"❌ Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
)
final_mask = mask_value & mask_name
col_positions = final_mask.values
print(f"🔍 DEBUG: final_mask = {final_mask.tolist()}")
print(f"🔍 DEBUG: col_positions = {col_positions}")
if not final_mask.any():
print(f"⚠️ Код '{code}' не найден в первой строке")
# Проверяем, есть ли код в индексе
if code not in df_svodka.index:
print(f"⚠️ Код '{code}' не найден в индексе")
return 0
# Получаем позицию строки с кодом
code_row_loc = df_svodka.index.get_loc(code)
print(f"🔍 DEBUG: Код '{code}' в позиции {code_row_loc}")
# Определяем позиции для поиска
if search_value is None:
# Ищем все позиции кроме "Итого" и None (первый столбец с заголовком)
target_positions = []
for i, col_name in enumerate(df_svodka.iloc[0]):
if col_name != 'Итого' and col_name is not None:
target_positions.append(i)
else:
if row_index in df_svodka.index:
# Получаем позицию строки
row_loc = df_svodka.index.get_loc(row_index)
print(f"🔍 DEBUG: Найдена строка '{row_index}' в позиции {row_loc}")
# Ищем позиции в первой строке, где есть нужное название
target_positions = []
for i, col_name in enumerate(df_svodka.iloc[0]):
if col_name == search_value:
target_positions.append(i)
print(f"🔍 DEBUG: Найдены позиции для '{search_value}': {target_positions[:5]}...")
print(f"🔍 DEBUG: Позиции в первой строке: {target_positions[:5]}...")
# Извлекаем значения по позициям столбцов
values = df_svodka.iloc[row_loc, col_positions]
print(f"🔍 DEBUG: Извлеченные значения: {values.tolist()}")
print(f"🔍 DEBUG: Ищем столбцы с названием '{search_value}'")
print(f"🔍 DEBUG: Целевые позиции: {target_positions[:10]}...")
# Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce')
print(f"🔍 DEBUG: Числовые значения: {numeric_values.tolist()}")
if not target_positions:
print(f"⚠️ Позиции '{search_value}' не найдены")
return 0
# Агрегация данных (NaN игнорируются)
if search_value is None:
return numeric_values
# Извлекаем значения из найденных позиций
values = []
for pos in target_positions:
# Берем значение из пересечения строки с кодом и позиции столбца
value = df_svodka.iloc[code_row_loc, pos]
# Если это Series, берем первое значение
if isinstance(value, pd.Series):
if len(value) > 0:
# Берем первое не-NaN значение
first_valid = value.dropna().iloc[0] if not value.dropna().empty else 0
values.append(first_valid)
else:
return numeric_values.iloc[0]
values.append(0)
else:
print(f"⚠️ Строка '{row_index}' не найдена в индексе")
return None
values.append(value)
# Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce')
print(f"🔍 DEBUG: Числовые значения (первые 5): {numeric_values.tolist()[:5]}")
# Попробуем альтернативное преобразование
try:
# Если pandas не может преобразовать, попробуем вручную
manual_values = []
for v in values:
if pd.isna(v) or v is None:
manual_values.append(0)
else:
try:
# Пробуем преобразовать в float
manual_values.append(float(str(v).replace(',', '.')))
except (ValueError, TypeError):
manual_values.append(0)
print(f"🔍 DEBUG: Ручное преобразование (первые 5): {manual_values[:5]}")
numeric_values = pd.Series(manual_values)
except Exception as e:
print(f"⚠️ Ошибка при ручном преобразовании: {e}")
# Используем исходные значения
numeric_values = pd.Series([0 if pd.isna(v) or v is None else v for v in values])
# Агрегация данных (NaN игнорируются)
if search_value is None:
# Возвращаем массив всех значений (игнорируя NaN)
if len(numeric_values) > 0:
# Фильтруем NaN значения и возвращаем как список
valid_values = numeric_values.dropna()
if len(valid_values) > 0:
return valid_values.tolist()
else:
return []
else:
return []
else:
# Возвращаем массив всех значений (игнорируя NaN)
if len(numeric_values) > 0:
# Фильтруем NaN значения и возвращаем как список
valid_values = numeric_values.dropna()
if len(valid_values) > 0:
return valid_values.tolist()
else:
return []
else:
return []
def _get_svodka_og(self, og_id: str, codes: List[int], columns: List[str], search_value: Optional[str] = None):
"""Служебная функция получения данных по одному ОГ"""
result = {}
# Пути к файлам
exel_fact = 'data/pm_fact/svodka_fact_pm_ID'
exel_plan = 'data/pm_plan/svodka_plan_pm_ID'
# Получаем данные из сохраненных словарей (через self.df)
if not hasattr(self, 'df') or self.df is None:
print("❌ Данные не загружены. Сначала загрузите ZIP архив.")
return {col: {str(code): None for code in codes} for col in columns}
current_fact = replace_id_in_path(exel_fact, og_id)
current_plan = replace_id_in_path(exel_plan, og_id)
# Извлекаем словари из сохраненных данных
df_pm_facts = self.df.get('df_pm_facts', {})
df_pm_plans = self.df.get('df_pm_plans', {})
# Загружаем данные
fact_df = None
plan_df = None
# Получаем данные для конкретного ОГ
fact_df = df_pm_facts.get(og_id)
plan_df = df_pm_plans.get(og_id)
if os.path.exists(current_fact):
try:
fact_df = self._parse_svodka_pm(current_fact, 'Сводка Нефтепереработка')
print(f"✅ Файл факта загружен: {current_fact}")
print(f"📊 Столбцы факта: {list(fact_df.columns)}")
print(f"📊 Индексы факта: {list(fact_df.index)}")
except Exception as e:
print(f"❌ Ошибка при загрузке файла факта {current_fact}: {e}")
fact_df = None
if os.path.exists(current_plan):
try:
plan_df = self._parse_svodka_pm(current_plan, 'Сводка Нефтепереработка')
print(f"✅ Файл плана загружен: {current_plan}")
print(f"📊 Столбцы плана: {list(plan_df.columns)}")
print(f"📊 Индексы плана: {list(plan_df.index)}")
except Exception as e:
print(f"❌ Ошибка при загрузке файла плана {current_plan}: {e}")
plan_df = None
print(f"🔍 ===== НАЧАЛО ОБРАБОТКИ ОГ {og_id} =====")
print(f"🔍 Коды: {codes}")
print(f"🔍 Столбцы: {columns}")
print(f"🔍 Получены данные для {og_id}: факт={'' if fact_df is not None else ''}, план={'' if plan_df is not None else ''}")
# Определяем, какие столбцы из какого датафрейма брать
for col in columns:
@@ -246,16 +362,19 @@ class SvodkaPMParser(ParserPort):
if plan_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных плана для {og_id}")
else:
print(f"🔍 DEBUG: ===== ОБРАБАТЫВАЕМ '{col}' ИЗ ДАННЫХ ПЛАНА =====")
for code in codes:
val = self._get_svodka_value(plan_df, og_id, code, search_value)
print(f"🔍 DEBUG: --- Код {code} для {col} ---")
val = self._get_svodka_value(plan_df, og_id, code, col)
col_result[str(code)] = val
print(f"🔍 DEBUG: ===== ЗАВЕРШИЛИ ОБРАБОТКУ '{col}' =====")
elif col in ['ТБ', 'СЭБ', 'НЭБ']:
if fact_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных факта для {og_id}")
else:
for code in codes:
val = self._get_svodka_value(fact_df, og_id, code, search_value)
val = self._get_svodka_value(fact_df, og_id, code, col)
col_result[str(code)] = val
else:
print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")