Files
python_parser/PARSER_DEVELOPMENT_GUIDE.md

36 KiB
Raw Permalink Blame History

📚 Руководство по разработке парсеров

Полное руководство по созданию новых парсеров для системы NIN Excel Parsers API.

📋 Содержание

  1. Архитектура системы
  2. Структура проекта
  3. Создание нового парсера
  4. Регистрация парсера
  5. Создание API эндпоинтов
  6. Интеграция с Streamlit
  7. Тестирование
  8. Лучшие практики
  9. Примеры

🏗️ Архитектура системы

Hexagonal Architecture

Система построена на принципах Hexagonal Architecture (Ports & Adapters):

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Streamlit UI  │    │   FastAPI       │    │   MinIO Storage │
│   (Adapter)     │◄──►│   (Application) │◄──►│   (Adapter)     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                              │
                              ▼
                    ┌─────────────────┐
                    │   ParserPort    │
                    │   (Core)        │
                    └─────────────────┘
                              ▲
                              │
                    ┌─────────────────┐
                    │   Parser        │
                    │   (Adapter)     │
                    └─────────────────┘

Ключевые компоненты

  • ParserPort - базовый класс для всех парсеров
  • ReportService - сервис для управления отчетами
  • MinIOStorageAdapter - адаптер для хранения данных
  • FastAPI - веб-фреймворк для API
  • Streamlit - веб-интерфейс

📁 Структура проекта

python_parser/
├── adapters/
│   ├── parsers/           # Парсеры (адаптеры)
│   │   ├── __init__.py
│   │   ├── monitoring_fuel.py
│   │   ├── monitoring_tar.py
│   │   ├── svodka_ca.py
│   │   ├── svodka_pm.py
│   │   ├── svodka_repair_ca.py
│   │   └── statuses_repair_ca.py
│   ├── pconfig.py         # Конфигурация парсеров
│   └── storage.py         # Адаптер хранилища
├── app/
│   ├── main.py           # FastAPI приложение
│   └── schemas/          # Pydantic схемы
│       ├── monitoring_fuel.py
│       ├── monitoring_tar.py
│       ├── svodka_ca.py
│       ├── svodka_pm.py
│       ├── svodka_repair_ca.py
│       └── statuses_repair_ca.py
├── core/
│   ├── models.py         # Модели данных
│   ├── ports.py          # Базовые порты
│   ├── schema_utils.py   # Утилиты схем
│   └── services.py       # Сервисы
└── requirements.txt

streamlit_app/
├── streamlit_app.py      # Streamlit интерфейс
└── requirements.txt

🔧 Создание нового парсера

1. Создание Pydantic схем

Создайте файл схемы в python_parser/app/schemas/your_parser.py:

from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from enum import Enum

class YourParserMode(str, Enum):
    """Режимы работы парсера"""
    MODE1 = "mode1"
    MODE2 = "mode2"

class YourParserRequest(BaseModel):
    """Схема запроса для основного геттера"""
    param1: Optional[str] = Field(
        None,
        description="Описание параметра 1",
        example="example_value"
    )
    param2: Optional[List[str]] = Field(
        None,
        description="Описание параметра 2",
        example=["value1", "value2"]
    )
    mode: Optional[YourParserMode] = Field(
        None,
        description="Режим работы парсера",
        example="mode1"
    )

    class Config:
        json_schema_extra = {
            "example": {
                "param1": "example_value",
                "param2": ["value1", "value2"],
                "mode": "mode1"
            }
        }

class YourParserFullRequest(BaseModel):
    """Схема запроса для получения всех данных"""
    # Пустая схема - возвращает все данные без фильтрации
    pass

    class Config:
        json_schema_extra = {
            "example": {}
        }

2. Создание парсера

Создайте файл парсера в python_parser/adapters/parsers/your_parser.py:

import pandas as pd
import os
import zipfile
import tempfile
from typing import Dict, Any, Optional
from core.ports import ParserPort
from adapters.pconfig import find_header_row, data_to_json
from app.schemas.your_parser import YourParserRequest, YourParserFullRequest


class YourParser(ParserPort):
    """Парсер для вашего типа данных"""
    
    name = "your_parser"  # Уникальное имя парсера
    
    def __init__(self):
        super().__init__()
        # Регистрируем геттеры
        self.register_getter("get_data", YourParserRequest, self._get_data_wrapper)
        self.register_getter("get_full_data", YourParserFullRequest, self._get_full_data_wrapper)
        
        # Данные парсера
        self.data_dict = {}
    
    def parse(self, file_path: str, params: dict) -> Dict[str, Any]:
        """Основной метод парсинга"""
        print(f"🔍 DEBUG: YourParser.parse вызван с файлом: {file_path}")
        
        try:
            # Проверяем тип файла (пример для ZIP-only парсера)
            if not file_path.endswith('.zip'):
                raise ValueError(f"Неподдерживаемый тип файла: {file_path}. Ожидается только ZIP архив.")
            
            # Обрабатываем ZIP архив
            result = self._parse_zip_archive(file_path)
            
            # Сохраняем результат
            self.data_dict = result
            print(f"✅ Парсинг завершен. Получено {len(result)} записей")
            return result
            
        except Exception as e:
            print(f"❌ Ошибка при парсинге: {e}")
            raise
    
    def _parse_zip_archive(self, zip_path: str) -> Dict[str, Any]:
        """Парсинг ZIP архива"""
        print(f"📦 Обработка ZIP архива: {zip_path}")
        
        with tempfile.TemporaryDirectory() as temp_dir:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)
            
            # Ищем нужные файлы
            target_files = []
            for root, dirs, files in os.walk(temp_dir):
                for file in files:
                    if file.endswith(('.xlsx', '.xls')):
                        target_files.append(os.path.join(root, file))
            
            if not target_files:
                raise ValueError("В архиве не найдены поддерживаемые файлы")
            
            # Парсим все найденные файлы
            all_data = {}
            for file_path in target_files:
                file_data = self._parse_single_file(file_path)
                # Объединяем данные
                all_data.update(file_data)
            
            return all_data
    
    def _parse_single_file(self, file_path: str) -> Dict[str, Any]:
        """Парсинг одного файла"""
        print(f"📁 Обработка файла: {file_path}")
        
        try:
            # Читаем Excel файл
            excel_file = pd.ExcelFile(file_path)
            available_sheets = excel_file.sheet_names
            
            # Обрабатываем нужные листы
            result_data = {}
            for sheet_name in available_sheets:
                if self._should_process_sheet(sheet_name):
                    sheet_data = self._parse_sheet(file_path, sheet_name)
                    result_data.update(sheet_data)
            
            return result_data
            
        except Exception as e:
            print(f"❌ Ошибка при обработке файла {file_path}: {e}")
            return {}
    
    def _should_process_sheet(self, sheet_name: str) -> bool:
        """Определяет, нужно ли обрабатывать лист"""
        # Логика фильтрации листов
        return True  # Или ваша логика
    
    def _parse_sheet(self, file_path: str, sheet_name: str) -> Dict[str, Any]:
        """Парсинг конкретного листа"""
        try:
            # Находим заголовок
            header_num = find_header_row(file_path, sheet_name, search_value="1")
            if header_num is None:
                print(f"❌ Не найден заголовок в листе {sheet_name}")
                return {}
            
            # Читаем данные
            df = pd.read_excel(
                file_path,
                sheet_name=sheet_name,
                header=header_num,
                index_col=None
            )
            
            # Обрабатываем данные
            processed_data = self._process_dataframe(df, sheet_name)
            
            return processed_data
            
        except Exception as e:
            print(f"❌ Ошибка при обработке листа {sheet_name}: {e}")
            return {}
    
    def _process_dataframe(self, df: pd.DataFrame, sheet_name: str) -> Dict[str, Any]:
        """Обработка DataFrame"""
        # Ваша логика обработки данных
        return {"sheet_name": sheet_name, "data": df.to_dict('records')}
    
    def _get_data_wrapper(self, params: dict) -> Dict[str, Any]:
        """Обертка для основного геттера"""
        print(f"🔍 DEBUG: _get_data_wrapper вызван с параметрами: {params}")
        
        # Валидируем параметры
        validated_params = YourParserRequest(**params)
        
        # Получаем данные из парсера
        data_source = self._get_data_source()
        
        if not data_source:
            print("⚠️ Нет данных в парсере")
            return {}
        
        # Фильтруем данные по параметрам
        filtered_data = self._filter_data(data_source, validated_params)
        
        # Конвертируем в JSON
        try:
            result_json = data_to_json(filtered_data)
            return result_json
        except Exception as e:
            print(f"❌ Ошибка при конвертации данных в JSON: {e}")
            return {}
    
    def _get_full_data_wrapper(self, params: dict) -> Dict[str, Any]:
        """Обертка для геттера всех данных"""
        print(f"🔍 DEBUG: _get_full_data_wrapper вызван с параметрами: {params}")
        
        # Получаем данные из парсера
        data_source = self._get_data_source()
        
        if not data_source:
            print("⚠️ Нет данных в парсере")
            return {}
        
        # Конвертируем все данные в JSON
        try:
            result_json = data_to_json(data_source)
            return result_json
        except Exception as e:
            print(f"❌ Ошибка при конвертации данных в JSON: {e}")
            return {}
    
    def _get_data_source(self) -> Dict[str, Any]:
        """Получает источник данных"""
        if hasattr(self, 'df') and self.df is not None:
            # Данные загружены из MinIO
            if isinstance(self.df, dict):
                return self.df
            else:
                return {}
        elif hasattr(self, 'data_dict') and self.data_dict:
            # Данные из локального парсинга
            return self.data_dict
        else:
            return {}
    
    def _filter_data(self, data_source: Dict[str, Any], params: YourParserRequest) -> Dict[str, Any]:
        """Фильтрует данные по параметрам"""
        # Ваша логика фильтрации
        return data_source

📝 Регистрация парсера

1. Регистрация в init.py

Добавьте импорт в python_parser/adapters/parsers/__init__.py:

from .monitoring_fuel import MonitoringFuelParser
from .monitoring_tar import MonitoringTarParser
from .your_parser import YourParser  # Добавить эту строку
from .svodka_ca import SvodkaCAParser
from .svodka_pm import SvodkaPMParser
from .svodka_repair_ca import SvodkaRepairCAParser
from .statuses_repair_ca import StatusesRepairCAParser

__all__ = [
    'MonitoringFuelParser',
    'MonitoringTarParser',
    'YourParser',  # Добавить эту строку
    'SvodkaCAParser',
    'SvodkaPMParser',
    'SvodkaRepairCAParser',
    'StatusesRepairCAParser'
]

2. Регистрация в main.py

Добавьте импорт и регистрацию в python_parser/app/main.py:

# Импорты
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser, MonitoringTarParser, YourParser, SvodkaRepairCAParser, StatusesRepairCAParser

from app.schemas.your_parser import YourParserRequest, YourParserFullRequest

# Регистрация парсера
PARSERS.update({
    'svodka_pm': SvodkaPMParser,
    'svodka_ca': SvodkaCAParser,
    'monitoring_fuel': MonitoringFuelParser,
    'monitoring_tar': MonitoringTarParser,
    'your_parser': YourParser,  # Добавить эту строку
    'svodka_repair_ca': SvodkaRepairCAParser,
    'statuses_repair_ca': StatusesRepairCAParser,
})

3. Регистрация в services.py

Добавьте логику выбора геттера в python_parser/core/services.py:

elif request.report_type == 'monitoring_tar':
    # Для monitoring_tar используем геттер get_tar_data
    getter_name = 'get_tar_data'
elif request.report_type == 'your_parser':  # Добавить эту секцию
    # Для your_parser используем геттер get_data
    getter_name = 'get_data'
elif request.report_type == 'monitoring_fuel':

🌐 Создание API эндпоинтов

Добавьте эндпоинты в python_parser/app/main.py:

# ====== YOUR PARSER ENDPOINTS ======

@app.post("/your_parser/upload", tags=[YourParser.name],
          summary="Загрузка отчета вашего типа")
async def upload_your_parser(
    file: UploadFile = File(...)
):
    """Загрузка и обработка отчета вашего типа

    ### Поддерживаемые форматы:
    - **ZIP архивы** с файлами (только ZIP)

    ### Структура данных:
    - Описание структуры данных
    - Какие данные извлекаются
    - Формат возвращаемых данных
    """
    report_service = get_report_service()

    try:
        # Проверяем тип файла - только ZIP архивы
        if not file.filename.endswith('.zip'):
            raise HTTPException(
                status_code=400,
                detail="Неподдерживаемый тип файла. Ожидается только ZIP архив (.zip)"
            )

        # Читаем содержимое файла
        file_content = await file.read()

        # Создаем запрос на загрузку
        upload_request = UploadRequest(
            report_type='your_parser',
            file_content=file_content,
            file_name=file.filename
        )

        # Загружаем отчет
        result = report_service.upload_report(upload_request)

        if result.success:
            return UploadResponse(
                success=True,
                message="Отчет успешно загружен и обработан",
                report_id=result.object_id,
                filename=file.filename
            ).model_dump()
        else:
            return UploadErrorResponse(
                success=False,
                message=result.message,
                error_code="ERR_UPLOAD",
                details=None
            ).model_dump()

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")


@app.post("/your_parser/get_data", tags=[YourParser.name],
          summary="Получение данных из отчета")
async def get_your_parser_data(
    request_data: YourParserRequest
):
    """Получение данных из отчета

    ### Структура параметров:
    - `param1`: **Описание параметра 1** (опциональный)
    - `param2`: **Описание параметра 2** (опциональный)
    - `mode`: **Режим работы** (опциональный)

    ### Пример тела запроса:
    ```json
    {
        "param1": "example_value",
        "param2": ["value1", "value2"],
        "mode": "mode1"
    }
    ```
    """
    report_service = get_report_service()

    try:
        # Создаем запрос
        request_dict = request_data.model_dump()
        request = DataRequest(
            report_type='your_parser',
            get_params=request_dict
        )

        # Получаем данные
        result = report_service.get_data(request)

        if result.success:
            return {
                "success": True,
                "data": result.data
            }
        else:
            raise HTTPException(status_code=404, detail=result.message)

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")


@app.post("/your_parser/get_full_data", tags=[YourParser.name],
          summary="Получение всех данных из отчета")
async def get_your_parser_full_data():
    """Получение всех данных из отчета без фильтрации

    ### Возвращает:
    - Все данные без фильтрации
    - Полная структура данных
    """
    report_service = get_report_service()

    try:
        # Создаем запрос без параметров
        request = DataRequest(
            report_type='your_parser',
            get_params={}
        )

        # Получаем данные через геттер get_full_data
        result = report_service.get_data(request, getter_name='get_full_data')

        if result.success:
            return {
                "success": True,
                "data": result.data
            }
        else:
            raise HTTPException(status_code=404, detail=result.message)

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")

🎨 Интеграция с Streamlit

1. Добавление новой вкладки

В streamlit_app/streamlit_app.py:

# Обновите список вкладок
tab1, tab2, tab3, tab4, tab5, tab6, tab7 = st.tabs([
    "📊 Сводки ПМ", 
    "🏭 Сводки СА", 
    "⛽ Мониторинг топлива",
    "⚡ Мониторинг ТЭР",
    "🔧 Ремонт СА",
    "📋 Статусы ремонта СА",
    "🆕 Ваш парсер"  # Добавить новую вкладку
])

# Добавьте новую вкладку
with tab7:
    st.header("🆕 Ваш парсер")
    
    # Секция загрузки файлов
    st.subheader("📤 Загрузка файлов")
            uploaded_file = st.file_uploader(
            "Выберите ZIP архив для вашего парсера",
            type=['zip'],
            key="your_parser_upload"
        )
    
    if uploaded_file is not None:
        if st.button("📤 Загрузить файл", key="your_parser_upload_btn"):
            with st.spinner("Загружаем файл..."):
                file_data = uploaded_file.read()
                result, status_code = upload_file_to_api("/your_parser/upload", file_data, uploaded_file.name)
                
                if status_code == 200:
                    st.success("✅ Файл успешно загружен!")
                    st.json(result)
                else:
                    st.error(f"❌ Ошибка загрузки: {result}")
    
    # Секция получения данных
    st.subheader("📊 Получение данных")
    
    col1, col2 = st.columns(2)
    
    with col1:
        st.subheader("🔍 Фильтрованные данные")
        
        # Параметры запроса
        param1 = st.text_input("Параметр 1:", key="your_parser_param1")
        param2 = st.multiselect("Параметр 2:", ["value1", "value2", "value3"], key="your_parser_param2")
        mode = st.selectbox("Режим:", ["mode1", "mode2"], key="your_parser_mode")
        
        if st.button("📊 Получить данные", key="your_parser_get_data_btn"):
            with st.spinner("Получаем данные..."):
                request_data = {
                    "param1": param1 if param1 else None,
                    "param2": param2 if param2 else None,
                    "mode": mode if mode else None
                }
                result, status_code = make_api_request("/your_parser/get_data", request_data)
                
                if status_code == 200 and result.get("success"):
                    st.success("✅ Данные успешно получены!")
                    
                    # Показываем данные
                    data = result.get("data", {}).get("value", {})
                    if data:
                        st.subheader("📋 Результат:")
                        st.json(data)
                    else:
                        st.info("📋 Нет данных для отображения")
                else:
                    st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
    
    with col2:
        st.subheader("📋 Все данные")
        
        if st.button("📊 Получить все данные", key="your_parser_get_full_data_btn"):
            with st.spinner("Получаем все данные..."):
                result, status_code = make_api_request("/your_parser/get_full_data", {})
                
                if status_code == 200 and result.get("success"):
                    st.success("✅ Все данные успешно получены!")
                    
                    # Показываем данные
                    data = result.get("data", {}).get("value", {})
                    if data:
                        st.subheader("📋 Результат:")
                        st.json(data)
                    else:
                        st.info("📋 Нет данных для отображения")
                else:
                    st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")

2. Обновление информации о проекте

# В секции "О проекте"
**Возможности:**
- 📊 Парсинг сводок ПМ (план и факт)
- 🏭 Парсинг сводок СА
-  Мониторинг топлива
-  Мониторинг ТЭР (Топливно-энергетические ресурсы)
- 🔧 Управление ремонтными работами СА
- 📋 Мониторинг статусов ремонта СА
- 🆕 Ваш новый парсер  # Добавить эту строку

🧪 Тестирование

1. Создание тестового скрипта

Создайте test_your_parser.py:

#!/usr/bin/env python3
"""
Тестовый скрипт для проверки парсера Your Parser
"""

import requests
import os

API_BASE_URL = "http://localhost:8000"

def test_upload_file():
    """Тест загрузки файла"""
    print("🔍 Тестируем загрузку файла...")
    
    # Используем тестовый файл
    file_path = "path/to/your/test/file.xlsx"
    
    if not os.path.exists(file_path):
        print(f"❌ Файл не найден: {file_path}")
        return False
    
    print(f"📁 Найден файл: {file_path}")
    
    with open(file_path, 'rb') as f:
        file_content = f.read()
    
    files = {
        'file': ('test_file.xlsx', file_content, 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
    }
    
    try:
        response = requests.post(f"{API_BASE_URL}/your_parser/upload", files=files)
        print(f"📊 Статус ответа: {response.status_code}")
        print(f"📄 Ответ: {response.text}")
        
        if response.status_code == 200:
            result = response.json()
            if result.get("success"):
                print("✅ Загрузка успешна!")
                return True
            else:
                print("❌ Ошибка загрузки!")
                return False
        else:
            print("❌ Ошибка HTTP!")
            return False
            
    except Exception as e:
        print(f"❌ Исключение: {e}")
        return False

def test_get_data():
    """Тест получения данных"""
    print("\n🔍 Тестируем получение данных...")
    
    test_data = {
        "param1": "test_value",
        "param2": ["value1", "value2"],
        "mode": "mode1"
    }
    
    try:
        response = requests.post(f"{API_BASE_URL}/your_parser/get_data", json=test_data)
        print(f"📊 Статус ответа: {response.status_code}")
        
        if response.status_code == 200:
            result = response.json()
            if result.get("success"):
                print("✅ Получение данных успешно!")
                data = result.get("data", {}).get("value", {})
                print(f"📊 Получено данных: {len(data)} записей")
                return True
            else:
                print("❌ Ошибка получения данных!")
                print(f"📄 Ответ: {response.text}")
                return False
        else:
            print("❌ Ошибка HTTP!")
            print(f"📄 Ответ: {response.text}")
            return False
            
    except Exception as e:
        print(f"❌ Исключение: {e}")
        return False

def main():
    """Основная функция тестирования"""
    print("🚀 Тестирование парсера Your Parser")
    print("=" * 70)
    
    # Тест 1: Загрузка файла
    upload_success = test_upload_file()
    
    if upload_success:
        # Тест 2: Получение данных
        get_data_success = test_get_data()
        
        print("\n" + "=" * 70)
        print("📋 Результаты тестирования:")
        print(f"  ✅ Загрузка файла: {'ПРОЙДЕН' if upload_success else 'ПРОВАЛЕН'}")
        print(f"  ✅ Получение данных: {'ПРОЙДЕН' if get_data_success else 'ПРОВАЛЕН'}")
        
        if upload_success and get_data_success:
            print("\n🎉 Все тесты пройдены! Парсер работает корректно!")
        else:
            print("\n❌ Некоторые тесты провалены.")
    else:
        print("\n❌ Загрузка файла провалена. Пропускаем остальные тесты.")

if __name__ == "__main__":
    main()

2. Запуск тестов

# Запуск тестового скрипта
python test_your_parser.py

# Проверка через curl
curl -X POST "http://localhost:8000/your_parser/upload" \
  -H "accept: application/json" \
  -H "Content-Type: multipart/form-data" \
  -F "file=@test_file.xlsx"

curl -X POST "http://localhost:8000/your_parser/get_data" \
  -H "accept: application/json" \
  -H "Content-Type: application/json" \
  -d '{"param1": "test_value", "mode": "mode1"}'

📋 Лучшие практики

1. Именование

  • Парсер: YourParser (PascalCase)
  • Файл парсера: your_parser.py (snake_case)
  • Имя парсера: "your_parser" (snake_case)
  • Геттеры: get_data, get_full_data (snake_case)
  • Эндпоинты: /your_parser/upload, /your_parser/get_data

2. Структура данных

# Рекомендуемая структура возвращаемых данных
{
    "installation_id": {
        "data_type1": [
            {"field1": "value1", "field2": "value2"},
            {"field1": "value3", "field2": "value4"}
        ],
        "data_type2": [
            {"field1": "value5", "field2": "value6"}
        ]
    }
}

3. Обработка ошибок

try:
    # Ваш код
    result = some_operation()
    return result
except SpecificException as e:
    print(f"❌ Специфическая ошибка: {e}")
    return {}
except Exception as e:
    print(f"❌ Общая ошибка: {e}")
    raise

4. Логирование

# Используйте эмодзи для разных типов сообщений
print(f"🔍 DEBUG: Отладочная информация")
print(f"📁 INFO: Информационное сообщение")
print(f"✅ SUCCESS: Успешная операция")
print(f"⚠️ WARNING: Предупреждение")
print(f"❌ ERROR: Ошибка")

5. Валидация данных

# Всегда валидируйте входные параметры
validated_params = YourParserRequest(**params)

# Проверяйте наличие данных
if not data_source:
    print("⚠️ Нет данных в парсере")
    return {}

6. Документация

  • Добавляйте docstrings ко всем методам
  • Описывайте параметры в Pydantic схемах
  • Добавляйте примеры в FastAPI эндпоинты
  • Комментируйте сложную логику

📚 Примеры

Пример 1: Простой парсер

class SimpleParser(ParserPort):
    """Простой парсер для демонстрации"""
    
    name = "simple_parser"
    
    def __init__(self):
        super().__init__()
        self.register_getter("get_data", SimpleRequest, self._get_data_wrapper)
    
    def parse(self, file_path: str, params: dict) -> Dict[str, Any]:
        """Парсинг простого Excel файла"""
        df = pd.read_excel(file_path)
        return {"data": df.to_dict('records')}
    
    def _get_data_wrapper(self, params: dict) -> Dict[str, Any]:
        """Обертка для геттера"""
        data_source = self._get_data_source()
        return data_to_json(data_source)

Пример 2: Парсер с множественными геттерами

class MultiGetterParser(ParserPort):
    """Парсер с несколькими геттерами"""
    
    name = "multi_getter_parser"
    
    def __init__(self):
        super().__init__()
        # Регистрируем несколько геттеров
        self.register_getter("get_summary", SummaryRequest, self._get_summary_wrapper)
        self.register_getter("get_details", DetailsRequest, self._get_details_wrapper)
        self.register_getter("get_statistics", StatisticsRequest, self._get_statistics_wrapper)
    
    def _get_summary_wrapper(self, params: dict) -> Dict[str, Any]:
        """Геттер для получения сводки"""
        # Логика получения сводки
        pass
    
    def _get_details_wrapper(self, params: dict) -> Dict[str, Any]:
        """Геттер для получения деталей"""
        # Логика получения деталей
        pass
    
    def _get_statistics_wrapper(self, params: dict) -> Dict[str, Any]:
        """Геттер для получения статистики"""
        # Логика получения статистики
        pass

Пример 3: Парсер с фильтрацией

class FilteredParser(ParserPort):
    """Парсер с продвинутой фильтрацией"""
    
    def _filter_data(self, data_source: Dict[str, Any], params: FilterRequest) -> Dict[str, Any]:
        """Фильтрация данных по параметрам"""
        filtered_data = {}
        
        for key, value in data_source.items():
            # Фильтр по дате
            if params.start_date and value.get('date') < params.start_date:
                continue
            
            # Фильтр по типу
            if params.type and value.get('type') != params.type:
                continue
            
            # Фильтр по статусу
            if params.status and value.get('status') not in params.status:
                continue
            
            filtered_data[key] = value
        
        return filtered_data

🚀 Заключение

Это руководство покрывает все аспекты создания новых парсеров в системе NIN Excel Parsers API. Следуйте этим инструкциям для создания качественных, поддерживаемых и тестируемых парсеров.

Чек-лист для нового парсера:

  • Создана Pydantic схема
  • Создан класс парсера с геттерами
  • Парсер зарегистрирован в __init__.py
  • Парсер зарегистрирован в main.py
  • Добавлена логика в services.py
  • Созданы FastAPI эндпоинты
  • Добавлена вкладка в Streamlit
  • Создан тестовый скрипт
  • Проведено тестирование
  • Обновлена документация

Полезные ссылки:


Удачной разработки! 🚀