diff --git a/python_parser/adapters/parsers/svodka_pm.py b/python_parser/adapters/parsers/svodka_pm.py index 9f8d5cd..bf83bf2 100644 --- a/python_parser/adapters/parsers/svodka_pm.py +++ b/python_parser/adapters/parsers/svodka_pm.py @@ -3,6 +3,9 @@ import pandas as pd import os import json +import zipfile +import tempfile +import shutil from typing import Dict, Any, List, Optional from core.ports import ParserPort from adapters.pconfig import SINGLE_OGS, replace_id_in_path, find_header_row, data_to_json @@ -35,25 +38,99 @@ class SvodkaPMParser(ParserPort): description="Получение данных по всем ОГ из сводки ПМ" ) - def parse(self, file_path: str, params: dict) -> pd.DataFrame: - """Парсинг файла сводки ПМ и возврат DataFrame""" + def parse(self, file_path: str, params: dict) -> Dict[str, pd.DataFrame]: + """Парсинг ZIP архива со сводками ПМ и возврат словаря с DataFrame""" # Проверяем расширение файла - if not file_path.lower().endswith(('.xlsx', '.xlsm', '.xls')): - raise ValueError(f"Неподдерживаемый формат файла: {file_path}") + if not file_path.lower().endswith('.zip'): + raise ValueError(f"Ожидается ZIP архив: {file_path}") - # Определяем тип файла по имени файла - filename = os.path.basename(file_path).lower() + # Создаем временную директорию для разархивирования + temp_dir = tempfile.mkdtemp() - if "plan" in filename or "план" in filename: - sheet_name = "Сводка Нефтепереработка" - return self._parse_svodka_pm(file_path, sheet_name) - elif "fact" in filename or "факт" in filename: - sheet_name = "Сводка Нефтепереработка" - return self._parse_svodka_pm(file_path, sheet_name) - else: - # По умолчанию пытаемся парсить как есть - sheet_name = "Сводка Нефтепереработка" - return self._parse_svodka_pm(file_path, sheet_name) + try: + # Разархивируем файл + with zipfile.ZipFile(file_path, 'r') as zip_ref: + zip_ref.extractall(temp_dir) + print(f"📦 Архив разархивирован в: {temp_dir}") + + # Посмотрим, что находится в архиве + print(f"🔍 Содержимое архива:") + for root, dirs, files in os.walk(temp_dir): + level = root.replace(temp_dir, '').count(os.sep) + indent = ' ' * 2 * level + print(f"{indent}{os.path.basename(root)}/") + subindent = ' ' * 2 * (level + 1) + for file in files: + print(f"{subindent}{file}") + + # Создаем словари для хранения данных как в оригинале + df_pm_facts = {} # Словарь с данными факта, ключ - ID ОГ + df_pm_plans = {} # Словарь с данными плана, ключ - ID ОГ + + # Ищем файлы в архиве (адаптируемся к реальной структуре) + fact_files = [] + plan_files = [] + + for root, dirs, files in os.walk(temp_dir): + for file in files: + if file.lower().endswith(('.xlsx', '.xlsm')): + full_path = os.path.join(root, file) + if 'fact' in file.lower() or 'факт' in file.lower(): + fact_files.append(full_path) + elif 'plan' in file.lower() or 'план' in file.lower(): + plan_files.append(full_path) + + print(f"📊 Найдено файлов факта: {len(fact_files)}") + print(f"📊 Найдено файлов плана: {len(plan_files)}") + + # Обрабатываем найденные файлы + for fact_file in fact_files: + # Извлекаем ID ОГ из имени файла + filename = os.path.basename(fact_file) + # Ищем паттерн типа svodka_fact_pm_SNPZ.xlsm + if 'svodka_fact_pm_' in filename: + og_id = filename.replace('svodka_fact_pm_', '').replace('.xlsx', '').replace('.xlsm', '') + if og_id in SINGLE_OGS: + print(f'📊 Загрузка факта: {fact_file} (ОГ: {og_id})') + df_pm_facts[og_id] = self._parse_svodka_pm(fact_file, 'Сводка Нефтепереработка') + print(f"✅ Факт загружен для {og_id}") + + for plan_file in plan_files: + # Извлекаем ID ОГ из имени файла + filename = os.path.basename(plan_file) + # Ищем паттерн типа svodka_plan_pm_SNPZ.xlsm + if 'svodka_plan_pm_' in filename: + og_id = filename.replace('svodka_plan_pm_', '').replace('.xlsx', '').replace('.xlsm', '') + if og_id in SINGLE_OGS: + print(f'📊 Загрузка плана: {plan_file} (ОГ: {og_id})') + df_pm_plans[og_id] = self._parse_svodka_pm(plan_file, 'Сводка Нефтепереработка') + print(f"✅ План загружен для {og_id}") + + # Инициализируем None для ОГ, для которых файлы не найдены + for og_id in SINGLE_OGS: + if og_id == 'BASH': + continue + if og_id not in df_pm_facts: + df_pm_facts[og_id] = None + if og_id not in df_pm_plans: + df_pm_plans[og_id] = None + + + + # Возвращаем словарь с данными (как в оригинале) + result = { + 'df_pm_facts': df_pm_facts, + 'df_pm_plans': df_pm_plans + } + + print(f"🎯 Обработано ОГ: {len([k for k, v in df_pm_facts.items() if v is not None])} факт, {len([k for k, v in df_pm_plans.items() if v is not None])} план") + + return result + + finally: + # Удаляем временную директорию + shutil.rmtree(temp_dir, ignore_errors=True) + print(f"🗑️ Временная директория удалена: {temp_dir}") def _parse_svodka_pm(self, file_path: str, sheet_name: str, header_num: Optional[int] = None) -> pd.DataFrame: """Парсинг отчетов одного ОГ для БП, ПП и факта""" @@ -147,96 +224,135 @@ class SvodkaPMParser(ParserPort): return df_final - def _get_svodka_value(self, df_svodka: pd.DataFrame, id: str, code: int, search_value: Optional[str] = None): + def _get_svodka_value(self, df_svodka: pd.DataFrame, og_id: str, code: int, search_value: Optional[str] = None): """Служебная функция для простой выборке по сводке""" - row_index = id - - print(f"🔍 DEBUG: Ищем код '{code}' в строке '{row_index}'") + print(f"🔍 DEBUG: Ищем код '{code}' для ОГ '{og_id}' в DataFrame с {len(df_svodka)} строками") print(f"🔍 DEBUG: Первая строка данных: {df_svodka.iloc[0].tolist()}") print(f"🔍 DEBUG: Доступные индексы: {list(df_svodka.index)}") + print(f"🔍 DEBUG: Доступные столбцы: {list(df_svodka.columns)}") - # Ищем столбцы, где в первой строке есть значение code - mask_value = df_svodka.iloc[0] == code - if search_value is None: - mask_name = df_svodka.columns != 'Итого' - else: - mask_name = df_svodka.columns == search_value - - print(f"🔍 DEBUG: mask_value = {mask_value.tolist()}") - print(f"🔍 DEBUG: mask_name = {mask_name.tolist()}") - - # Убедимся, что маски совпадают по длине - if len(mask_value) != len(mask_name): - raise ValueError( - f"❌ Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}" - ) - - final_mask = mask_value & mask_name - col_positions = final_mask.values - - print(f"🔍 DEBUG: final_mask = {final_mask.tolist()}") - print(f"🔍 DEBUG: col_positions = {col_positions}") - - if not final_mask.any(): - print(f"⚠️ Код '{code}' не найден в первой строке") + # Проверяем, есть ли код в индексе + if code not in df_svodka.index: + print(f"⚠️ Код '{code}' не найден в индексе") return 0 + + # Получаем позицию строки с кодом + code_row_loc = df_svodka.index.get_loc(code) + print(f"🔍 DEBUG: Код '{code}' в позиции {code_row_loc}") + + # Определяем позиции для поиска + if search_value is None: + # Ищем все позиции кроме "Итого" и None (первый столбец с заголовком) + target_positions = [] + for i, col_name in enumerate(df_svodka.iloc[0]): + if col_name != 'Итого' and col_name is not None: + target_positions.append(i) else: - if row_index in df_svodka.index: - # Получаем позицию строки - row_loc = df_svodka.index.get_loc(row_index) - print(f"🔍 DEBUG: Найдена строка '{row_index}' в позиции {row_loc}") + # Ищем позиции в первой строке, где есть нужное название + target_positions = [] + for i, col_name in enumerate(df_svodka.iloc[0]): + if col_name == search_value: + target_positions.append(i) + + print(f"🔍 DEBUG: Найдены позиции для '{search_value}': {target_positions[:5]}...") + print(f"🔍 DEBUG: Позиции в первой строке: {target_positions[:5]}...") - # Извлекаем значения по позициям столбцов - values = df_svodka.iloc[row_loc, col_positions] - print(f"🔍 DEBUG: Извлеченные значения: {values.tolist()}") + print(f"🔍 DEBUG: Ищем столбцы с названием '{search_value}'") + print(f"🔍 DEBUG: Целевые позиции: {target_positions[:10]}...") - # Преобразуем в числовой формат - numeric_values = pd.to_numeric(values, errors='coerce') - print(f"🔍 DEBUG: Числовые значения: {numeric_values.tolist()}") + if not target_positions: + print(f"⚠️ Позиции '{search_value}' не найдены") + return 0 - # Агрегация данных (NaN игнорируются) - if search_value is None: - return numeric_values + # Извлекаем значения из найденных позиций + values = [] + for pos in target_positions: + # Берем значение из пересечения строки с кодом и позиции столбца + value = df_svodka.iloc[code_row_loc, pos] + + # Если это Series, берем первое значение + if isinstance(value, pd.Series): + if len(value) > 0: + # Берем первое не-NaN значение + first_valid = value.dropna().iloc[0] if not value.dropna().empty else 0 + values.append(first_valid) else: - return numeric_values.iloc[0] + values.append(0) else: - print(f"⚠️ Строка '{row_index}' не найдена в индексе") - return None + values.append(value) + + + + # Преобразуем в числовой формат + numeric_values = pd.to_numeric(values, errors='coerce') + print(f"🔍 DEBUG: Числовые значения (первые 5): {numeric_values.tolist()[:5]}") + + # Попробуем альтернативное преобразование + try: + # Если pandas не может преобразовать, попробуем вручную + manual_values = [] + for v in values: + if pd.isna(v) or v is None: + manual_values.append(0) + else: + try: + # Пробуем преобразовать в float + manual_values.append(float(str(v).replace(',', '.'))) + except (ValueError, TypeError): + manual_values.append(0) + + print(f"🔍 DEBUG: Ручное преобразование (первые 5): {manual_values[:5]}") + numeric_values = pd.Series(manual_values) + except Exception as e: + print(f"⚠️ Ошибка при ручном преобразовании: {e}") + # Используем исходные значения + numeric_values = pd.Series([0 if pd.isna(v) or v is None else v for v in values]) + + # Агрегация данных (NaN игнорируются) + if search_value is None: + # Возвращаем массив всех значений (игнорируя NaN) + if len(numeric_values) > 0: + # Фильтруем NaN значения и возвращаем как список + valid_values = numeric_values.dropna() + if len(valid_values) > 0: + return valid_values.tolist() + else: + return [] + else: + return [] + else: + # Возвращаем массив всех значений (игнорируя NaN) + if len(numeric_values) > 0: + # Фильтруем NaN значения и возвращаем как список + valid_values = numeric_values.dropna() + if len(valid_values) > 0: + return valid_values.tolist() + else: + return [] + else: + return [] def _get_svodka_og(self, og_id: str, codes: List[int], columns: List[str], search_value: Optional[str] = None): """Служебная функция получения данных по одному ОГ""" result = {} - # Пути к файлам - exel_fact = 'data/pm_fact/svodka_fact_pm_ID' - exel_plan = 'data/pm_plan/svodka_plan_pm_ID' + # Получаем данные из сохраненных словарей (через self.df) + if not hasattr(self, 'df') or self.df is None: + print("❌ Данные не загружены. Сначала загрузите ZIP архив.") + return {col: {str(code): None for code in codes} for col in columns} - current_fact = replace_id_in_path(exel_fact, og_id) - current_plan = replace_id_in_path(exel_plan, og_id) + # Извлекаем словари из сохраненных данных + df_pm_facts = self.df.get('df_pm_facts', {}) + df_pm_plans = self.df.get('df_pm_plans', {}) - # Загружаем данные - fact_df = None - plan_df = None + # Получаем данные для конкретного ОГ + fact_df = df_pm_facts.get(og_id) + plan_df = df_pm_plans.get(og_id) - if os.path.exists(current_fact): - try: - fact_df = self._parse_svodka_pm(current_fact, 'Сводка Нефтепереработка') - print(f"✅ Файл факта загружен: {current_fact}") - print(f"📊 Столбцы факта: {list(fact_df.columns)}") - print(f"📊 Индексы факта: {list(fact_df.index)}") - except Exception as e: - print(f"❌ Ошибка при загрузке файла факта {current_fact}: {e}") - fact_df = None - - if os.path.exists(current_plan): - try: - plan_df = self._parse_svodka_pm(current_plan, 'Сводка Нефтепереработка') - print(f"✅ Файл плана загружен: {current_plan}") - print(f"📊 Столбцы плана: {list(plan_df.columns)}") - print(f"📊 Индексы плана: {list(plan_df.index)}") - except Exception as e: - print(f"❌ Ошибка при загрузке файла плана {current_plan}: {e}") - plan_df = None + print(f"🔍 ===== НАЧАЛО ОБРАБОТКИ ОГ {og_id} =====") + print(f"🔍 Коды: {codes}") + print(f"🔍 Столбцы: {columns}") + print(f"🔍 Получены данные для {og_id}: факт={'✅' if fact_df is not None else '❌'}, план={'✅' if plan_df is not None else '❌'}") # Определяем, какие столбцы из какого датафрейма брать for col in columns: @@ -246,16 +362,19 @@ class SvodkaPMParser(ParserPort): if plan_df is None: print(f"❌ Невозможно обработать '{col}': нет данных плана для {og_id}") else: + print(f"🔍 DEBUG: ===== ОБРАБАТЫВАЕМ '{col}' ИЗ ДАННЫХ ПЛАНА =====") for code in codes: - val = self._get_svodka_value(plan_df, og_id, code, search_value) + print(f"🔍 DEBUG: --- Код {code} для {col} ---") + val = self._get_svodka_value(plan_df, og_id, code, col) col_result[str(code)] = val + print(f"🔍 DEBUG: ===== ЗАВЕРШИЛИ ОБРАБОТКУ '{col}' =====") elif col in ['ТБ', 'СЭБ', 'НЭБ']: if fact_df is None: print(f"❌ Невозможно обработать '{col}': нет данных факта для {og_id}") else: for code in codes: - val = self._get_svodka_value(fact_df, og_id, code, search_value) + val = self._get_svodka_value(fact_df, og_id, code, col) col_result[str(code)] = val else: print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")