87 lines
3.0 KiB
Python
87 lines
3.0 KiB
Python
"""
|
||
Модуль для работы с API
|
||
"""
|
||
import requests
|
||
from typing import Dict, Any, List, Tuple
|
||
from config import API_BASE_URL, API_PUBLIC_URL
|
||
|
||
|
||
def check_api_health() -> bool:
|
||
"""Проверка доступности API"""
|
||
try:
|
||
response = requests.get(f"{API_BASE_URL}/", timeout=5)
|
||
return response.status_code == 200
|
||
except:
|
||
return False
|
||
|
||
|
||
def get_available_parsers() -> List[str]:
|
||
"""Получение списка доступных парсеров"""
|
||
try:
|
||
response = requests.get(f"{API_BASE_URL}/parsers")
|
||
if response.status_code == 200:
|
||
return response.json()["parsers"]
|
||
return []
|
||
except:
|
||
return []
|
||
|
||
|
||
def get_server_info() -> Dict[str, Any]:
|
||
"""Получение информации о сервере"""
|
||
try:
|
||
response = requests.get(f"{API_BASE_URL}/server-info")
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
return {}
|
||
except:
|
||
return {}
|
||
|
||
|
||
def get_system_ogs() -> Dict[str, Any]:
|
||
"""Получение системного списка ОГ из pconfig"""
|
||
try:
|
||
response = requests.get(f"{API_BASE_URL}/system/ogs")
|
||
if response.status_code == 200:
|
||
return response.json()
|
||
return {"single_ogs": [], "og_ids": {}}
|
||
except:
|
||
return {"single_ogs": [], "og_ids": {}}
|
||
|
||
|
||
def upload_file_to_api(endpoint: str, file_data: bytes, filename: str) -> Tuple[Dict[str, Any], int]:
|
||
"""Загрузка файла на API"""
|
||
try:
|
||
# Определяем правильное имя поля в зависимости от эндпоинта
|
||
if "zip" in endpoint:
|
||
files = {"zip_file": (filename, file_data, "application/zip")}
|
||
else:
|
||
files = {"file": (filename, file_data, "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")}
|
||
|
||
response = requests.post(f"{API_BASE_URL}{endpoint}", files=files)
|
||
return response.json(), response.status_code
|
||
except Exception as e:
|
||
return {"error": str(e)}, 500
|
||
|
||
|
||
def make_api_request(endpoint: str, data: Dict[str, Any]) -> Tuple[Dict[str, Any], int]:
|
||
"""Выполнение API запроса"""
|
||
try:
|
||
response = requests.post(f"{API_BASE_URL}{endpoint}", json=data)
|
||
return response.json(), response.status_code
|
||
except Exception as e:
|
||
return {"error": str(e)}, 500
|
||
|
||
|
||
def get_available_ogs(parser_name: str) -> List[str]:
|
||
"""Получение доступных ОГ для парсера"""
|
||
try:
|
||
response = requests.get(f"{API_BASE_URL}/parsers/{parser_name}/available_ogs")
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
return data.get("available_ogs", [])
|
||
else:
|
||
print(f"⚠️ Ошибка получения ОГ: {response.status_code}")
|
||
return []
|
||
except Exception as e:
|
||
print(f"⚠️ Ошибка при запросе ОГ: {e}")
|
||
return [] |