import pandas as pd import os import tempfile import zipfile from typing import Dict, Any, List, Tuple, Optional from core.ports import ParserPort from core.schema_utils import register_getter_from_schema, validate_params_with_schema from app.schemas.statuses_repair_ca import StatusesRepairCARequest from adapters.pconfig import find_header_row, get_og_by_name, data_to_json class StatusesRepairCAParser(ParserPort): """Парсер для статусов ремонта СА""" name = "Статусы ремонта СА" def _register_default_getters(self): """Регистрация геттеров по умолчанию""" register_getter_from_schema( parser_instance=self, getter_name="get_repair_statuses", method=self._get_repair_statuses_wrapper, schema_class=StatusesRepairCARequest, description="Получение статусов ремонта по ОГ и ключам" ) def parse(self, file_path: str, params: dict) -> Dict[str, Any]: """Парсинг файла статусов ремонта СА""" print(f"🔍 DEBUG: StatusesRepairCAParser.parse вызван с файлом: {file_path}") try: # Определяем тип файла if file_path.endswith('.zip'): return self._parse_zip_file(file_path) elif file_path.endswith(('.xlsx', '.xls')): return self._parse_excel_file(file_path) else: raise ValueError(f"Неподдерживаемый формат файла: {file_path}") except Exception as e: print(f"❌ Ошибка при парсинге файла {file_path}: {e}") raise def _parse_zip_file(self, zip_path: str) -> Dict[str, Any]: """Парсинг ZIP архива""" with tempfile.TemporaryDirectory() as temp_dir: with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(temp_dir) # Ищем Excel файл в архиве excel_files = [] for root, dirs, files in os.walk(temp_dir): for file in files: if file.endswith(('.xlsx', '.xls')): excel_files.append(os.path.join(root, file)) if not excel_files: raise ValueError("В архиве не найдено Excel файлов") # Берем первый найденный Excel файл excel_file = excel_files[0] print(f"🔍 DEBUG: Найден Excel файл в архиве: {excel_file}") return self._parse_excel_file(excel_file) def _parse_excel_file(self, file_path: str) -> Dict[str, Any]: """Парсинг Excel файла""" print(f"🔍 DEBUG: Парсинг Excel файла: {file_path}") # Парсим данные df_statuses = self._parse_statuses_repair_ca(file_path, 0) if df_statuses.empty: print("⚠️ Нет данных после парсинга") return {"data": [], "records_count": 0} # Преобразуем в список словарей для хранения data_list = self._data_to_structured_json(df_statuses) result = { "data": data_list, "records_count": len(data_list) } # Устанавливаем данные в парсер для использования в геттерах self.data_dict = result print(f"✅ Парсинг завершен. Получено {len(data_list)} записей") return result def _parse_statuses_repair_ca(self, file: str, sheet: int, header_num: Optional[int] = None) -> pd.DataFrame: """Парсинг отчетов статусов ремонта""" # === ШАГ 1: Создание MultiIndex === columns_level_1 = [ 'id', 'ОГ', 'Дата начала ремонта', 'Готовность к КР', 'Отставание / опережение подготовки к КР', 'Заключение договоров на СМР', 'Поставка МТР' ] sub_columns_cmp = { 'ДВ': ['всего', 'плановая дата', 'факт', '%'], 'Сметы': ['всего', 'плановая дата', 'факт', '%'], 'Формирование лотов': ['всего', 'плановая дата', 'факт', '%'], 'Договор': ['всего', 'плановая дата', 'факт', '%'] } sub_columns_mtp = { 'Выполнение плана на текущую дату': ['инициирования закупок', 'заключения договоров', 'поставки'], 'На складе, позиций': ['всего', 'поставлено', '%', 'динамика за прошедшую неделю, поз.'] } # Формируем MultiIndex — ВСЕ кортежи длиной 3 cols = [] for col1 in columns_level_1: if col1 == 'id': cols.append((col1, '', '')) elif col1 == 'ОГ': cols.append((col1, '', '')) elif col1 == 'Дата начала ремонта': cols.append((col1, '', '')) elif col1 == 'Готовность к КР': cols.extend([(col1, 'План', ''), (col1, 'Факт', '')]) elif col1 == 'Отставание / опережение подготовки к КР': cols.extend([ (col1, 'Отставание / опережение', ''), (col1, 'Динамика за прошедшую неделю', '') ]) elif col1 == 'Заключение договоров на СМР': for subcol, sub_sub_cols in sub_columns_cmp.items(): for ssc in sub_sub_cols: cols.append((col1, subcol, ssc)) elif col1 == 'Поставка МТР': for subcol, sub_sub_cols in sub_columns_mtp.items(): for ssc in sub_sub_cols: cols.append((col1, subcol, ssc)) else: cols.append((col1, '', '')) # Создаем MultiIndex multi_index = pd.MultiIndex.from_tuples(cols, names=['Level1', 'Level2', 'Level3']) # === ШАГ 2: Читаем данные из Excel === if header_num is None: header_num = find_header_row(file, sheet, search_value="ОГ") df_data = pd.read_excel( file, skiprows=header_num + 3, header=None, index_col=0, engine='openpyxl' ) # Убираем строки с пустыми данными df_data.dropna(how='all', inplace=True) # Применяем функцию get_og_by_name для 'id' df_data['id'] = df_data.iloc[:, 0].copy() df_data['id'] = df_data['id'].apply(get_og_by_name) # Перемещаем 'id' на первое место cols = ['id'] + [col for col in df_data.columns if col != 'id'] df_data = df_data[cols] # Удаляем строки с пустым id df_data = df_data.dropna(subset=['id']) df_data = df_data[df_data['id'].astype(str).str.strip() != ''] # Сбрасываем индекс df_data = df_data.reset_index(drop=True) # Выбираем 4-ю колонку (индекс 3) для фильтрации col_index = 3 numeric_series = pd.to_numeric(df_data.iloc[:, col_index], errors='coerce') # Фильтруем: оставляем только строки, где значение — число mask = pd.notna(numeric_series) df_data = df_data[mask].copy() # === ШАГ 3: Применяем MultiIndex к данным === df_data.columns = multi_index return df_data def _data_to_structured_json(self, df: pd.DataFrame) -> List[Dict[str, Any]]: """Преобразование DataFrame с MultiIndex в структурированный JSON""" if df.empty: return [] result_list = [] for idx, row in df.iterrows(): result = {} for col in df.columns: value = row[col] # Пропускаем NaN if pd.isna(value): value = None # Распаковываем уровни level1, level2, level3 = col # Убираем пустые/неинформативные значения level1 = str(level1).strip() if level1 else "" level2 = str(level2).strip() if level2 else None level3 = str(level3).strip() if level3 else None # Обработка id и ОГ — выносим на верх if level1 == "id": result["id"] = value elif level1 == "ОГ": result["name"] = value else: # Группируем по Level1 if level1 not in result: result[level1] = {} # Вложенные уровни if level2 and level3: if level2 not in result[level1]: result[level1][level2] = {} result[level1][level2][level3] = value elif level2: result[level1][level2] = value else: result[level1] = value result_list.append(result) return result_list def _get_repair_statuses_wrapper(self, params: dict): """Обертка для получения статусов ремонта""" print(f"🔍 DEBUG: _get_repair_statuses_wrapper вызван с параметрами: {params}") # Валидация параметров validated_params = validate_params_with_schema(params, StatusesRepairCARequest) ids = validated_params.get('ids') keys = validated_params.get('keys') print(f"🔍 DEBUG: Запрошенные ОГ: {ids}") print(f"🔍 DEBUG: Запрошенные ключи: {keys}") # Получаем данные из парсера if hasattr(self, 'df') and self.df is not None: # Данные загружены из MinIO if isinstance(self.df, dict): # Это словарь (как в других парсерах) data_source = self.df.get('data', []) elif hasattr(self.df, 'columns') and 'data' in self.df.columns: # Это DataFrame data_source = [] for _, row in self.df.iterrows(): if row['data']: data_source.extend(row['data']) else: data_source = [] elif hasattr(self, 'data_dict') and self.data_dict: # Данные из локального парсинга data_source = self.data_dict.get('data', []) else: print("⚠️ Нет данных в парсере") return [] print(f"🔍 DEBUG: Используем данные с {len(data_source)} записями") # Фильтруем данные filtered_data = self._filter_statuses_data(data_source, ids, keys) print(f"🔍 DEBUG: Отфильтровано {len(filtered_data)} записей") return filtered_data def _filter_statuses_data(self, data_source: List[Dict], ids: Optional[List[str]], keys: Optional[List[List[str]]]) -> List[Dict]: """Фильтрация данных по ОГ и ключам""" if not data_source: return [] # Если не указаны фильтры, возвращаем все данные if not ids and not keys: return data_source filtered_data = [] for item in data_source: # Фильтр по ОГ if ids is not None: item_id = item.get('id') if item_id not in ids: continue # Если указаны ключи, извлекаем только нужные поля if keys is not None: filtered_item = self._extract_keys_from_item(item, keys) if filtered_item: filtered_data.append(filtered_item) else: filtered_data.append(item) return filtered_data def _extract_keys_from_item(self, item: Dict[str, Any], keys: List[List[str]]) -> Dict[str, Any]: """Извлечение указанных ключей из элемента""" result = {} # Всегда добавляем id и name if 'id' in item: result['id'] = item['id'] if 'name' in item: result['name'] = item['name'] # Извлекаем указанные ключи for key_path in keys: if not key_path: continue value = item for key in key_path: if isinstance(value, dict) and key in value: value = value[key] else: value = None break if value is not None: # Строим вложенную структуру current = result for i, key in enumerate(key_path): if i == len(key_path) - 1: current[key] = value else: if key not in current: current[key] = {} current = current[key] return result