Files
python_parser/PARSER_DEVELOPMENT_GUIDE.md

1002 lines
36 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 📚 Руководство по разработке парсеров
Полное руководство по созданию новых парсеров для системы NIN Excel Parsers API.
## 📋 Содержание
1. [Архитектура системы](#архитектура-системы)
2. [Структура проекта](#структура-проекта)
3. [Создание нового парсера](#создание-нового-парсера)
4. [Регистрация парсера](#регистрация-парсера)
5. [Создание API эндпоинтов](#создание-api-эндпоинтов)
6. [Интеграция с Streamlit](#интеграция-с-streamlit)
7. [Тестирование](#тестирование)
8. [Лучшие практики](#лучшие-практики)
9. [Примеры](#примеры)
---
## 🏗️ Архитектура системы
### Hexagonal Architecture
Система построена на принципах **Hexagonal Architecture** (Ports & Adapters):
```
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Streamlit UI │ │ FastAPI │ │ MinIO Storage │
│ (Adapter) │◄──►│ (Application) │◄──►│ (Adapter) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
┌─────────────────┐
│ ParserPort │
│ (Core) │
└─────────────────┘
┌─────────────────┐
│ Parser │
│ (Adapter) │
└─────────────────┘
```
### Ключевые компоненты
- **ParserPort** - базовый класс для всех парсеров
- **ReportService** - сервис для управления отчетами
- **MinIOStorageAdapter** - адаптер для хранения данных
- **FastAPI** - веб-фреймворк для API
- **Streamlit** - веб-интерфейс
---
## 📁 Структура проекта
```
python_parser/
├── adapters/
│ ├── parsers/ # Парсеры (адаптеры)
│ │ ├── __init__.py
│ │ ├── monitoring_fuel.py
│ │ ├── monitoring_tar.py
│ │ ├── svodka_ca.py
│ │ ├── svodka_pm.py
│ │ ├── svodka_repair_ca.py
│ │ └── statuses_repair_ca.py
│ ├── pconfig.py # Конфигурация парсеров
│ └── storage.py # Адаптер хранилища
├── app/
│ ├── main.py # FastAPI приложение
│ └── schemas/ # Pydantic схемы
│ ├── monitoring_fuel.py
│ ├── monitoring_tar.py
│ ├── svodka_ca.py
│ ├── svodka_pm.py
│ ├── svodka_repair_ca.py
│ └── statuses_repair_ca.py
├── core/
│ ├── models.py # Модели данных
│ ├── ports.py # Базовые порты
│ ├── schema_utils.py # Утилиты схем
│ └── services.py # Сервисы
└── requirements.txt
streamlit_app/
├── streamlit_app.py # Streamlit интерфейс
└── requirements.txt
```
---
## 🔧 Создание нового парсера
### 1. Создание Pydantic схем
Создайте файл схемы в `python_parser/app/schemas/your_parser.py`:
```python
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from enum import Enum
class YourParserMode(str, Enum):
"""Режимы работы парсера"""
MODE1 = "mode1"
MODE2 = "mode2"
class YourParserRequest(BaseModel):
"""Схема запроса для основного геттера"""
param1: Optional[str] = Field(
None,
description="Описание параметра 1",
example="example_value"
)
param2: Optional[List[str]] = Field(
None,
description="Описание параметра 2",
example=["value1", "value2"]
)
mode: Optional[YourParserMode] = Field(
None,
description="Режим работы парсера",
example="mode1"
)
class Config:
json_schema_extra = {
"example": {
"param1": "example_value",
"param2": ["value1", "value2"],
"mode": "mode1"
}
}
class YourParserFullRequest(BaseModel):
"""Схема запроса для получения всех данных"""
# Пустая схема - возвращает все данные без фильтрации
pass
class Config:
json_schema_extra = {
"example": {}
}
```
### 2. Создание парсера
Создайте файл парсера в `python_parser/adapters/parsers/your_parser.py`:
```python
import pandas as pd
import os
import zipfile
import tempfile
from typing import Dict, Any, Optional
from core.ports import ParserPort
from adapters.pconfig import find_header_row, data_to_json
from app.schemas.your_parser import YourParserRequest, YourParserFullRequest
class YourParser(ParserPort):
"""Парсер для вашего типа данных"""
name = "your_parser" # Уникальное имя парсера
def __init__(self):
super().__init__()
# Регистрируем геттеры
self.register_getter("get_data", YourParserRequest, self._get_data_wrapper)
self.register_getter("get_full_data", YourParserFullRequest, self._get_full_data_wrapper)
# Данные парсера
self.data_dict = {}
def parse(self, file_path: str, params: dict) -> Dict[str, Any]:
"""Основной метод парсинга"""
print(f"🔍 DEBUG: YourParser.parse вызван с файлом: {file_path}")
try:
# Проверяем тип файла (пример для ZIP-only парсера)
if not file_path.endswith('.zip'):
raise ValueError(f"Неподдерживаемый тип файла: {file_path}. Ожидается только ZIP архив.")
# Обрабатываем ZIP архив
result = self._parse_zip_archive(file_path)
# Сохраняем результат
self.data_dict = result
print(f"✅ Парсинг завершен. Получено {len(result)} записей")
return result
except Exception as e:
print(f"❌ Ошибка при парсинге: {e}")
raise
def _parse_zip_archive(self, zip_path: str) -> Dict[str, Any]:
"""Парсинг ZIP архива"""
print(f"📦 Обработка ZIP архива: {zip_path}")
with tempfile.TemporaryDirectory() as temp_dir:
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# Ищем нужные файлы
target_files = []
for root, dirs, files in os.walk(temp_dir):
for file in files:
if file.endswith(('.xlsx', '.xls')):
target_files.append(os.path.join(root, file))
if not target_files:
raise ValueError("В архиве не найдены поддерживаемые файлы")
# Парсим все найденные файлы
all_data = {}
for file_path in target_files:
file_data = self._parse_single_file(file_path)
# Объединяем данные
all_data.update(file_data)
return all_data
def _parse_single_file(self, file_path: str) -> Dict[str, Any]:
"""Парсинг одного файла"""
print(f"📁 Обработка файла: {file_path}")
try:
# Читаем Excel файл
excel_file = pd.ExcelFile(file_path)
available_sheets = excel_file.sheet_names
# Обрабатываем нужные листы
result_data = {}
for sheet_name in available_sheets:
if self._should_process_sheet(sheet_name):
sheet_data = self._parse_sheet(file_path, sheet_name)
result_data.update(sheet_data)
return result_data
except Exception as e:
print(f"❌ Ошибка при обработке файла {file_path}: {e}")
return {}
def _should_process_sheet(self, sheet_name: str) -> bool:
"""Определяет, нужно ли обрабатывать лист"""
# Логика фильтрации листов
return True # Или ваша логика
def _parse_sheet(self, file_path: str, sheet_name: str) -> Dict[str, Any]:
"""Парсинг конкретного листа"""
try:
# Находим заголовок
header_num = find_header_row(file_path, sheet_name, search_value="1")
if header_num is None:
print(f"❌ Не найден заголовок в листе {sheet_name}")
return {}
# Читаем данные
df = pd.read_excel(
file_path,
sheet_name=sheet_name,
header=header_num,
index_col=None
)
# Обрабатываем данные
processed_data = self._process_dataframe(df, sheet_name)
return processed_data
except Exception as e:
print(f"❌ Ошибка при обработке листа {sheet_name}: {e}")
return {}
def _process_dataframe(self, df: pd.DataFrame, sheet_name: str) -> Dict[str, Any]:
"""Обработка DataFrame"""
# Ваша логика обработки данных
return {"sheet_name": sheet_name, "data": df.to_dict('records')}
def _get_data_wrapper(self, params: dict) -> Dict[str, Any]:
"""Обертка для основного геттера"""
print(f"🔍 DEBUG: _get_data_wrapper вызван с параметрами: {params}")
# Валидируем параметры
validated_params = YourParserRequest(**params)
# Получаем данные из парсера
data_source = self._get_data_source()
if not data_source:
print("⚠️ Нет данных в парсере")
return {}
# Фильтруем данные по параметрам
filtered_data = self._filter_data(data_source, validated_params)
# Конвертируем в JSON
try:
result_json = data_to_json(filtered_data)
return result_json
except Exception as e:
print(f"❌ Ошибка при конвертации данных в JSON: {e}")
return {}
def _get_full_data_wrapper(self, params: dict) -> Dict[str, Any]:
"""Обертка для геттера всех данных"""
print(f"🔍 DEBUG: _get_full_data_wrapper вызван с параметрами: {params}")
# Получаем данные из парсера
data_source = self._get_data_source()
if not data_source:
print("⚠️ Нет данных в парсере")
return {}
# Конвертируем все данные в JSON
try:
result_json = data_to_json(data_source)
return result_json
except Exception as e:
print(f"❌ Ошибка при конвертации данных в JSON: {e}")
return {}
def _get_data_source(self) -> Dict[str, Any]:
"""Получает источник данных"""
if hasattr(self, 'df') and self.df is not None:
# Данные загружены из MinIO
if isinstance(self.df, dict):
return self.df
else:
return {}
elif hasattr(self, 'data_dict') and self.data_dict:
# Данные из локального парсинга
return self.data_dict
else:
return {}
def _filter_data(self, data_source: Dict[str, Any], params: YourParserRequest) -> Dict[str, Any]:
"""Фильтрует данные по параметрам"""
# Ваша логика фильтрации
return data_source
```
---
## 📝 Регистрация парсера
### 1. Регистрация в __init__.py
Добавьте импорт в `python_parser/adapters/parsers/__init__.py`:
```python
from .monitoring_fuel import MonitoringFuelParser
from .monitoring_tar import MonitoringTarParser
from .your_parser import YourParser # Добавить эту строку
from .svodka_ca import SvodkaCAParser
from .svodka_pm import SvodkaPMParser
from .svodka_repair_ca import SvodkaRepairCAParser
from .statuses_repair_ca import StatusesRepairCAParser
__all__ = [
'MonitoringFuelParser',
'MonitoringTarParser',
'YourParser', # Добавить эту строку
'SvodkaCAParser',
'SvodkaPMParser',
'SvodkaRepairCAParser',
'StatusesRepairCAParser'
]
```
### 2. Регистрация в main.py
Добавьте импорт и регистрацию в `python_parser/app/main.py`:
```python
# Импорты
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser, MonitoringTarParser, YourParser, SvodkaRepairCAParser, StatusesRepairCAParser
from app.schemas.your_parser import YourParserRequest, YourParserFullRequest
# Регистрация парсера
PARSERS.update({
'svodka_pm': SvodkaPMParser,
'svodka_ca': SvodkaCAParser,
'monitoring_fuel': MonitoringFuelParser,
'monitoring_tar': MonitoringTarParser,
'your_parser': YourParser, # Добавить эту строку
'svodka_repair_ca': SvodkaRepairCAParser,
'statuses_repair_ca': StatusesRepairCAParser,
})
```
### 3. Регистрация в services.py
Добавьте логику выбора геттера в `python_parser/core/services.py`:
```python
elif request.report_type == 'monitoring_tar':
# Для monitoring_tar используем геттер get_tar_data
getter_name = 'get_tar_data'
elif request.report_type == 'your_parser': # Добавить эту секцию
# Для your_parser используем геттер get_data
getter_name = 'get_data'
elif request.report_type == 'monitoring_fuel':
```
---
## 🌐 Создание API эндпоинтов
Добавьте эндпоинты в `python_parser/app/main.py`:
```python
# ====== YOUR PARSER ENDPOINTS ======
@app.post("/your_parser/upload", tags=[YourParser.name],
summary="Загрузка отчета вашего типа")
async def upload_your_parser(
file: UploadFile = File(...)
):
"""Загрузка и обработка отчета вашего типа
### Поддерживаемые форматы:
- **ZIP архивы** с файлами (только ZIP)
### Структура данных:
- Описание структуры данных
- Какие данные извлекаются
- Формат возвращаемых данных
"""
report_service = get_report_service()
try:
# Проверяем тип файла - только ZIP архивы
if not file.filename.endswith('.zip'):
raise HTTPException(
status_code=400,
detail="Неподдерживаемый тип файла. Ожидается только ZIP архив (.zip)"
)
# Читаем содержимое файла
file_content = await file.read()
# Создаем запрос на загрузку
upload_request = UploadRequest(
report_type='your_parser',
file_content=file_content,
file_name=file.filename
)
# Загружаем отчет
result = report_service.upload_report(upload_request)
if result.success:
return UploadResponse(
success=True,
message="Отчет успешно загружен и обработан",
report_id=result.object_id,
filename=file.filename
).model_dump()
else:
return UploadErrorResponse(
success=False,
message=result.message,
error_code="ERR_UPLOAD",
details=None
).model_dump()
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/your_parser/get_data", tags=[YourParser.name],
summary="Получение данных из отчета")
async def get_your_parser_data(
request_data: YourParserRequest
):
"""Получение данных из отчета
### Структура параметров:
- `param1`: **Описание параметра 1** (опциональный)
- `param2`: **Описание параметра 2** (опциональный)
- `mode`: **Режим работы** (опциональный)
### Пример тела запроса:
```json
{
"param1": "example_value",
"param2": ["value1", "value2"],
"mode": "mode1"
}
```
"""
report_service = get_report_service()
try:
# Создаем запрос
request_dict = request_data.model_dump()
request = DataRequest(
report_type='your_parser',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/your_parser/get_full_data", tags=[YourParser.name],
summary="Получение всех данных из отчета")
async def get_your_parser_full_data():
"""Получение всех данных из отчета без фильтрации
### Возвращает:
- Все данные без фильтрации
- Полная структура данных
"""
report_service = get_report_service()
try:
# Создаем запрос без параметров
request = DataRequest(
report_type='your_parser',
get_params={}
)
# Получаем данные через геттер get_full_data
result = report_service.get_data(request, getter_name='get_full_data')
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
```
---
## 🎨 Интеграция с Streamlit
### 1. Добавление новой вкладки
В `streamlit_app/streamlit_app.py`:
```python
# Обновите список вкладок
tab1, tab2, tab3, tab4, tab5, tab6, tab7 = st.tabs([
"📊 Сводки ПМ",
"🏭 Сводки СА",
"⛽ Мониторинг топлива",
"⚡ Мониторинг ТЭР",
"🔧 Ремонт СА",
"📋 Статусы ремонта СА",
"🆕 Ваш парсер" # Добавить новую вкладку
])
# Добавьте новую вкладку
with tab7:
st.header("🆕 Ваш парсер")
# Секция загрузки файлов
st.subheader("📤 Загрузка файлов")
uploaded_file = st.file_uploader(
"Выберите ZIP архив для вашего парсера",
type=['zip'],
key="your_parser_upload"
)
if uploaded_file is not None:
if st.button("📤 Загрузить файл", key="your_parser_upload_btn"):
with st.spinner("Загружаем файл..."):
file_data = uploaded_file.read()
result, status_code = upload_file_to_api("/your_parser/upload", file_data, uploaded_file.name)
if status_code == 200:
st.success("✅ Файл успешно загружен!")
st.json(result)
else:
st.error(f"❌ Ошибка загрузки: {result}")
# Секция получения данных
st.subheader("📊 Получение данных")
col1, col2 = st.columns(2)
with col1:
st.subheader("🔍 Фильтрованные данные")
# Параметры запроса
param1 = st.text_input("Параметр 1:", key="your_parser_param1")
param2 = st.multiselect("Параметр 2:", ["value1", "value2", "value3"], key="your_parser_param2")
mode = st.selectbox("Режим:", ["mode1", "mode2"], key="your_parser_mode")
if st.button("📊 Получить данные", key="your_parser_get_data_btn"):
with st.spinner("Получаем данные..."):
request_data = {
"param1": param1 if param1 else None,
"param2": param2 if param2 else None,
"mode": mode if mode else None
}
result, status_code = make_api_request("/your_parser/get_data", request_data)
if status_code == 200 and result.get("success"):
st.success("✅ Данные успешно получены!")
# Показываем данные
data = result.get("data", {}).get("value", {})
if data:
st.subheader("📋 Результат:")
st.json(data)
else:
st.info("📋 Нет данных для отображения")
else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
with col2:
st.subheader("📋 Все данные")
if st.button("📊 Получить все данные", key="your_parser_get_full_data_btn"):
with st.spinner("Получаем все данные..."):
result, status_code = make_api_request("/your_parser/get_full_data", {})
if status_code == 200 and result.get("success"):
st.success("✅ Все данные успешно получены!")
# Показываем данные
data = result.get("data", {}).get("value", {})
if data:
st.subheader("📋 Результат:")
st.json(data)
else:
st.info("📋 Нет данных для отображения")
else:
st.error(f"❌ Ошибка: {result.get('message', 'Неизвестная ошибка')}")
```
### 2. Обновление информации о проекте
```python
# В секции "О проекте"
**Возможности:**
- 📊 Парсинг сводок ПМ (план и факт)
- 🏭 Парсинг сводок СА
- Мониторинг топлива
- Мониторинг ТЭР (Топливно-энергетические ресурсы)
- 🔧 Управление ремонтными работами СА
- 📋 Мониторинг статусов ремонта СА
- 🆕 Ваш новый парсер # Добавить эту строку
```
---
## 🧪 Тестирование
### 1. Создание тестового скрипта
Создайте `test_your_parser.py`:
```python
#!/usr/bin/env python3
"""
Тестовый скрипт для проверки парсера Your Parser
"""
import requests
import os
API_BASE_URL = "http://localhost:8000"
def test_upload_file():
"""Тест загрузки файла"""
print("🔍 Тестируем загрузку файла...")
# Используем тестовый файл
file_path = "path/to/your/test/file.xlsx"
if not os.path.exists(file_path):
print(f"❌ Файл не найден: {file_path}")
return False
print(f"📁 Найден файл: {file_path}")
with open(file_path, 'rb') as f:
file_content = f.read()
files = {
'file': ('test_file.xlsx', file_content, 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
}
try:
response = requests.post(f"{API_BASE_URL}/your_parser/upload", files=files)
print(f"📊 Статус ответа: {response.status_code}")
print(f"📄 Ответ: {response.text}")
if response.status_code == 200:
result = response.json()
if result.get("success"):
print("✅ Загрузка успешна!")
return True
else:
print("❌ Ошибка загрузки!")
return False
else:
print("❌ Ошибка HTTP!")
return False
except Exception as e:
print(f"❌ Исключение: {e}")
return False
def test_get_data():
"""Тест получения данных"""
print("\n🔍 Тестируем получение данных...")
test_data = {
"param1": "test_value",
"param2": ["value1", "value2"],
"mode": "mode1"
}
try:
response = requests.post(f"{API_BASE_URL}/your_parser/get_data", json=test_data)
print(f"📊 Статус ответа: {response.status_code}")
if response.status_code == 200:
result = response.json()
if result.get("success"):
print("✅ Получение данных успешно!")
data = result.get("data", {}).get("value", {})
print(f"📊 Получено данных: {len(data)} записей")
return True
else:
print("❌ Ошибка получения данных!")
print(f"📄 Ответ: {response.text}")
return False
else:
print("❌ Ошибка HTTP!")
print(f"📄 Ответ: {response.text}")
return False
except Exception as e:
print(f"❌ Исключение: {e}")
return False
def main():
"""Основная функция тестирования"""
print("🚀 Тестирование парсера Your Parser")
print("=" * 70)
# Тест 1: Загрузка файла
upload_success = test_upload_file()
if upload_success:
# Тест 2: Получение данных
get_data_success = test_get_data()
print("\n" + "=" * 70)
print("📋 Результаты тестирования:")
print(f" ✅ Загрузка файла: {'ПРОЙДЕН' if upload_success else 'ПРОВАЛЕН'}")
print(f" ✅ Получение данных: {'ПРОЙДЕН' if get_data_success else 'ПРОВАЛЕН'}")
if upload_success and get_data_success:
print("\n🎉 Все тесты пройдены! Парсер работает корректно!")
else:
print("\n❌ Некоторые тесты провалены.")
else:
print("\n❌ Загрузка файла провалена. Пропускаем остальные тесты.")
if __name__ == "__main__":
main()
```
### 2. Запуск тестов
```bash
# Запуск тестового скрипта
python test_your_parser.py
# Проверка через curl
curl -X POST "http://localhost:8000/your_parser/upload" \
-H "accept: application/json" \
-H "Content-Type: multipart/form-data" \
-F "file=@test_file.xlsx"
curl -X POST "http://localhost:8000/your_parser/get_data" \
-H "accept: application/json" \
-H "Content-Type: application/json" \
-d '{"param1": "test_value", "mode": "mode1"}'
```
---
## 📋 Лучшие практики
### 1. Именование
- **Парсер**: `YourParser` (PascalCase)
- **Файл парсера**: `your_parser.py` (snake_case)
- **Имя парсера**: `"your_parser"` (snake_case)
- **Геттеры**: `get_data`, `get_full_data` (snake_case)
- **Эндпоинты**: `/your_parser/upload`, `/your_parser/get_data`
### 2. Структура данных
```python
# Рекомендуемая структура возвращаемых данных
{
"installation_id": {
"data_type1": [
{"field1": "value1", "field2": "value2"},
{"field1": "value3", "field2": "value4"}
],
"data_type2": [
{"field1": "value5", "field2": "value6"}
]
}
}
```
### 3. Обработка ошибок
```python
try:
# Ваш код
result = some_operation()
return result
except SpecificException as e:
print(f"❌ Специфическая ошибка: {e}")
return {}
except Exception as e:
print(f"❌ Общая ошибка: {e}")
raise
```
### 4. Логирование
```python
# Используйте эмодзи для разных типов сообщений
print(f"🔍 DEBUG: Отладочная информация")
print(f"📁 INFO: Информационное сообщение")
print(f"✅ SUCCESS: Успешная операция")
print(f"⚠️ WARNING: Предупреждение")
print(f"❌ ERROR: Ошибка")
```
### 5. Валидация данных
```python
# Всегда валидируйте входные параметры
validated_params = YourParserRequest(**params)
# Проверяйте наличие данных
if not data_source:
print("⚠️ Нет данных в парсере")
return {}
```
### 6. Документация
- Добавляйте docstrings ко всем методам
- Описывайте параметры в Pydantic схемах
- Добавляйте примеры в FastAPI эндпоинты
- Комментируйте сложную логику
---
## 📚 Примеры
### Пример 1: Простой парсер
```python
class SimpleParser(ParserPort):
"""Простой парсер для демонстрации"""
name = "simple_parser"
def __init__(self):
super().__init__()
self.register_getter("get_data", SimpleRequest, self._get_data_wrapper)
def parse(self, file_path: str, params: dict) -> Dict[str, Any]:
"""Парсинг простого Excel файла"""
df = pd.read_excel(file_path)
return {"data": df.to_dict('records')}
def _get_data_wrapper(self, params: dict) -> Dict[str, Any]:
"""Обертка для геттера"""
data_source = self._get_data_source()
return data_to_json(data_source)
```
### Пример 2: Парсер с множественными геттерами
```python
class MultiGetterParser(ParserPort):
"""Парсер с несколькими геттерами"""
name = "multi_getter_parser"
def __init__(self):
super().__init__()
# Регистрируем несколько геттеров
self.register_getter("get_summary", SummaryRequest, self._get_summary_wrapper)
self.register_getter("get_details", DetailsRequest, self._get_details_wrapper)
self.register_getter("get_statistics", StatisticsRequest, self._get_statistics_wrapper)
def _get_summary_wrapper(self, params: dict) -> Dict[str, Any]:
"""Геттер для получения сводки"""
# Логика получения сводки
pass
def _get_details_wrapper(self, params: dict) -> Dict[str, Any]:
"""Геттер для получения деталей"""
# Логика получения деталей
pass
def _get_statistics_wrapper(self, params: dict) -> Dict[str, Any]:
"""Геттер для получения статистики"""
# Логика получения статистики
pass
```
### Пример 3: Парсер с фильтрацией
```python
class FilteredParser(ParserPort):
"""Парсер с продвинутой фильтрацией"""
def _filter_data(self, data_source: Dict[str, Any], params: FilterRequest) -> Dict[str, Any]:
"""Фильтрация данных по параметрам"""
filtered_data = {}
for key, value in data_source.items():
# Фильтр по дате
if params.start_date and value.get('date') < params.start_date:
continue
# Фильтр по типу
if params.type and value.get('type') != params.type:
continue
# Фильтр по статусу
if params.status and value.get('status') not in params.status:
continue
filtered_data[key] = value
return filtered_data
```
---
## 🚀 Заключение
Это руководство покрывает все аспекты создания новых парсеров в системе NIN Excel Parsers API. Следуйте этим инструкциям для создания качественных, поддерживаемых и тестируемых парсеров.
### Чек-лист для нового парсера:
- [ ] Создана Pydantic схема
- [ ] Создан класс парсера с геттерами
- [ ] Парсер зарегистрирован в `__init__.py`
- [ ] Парсер зарегистрирован в `main.py`
- [ ] Добавлена логика в `services.py`
- [ ] Созданы FastAPI эндпоинты
- [ ] Добавлена вкладка в Streamlit
- [ ] Создан тестовый скрипт
- [ ] Проведено тестирование
- [ ] Обновлена документация
### Полезные ссылки:
- [FastAPI Documentation](https://fastapi.tiangolo.com/)
- [Pydantic Documentation](https://pydantic-docs.helpmanual.io/)
- [Streamlit Documentation](https://docs.streamlit.io/)
- [Pandas Documentation](https://pandas.pydata.org/docs/)
---
**Удачной разработки! 🚀**