This commit is contained in:
2025-08-26 23:33:29 +03:00
commit 3b238ae283
110 changed files with 3837 additions and 0 deletions

9
python_parser/.env Normal file
View File

@@ -0,0 +1,9 @@
# Database
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/svodka_db
# MinIO
MINIO_ENDPOINT=localhost:9000
MINIO_ACCESS_KEY=minioadmin
MINIO_SECRET_KEY=minioadmin
MINIO_BUCKET=svodka-data
MINIO_SECURE=false

20
python_parser/Dockerfile Normal file
View File

@@ -0,0 +1,20 @@
FROM repo-dev.predix.rosneft.ru/python:3.11-slim
WORKDIR /app
# RUN pip install kafka-python==2.0.2
# RUN pip freeze > /app/requirements.txt
# ADD . /app
COPY requirements.txt .
RUN mkdir -p vendor
RUN pip download -r /app/requirements.txt --no-binary=:none: -d /app/vendor
# ADD . /app
# ENV KAFKA_BROKER=10.234.160.10:9093,10.234.160.10:9094,10.234.160.10:9095
# ENV KAFKA_UPDATE_ALGORITHM_RULES_TOPIC=algorithm-rule-update
# ENV KAFKA_CLIENT_USERNAME=cf-service
# CMD ["python", "/app/run_dev.py"]

1
python_parser/Procfile Normal file
View File

@@ -0,0 +1 @@
web: python /app/run_stand.py

View File

View File

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,9 @@
from .monitoring_fuel import MonitoringFuelParser
from .svodka_ca import SvodkaCAParser
from .svodka_pm import SvodkaPMParser
__all__ = [
'MonitoringFuelParser',
'SvodkaCAParser',
'SvodkaPMParser'
]

View File

@@ -0,0 +1,206 @@
import pandas as pd
import re
from typing import Dict
from core.ports import ParserPort
from adapters.pconfig import data_to_json, get_object_by_name
class MonitoringFuelParser(ParserPort):
"""Парсер для мониторинга топлива"""
name = "Мониторинг топлива"
def find_header_row(self, file_path: str, sheet: str, search_value: str = "Установка", max_rows: int = 50) -> int:
"""Определение индекса заголовка в Excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file_path,
sheet_name=sheet,
header=None,
nrows=max_rows
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx + 1 # возвращаем индекс строки (0-based)
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_single(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного объекта'''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Установка")
# Читаем весь лист, начиная с найденной строки как заголовок
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None
)
# === Удаление полностью пустых столбцов ===
df_clean = df_full.replace(r'^\s*$', pd.NA, regex=True) # заменяем пустые строки на NA
df_clean = df_clean.dropna(axis=1, how='all') # удаляем столбцы, где все значения — NA
df_full = df_full.loc[:, df_clean.columns] # оставляем только непустые столбцы
# === Переименовываем нужные столбцы по позициям ===
if len(df_full.columns) < 2:
raise ValueError("DataFrame должен содержать как минимум 2 столбца.")
new_columns = df_full.columns.tolist()
new_columns[0] = 'name'
new_columns[1] = 'normativ'
new_columns[-2] = 'total'
new_columns[-1] = 'total_1'
df_full.columns = new_columns
# Проверяем, что колонка 'name' существует
if 'name' in df_full.columns:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_full['id'] = df_full['name'].apply(get_object_by_name)
# Устанавливаем id как индекс
df_full.set_index('id', inplace=True)
print(f"Окончательное количество столбцов: {len(df_full.columns)}")
return df_full
def parse(self, file_path: str, params: dict) -> dict:
import zipfile
df_monitorings = {} # ЭТО СЛОВАРЬ ДАТАФРЕЙМОВ, ГДЕ КЛЮЧ - НОМЕР МЕСЯЦА, ЗНАЧЕНИЕ - ДАТАФРЕЙМ
with zipfile.ZipFile(file_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for month in range(1, 13):
mm = f"{month:02d}"
file_temp = f'monitoring_SNPZ_{mm}.xlsm'
candidates = [f for f in file_list if file_temp in f]
if len(candidates) == 1:
file = candidates[0]
print(f'Загрузка {file}')
with zip_ref.open(file) as excel_file:
try:
df = self.parse_single(excel_file, 'Мониторинг потребления')
df_monitorings[mm] = df
print(f"✅ Данные за месяц {mm} загружены")
except Exception as e:
print(f"Ошибка при загрузке файла {file_temp}: {e}")
else:
print(f"⚠️ Файл не найден: {file_temp}")
return df_monitorings
def aggregate_by_columns(self, df_dict: Dict[str, pd.DataFrame], columns):
''' Служебная функция. Агрегация данных по среднему по определенным колонкам. '''
all_data = {} # Для хранения полных данных (месяцы) по каждой колонке
means = {} # Для хранения средних
for col in columns:
totals = [] # Список Series (по месяцам) для текущей колонки
for file_key, df in df_dict.items():
if col not in df.columns:
print(f"Колонка '{col}' не найдена в {file_key}, пропускаем.")
continue
# Берём колонку, оставляем id как индекс
series = df[col].copy()
# Определяем имя месяца: извлекаем номер из ключа (например, '03' из '03')
# Если ключ уже '03', '04' и т.п. — используем как есть
match = re.search(r'\d{2}', str(file_key))
month = match.group() if match else file_key
series.name = month
totals.append(series)
if not totals:
raise ValueError(f"Ни один DataFrame не содержит колонку '{col}'")
# Объединяем все Series в один DataFrame (id × месяцы)
df_combined = pd.concat(totals, axis=1)
all_data[col] = df_combined # Сохраняем
# Считаем среднее по строкам (по месяцам), игнорируя NaN
mean_series = df_combined.mean(axis=1)
mean_series = mean_series.dropna() # Убираем id, где нет данных вообще
means[col] = mean_series
# Собираем все средние в один DataFrame
df_means = pd.DataFrame(means)
return df_means, all_data
def get_month(self, df_monitorings, month_key):
''' Служебная функция. Выгрузить только конкретный месяц '''
if month_key not in df_monitorings:
raise KeyError(f"Месяц '{month_key}' не найден в df_monitorings. Доступные: {list(df_monitorings.keys())}")
df = df_monitorings[month_key]
# Создаём копию, чтобы не изменять оригинальный словарь
result_df = df.copy()
# Удаляем колонку 'name', если она существует
if 'name' in result_df.columns:
result_df = result_df.drop(columns=['name'])
return result_df
def aggregate_total_by_id(self, df_dict: Dict[str, pd.DataFrame], column: str):
"""Агрегация данных по ID"""
totals = []
for file, df in df_dict.items():
if column not in df.columns:
print(f"Колонка '{column}' не найдена в {file}, пропускаем.")
continue
# Берём колонку и сохраняем как Series с именем месяца
series = df[column].copy()
series.name = re.sub(r'\D', '', file) # Имя Series будет использовано как имя колонки в concat
totals.append(series)
if not totals:
raise ValueError(f"Ни один DataFrame не содержит колонку '{column}'")
df_combined = pd.concat(totals, axis=1)
# Считаем среднее по строкам (по месяцам)
total = df_combined.mean(axis=1)
total = total.dropna()
total.name = 'mean'
return total, df_combined
def get_value(self, df, params):
mode = params.get("mode", "total")
columns = params.get("columns", None)
month = params.get("month", None)
data = None
if mode == "total":
if not columns:
raise ValueError("Отсутствуют идентификаторы столбцов")
df_means, _ = self.aggregate_by_columns(df, columns)
data = df_means.to_dict(orient='index')
elif mode == "month":
if not month:
raise ValueError("Отсутствуют идентификатор месяца")
df_month = self.get_month(df, month)
data = df_month.to_dict(orient='index')
json_result = data_to_json(data)
return json_result

View File

@@ -0,0 +1,324 @@
import pandas as pd
import numpy as np
from core.ports import ParserPort
from adapters.pconfig import get_og_by_name
class SvodkaCAParser(ParserPort):
"""Парсер для сводки СА"""
name = "Сводка СА"
def extract_all_tables(self, file_path, sheet_name=0):
"""Извлекает все таблицы из Excel файла"""
df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
df_filled = df.fillna('')
df_clean = df_filled.astype(str).replace(r'^\s*$', '', regex=True)
non_empty_rows = ~(df_clean.eq('').all(axis=1))
non_empty_cols = ~(df_clean.eq('').all(axis=0))
row_indices = non_empty_rows[non_empty_rows].index.tolist()
col_indices = non_empty_cols[non_empty_cols].index.tolist()
if not row_indices or not col_indices:
return []
row_blocks = self._get_contiguous_blocks(row_indices)
col_blocks = self._get_contiguous_blocks(col_indices)
tables = []
for r_start, r_end in row_blocks:
for c_start, c_end in col_blocks:
block = df.iloc[r_start:r_end + 1, c_start:c_end + 1]
if block.empty or block.fillna('').astype(str).replace(r'^\s*$', '', regex=True).eq('').all().all():
continue
if self._is_header_row(block.iloc[0]):
block.columns = block.iloc[0]
block = block.iloc[1:].reset_index(drop=True)
else:
block = block.reset_index(drop=True)
block.columns = [f"col_{i}" for i in range(block.shape[1])]
tables.append(block)
return tables
def _get_contiguous_blocks(self, indices):
"""Группирует индексы в непрерывные блоки"""
if not indices:
return []
blocks = []
start = indices[0]
for i in range(1, len(indices)):
if indices[i] != indices[i-1] + 1:
blocks.append((start, indices[i-1]))
start = indices[i]
blocks.append((start, indices[-1]))
return blocks
def _is_header_row(self, series):
"""Определяет, похожа ли строка на заголовок"""
series_str = series.astype(str).str.strip()
non_empty = series_str[series_str != '']
if len(non_empty) == 0:
return False
def is_not_numeric(val):
try:
float(val.replace(',', '.'))
return False
except (ValueError, TypeError):
return True
not_numeric_count = non_empty.apply(is_not_numeric).sum()
return not_numeric_count / len(non_empty) > 0.6
def _get_og_by_name(self, name):
"""Функция для получения ID по имени (упрощенная версия)"""
# Упрощенная версия - возвращаем имя как есть
if not name or not isinstance(name, str):
return None
return name.strip()
def parse_sheet(self, file_path, sheet_name, inclusion_list):
"""Собственно функция парсинга отчета СА"""
# === Извлечение и фильтрация ===
tables = self.extract_all_tables(file_path, sheet_name)
# Фильтруем таблицы: оставляем только те, где первая строка содержит нужные заголовки
filtered_tables = []
for table in tables:
if table.empty:
continue
first_row_values = table.iloc[0].astype(str).str.strip().tolist()
if any(val in inclusion_list for val in first_row_values):
filtered_tables.append(table)
tables = filtered_tables
# === Итоговый список таблиц датафреймов ===
result_list = []
for table in tables:
if table.empty:
continue
# Получаем первую строку (до удаления)
first_row_values = table.iloc[0].astype(str).str.strip().tolist()
# Находим, какой элемент из inclusion_list присутствует
matched_key = None
for val in first_row_values:
if val in inclusion_list:
matched_key = val
break # берём первый совпадающий заголовок
if matched_key is None:
continue # на всякий случай (хотя уже отфильтровано)
# Удаляем первую строку (заголовок) и сбрасываем индекс
df_cleaned = table.iloc[1:].copy().reset_index(drop=True)
# Пропускаем, если таблица пустая
if df_cleaned.empty:
continue
# Первая строка становится заголовком
new_header = df_cleaned.iloc[0] # извлекаем первую строку как потенциальные названия столбцов
# Преобразуем заголовок: только первый столбец может быть заменён на "name"
cleaned_header = []
# Обрабатываем первый столбец отдельно
first_item = new_header.iloc[0] if isinstance(new_header, pd.Series) else new_header[0]
first_item_str = str(first_item).strip() if pd.notna(first_item) else ""
if first_item_str == "" or first_item_str == "nan":
cleaned_header.append("name")
else:
cleaned_header.append(first_item_str)
# Остальные столбцы добавляем без изменений (или с минимальной очисткой)
for item in new_header[1:]:
# Опционально: приводим к строке и убираем лишние пробелы, но не заменяем на "name"
item_str = str(item).strip() if pd.notna(item) else ""
cleaned_header.append(item_str)
# Применяем очищенные названия столбцов
df_cleaned = df_cleaned[1:] # удаляем строку с заголовком
df_cleaned.columns = cleaned_header
df_cleaned = df_cleaned.reset_index(drop=True)
if matched_key.endswith('**'):
cleaned_key = matched_key[:-2] # удаляем последние **
else:
cleaned_key = matched_key
# Добавляем новую колонку с именем параметра
df_cleaned["table"] = cleaned_key
# Проверяем, что колонка 'name' существует
if 'name' not in df_cleaned.columns:
print(
f"Внимание: колонка 'name' отсутствует в таблице для '{matched_key}'. Пропускаем добавление 'id'.")
continue # или обработать по-другому
else:
# Применяем функцию get_id_by_name к каждой строке в колонке 'name'
df_cleaned['id'] = df_cleaned['name'].apply(get_og_by_name)
# Удаляем строки, где id — None, NaN или пустой
df_cleaned = df_cleaned.dropna(subset=['id']) # dropna удаляет NaN
# Дополнительно: удаляем None (если не поймал dropna)
df_cleaned = df_cleaned[df_cleaned['id'].notna() & (df_cleaned['id'].astype(str) != 'None')]
# Добавляем в словарь
result_list.append(df_cleaned)
# === Объединение и сортировка по id (индекс) и table ===
if result_list:
combined_df = pd.concat(result_list, axis=0)
# Сортируем по индексу (id) и по столбцу 'table'
combined_df = combined_df.sort_values(by=['id', 'table'], axis=0)
# Устанавливаем id как индекс
# combined_df.set_index('id', inplace=True)
return combined_df
else:
return None
def parse(self, file_path: str, params: dict) -> dict:
"""Парсинг файла сводки СА"""
# === Точка входа. Нужно выгрузить три таблицы: План, Факт и Норматив ===
# Выгружаем План в df_ca_plan
inclusion_list_plan = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
df_ca_plan = self.parse_sheet(file_path, 'План', inclusion_list_plan) # ЭТО ДАТАФРЕЙМ ПЛАНА В СВОДКЕ ЦА
print(f"\n--- Объединённый и отсортированный План: {df_ca_plan.shape} ---")
# Выгружаем Факт
inclusion_list_fact = {
"ТиП, %",
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн",
"в т.ч. Идентифицированные безвозвратные потери, %",
"в т.ч. Неидентифицированные потери, тонн",
"в т.ч. Неидентифицированные потери, %"
}
df_ca_fact = self.parse_sheet(file_path, 'Факт', inclusion_list_fact) # ЭТО ДАТАФРЕЙМ ФАКТА В СВОДКЕ ЦА
print(f"\n--- Объединённый и отсортированный Факт: {df_ca_fact.shape} ---")
# Выгружаем План в df_ca_normativ
inclusion_list_normativ = {
"Топливо итого, тонн",
"Топливо итого, %",
"Топливо на технологию, тонн",
"Топливо на технологию, %",
"Топливо на энергетику, тонн",
"Топливо на энергетику, %",
"Потери итого, тонн",
"Потери итого, %",
"в т.ч. Идентифицированные безвозвратные потери, тонн**",
"в т.ч. Идентифицированные безвозвратные потери, %**",
"в т.ч. Неидентифицированные потери, тонн**",
"в т.ч. Неидентифицированные потери, %**"
}
# ЭТО ДАТАФРЕЙМ НОРМАТИВА В СВОДКЕ ЦА
df_ca_normativ = self.parse_sheet(file_path, 'Норматив', inclusion_list_normativ)
print(f"\n--- Объединённый и отсортированный Норматив: {df_ca_normativ.shape} ---")
df_dict = {
"plan": df_ca_plan,
"fact": df_ca_fact,
"normativ": df_ca_normativ
}
return df_dict
def data_dict_to_json(self, data_dict):
''' Служебная функция для парсинга словаря в json. '''
def convert_types(obj):
if isinstance(obj, (np.integer, np.int64)):
return int(obj)
elif isinstance(obj, (np.floating, np.float64)):
return float(obj) if not np.isnan(obj) else None
elif isinstance(obj, (np.ndarray,)):
return obj.tolist()
elif pd.isna(obj):
return None
elif isinstance(obj, (pd.Timestamp, np.datetime64)):
return obj.isoformat()
else:
return obj
# Новый словарь для JSON
serializable_dict = {}
for source, table_dict in data_dict.items(): # source: 'plan', 'fact', 'normativ'
serializable_dict[source] = {}
for table_name, df in table_dict.items(): # table_name: 'ТиП, %' и т.п., df: DataFrame
if isinstance(df, pd.DataFrame):
records = df.to_dict(orient='records')
cleaned_records = []
for record in records:
cleaned_record = {
str(k): convert_types(v) for k, v in record.items()
}
cleaned_records.append(cleaned_record)
serializable_dict[source][table_name] = cleaned_records
else:
# На всякий случай, если попался не DataFrame
serializable_dict[source][table_name] = None
return serializable_dict
def get_data(self, df_dict, df_type, table_values):
''' Служебная функция. Выборка данных из датафрейма. '''
df = df_dict.get(df_type, {})
if 'table' not in df.columns:
raise KeyError("В датафрейме отсутствует колонка 'table'")
filtered_df = df[df['table'].isin(table_values)].copy()
result_dict = {key: group for key, group in filtered_df.groupby('table')}
return result_dict
def get_value(self, df: pd.DataFrame, params: dict):
modes = params.get("modes")
tables = params.get("tables")
if not isinstance(modes, list):
raise ValueError("Поле 'modes' должно быть списком")
if not isinstance(tables, list):
raise ValueError("Поле 'tables' должно быть списком")
# Собираем данные
data_dict = {}
for mode in modes:
data_dict[mode] = self.get_data(df, mode, tables)
return self.data_dict_to_json(data_dict)

View File

@@ -0,0 +1,275 @@
import pandas as pd
from core.ports import ParserPort
from adapters.pconfig import OG_IDS, replace_id_in_path, data_to_json
class SvodkaPMParser(ParserPort):
"""Парсер для сводок ПМ (план и факт)"""
name = "Сводки ПМ"
def find_header_row(self, file: str, sheet: str, search_value: str = "Итого", max_rows: int = 50) -> int:
"""Определения индекса заголовка в excel по ключевому слову"""
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def parse_svodka_pm(self, file, sheet, header_num=None):
''' Собственно парсер отчетов одного ОГ для БП, ПП и факта '''
# Автоопределение header_num, если не передан
if header_num is None:
header_num = self.find_header_row(file, sheet, search_value="Итого")
# Читаем заголовки header_num и 1-2 строки данных, чтобы найти INDICATOR_ID
df_probe = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
nrows=2,
)
if df_probe.shape[0] == 0:
raise ValueError("Файл пуст или не содержит данных.")
first_data_row = df_probe.iloc[0]
# Находим столбец с 'INDICATOR_ID'
indicator_cols = first_data_row[first_data_row == 'INDICATOR_ID']
if len(indicator_cols) == 0:
raise ValueError('Не найден столбец со значением "INDICATOR_ID" в первой строке данных.')
indicator_col_name = indicator_cols.index[0]
print(f"Найден INDICATOR_ID в столбце: {indicator_col_name}")
# Читаем весь лист
df_full = pd.read_excel(
file,
sheet_name=sheet,
header=header_num,
usecols=None,
index_col=None
)
if indicator_col_name not in df_full.columns:
raise ValueError(f"Столбец {indicator_col_name} отсутствует при полной загрузке.")
# Перемещаем INDICATOR_ID в начало и делаем индексом
cols = [indicator_col_name] + [col for col in df_full.columns if col != indicator_col_name]
df_full = df_full[cols]
df_full.set_index(indicator_col_name, inplace=True)
# Обрезаем до "Итого" + 1
header_list = [str(h).strip() for h in df_full.columns]
try:
itogo_idx = header_list.index("Итого")
num_cols_needed = itogo_idx + 2
except ValueError:
print('Столбец "Итого" не найден. Оставляем все столбцы.')
num_cols_needed = len(header_list)
num_cols_needed = min(num_cols_needed, len(header_list))
df_final = df_full.iloc[:, :num_cols_needed]
# === Удаление полностью пустых столбцов ===
df_clean = df_final.replace(r'^\s*$', pd.NA, regex=True)
df_clean = df_clean.where(pd.notnull(df_clean), pd.NA)
non_empty_mask = df_clean.notna().any()
df_final = df_final.loc[:, non_empty_mask]
# === Обработка заголовков: Unnamed и "Итого" → "Итого" ===
new_columns = []
last_good_name = None
for col in df_final.columns:
col_str = str(col).strip()
# Проверяем, является ли колонка пустой/некорректной
is_empty_or_unnamed = col_str.startswith('Unnamed') or col_str == '' or col_str.lower() == 'nan'
# Проверяем, начинается ли на "Итого"
if col_str.startswith('Итого'):
current_name = 'Итого'
last_good_name = current_name # обновляем last_good_name
new_columns.append(current_name)
elif is_empty_or_unnamed:
# Используем последнее хорошее имя
new_columns.append(last_good_name)
else:
# Имя, полученное из exel
last_good_name = col_str
new_columns.append(col_str)
df_final.columns = new_columns
print(f"Окончательное количество столбцов: {len(df_final.columns)}")
return df_final
def parse(self, file_path: str, params: dict) -> dict:
import zipfile
pm_dict = {
"facts": {},
"plans": {}
}
excel_fact_template = 'svodka_fact_pm_ID.xlsm'
excel_plan_template = 'svodka_plan_pm_ID.xlsx'
with zipfile.ZipFile(file_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
for name, id in OG_IDS.items():
if id == 'BASH':
continue # пропускаем BASH
current_fact = replace_id_in_path(excel_fact_template, id)
fact_candidates = [f for f in file_list if current_fact in f]
if len(fact_candidates) == 1:
print(f'Загрузка {current_fact}')
with zip_ref.open(fact_candidates[0]) as excel_file:
pm_dict['facts'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ Факт загружен: {current_fact}")
else:
print(f"⚠️ Файл не найден (Факт): {current_fact}")
pm_dict['facts'][id] = None
current_plan = replace_id_in_path(excel_plan_template, id)
plan_candidates = [f for f in file_list if current_plan in f]
if len(plan_candidates) == 1:
print(f'Загрузка {current_plan}')
with zip_ref.open(plan_candidates[0]) as excel_file:
pm_dict['plans'][id] = self.parse_svodka_pm(excel_file, 'Сводка Нефтепереработка')
print(f"✅ План загружен: {current_plan}")
else:
print(f"⚠️ Файл не найден (План): {current_plan}")
pm_dict['plans'][id] = None
return pm_dict
def get_svodka_value(self, df_svodka, id, code, search_value=None):
''' Служебная функция для простой выборке по сводке '''
row_index = id
mask_value = df_svodka.iloc[0] == code
if search_value is None:
mask_name = df_svodka.columns != 'Итого'
else:
mask_name = df_svodka.columns == search_value
# Убедимся, что маски совпадают по длине
if len(mask_value) != len(mask_name):
raise ValueError(
f"Несоответствие длин масок: mask_value={len(mask_value)}, mask_name={len(mask_name)}"
)
final_mask = mask_value & mask_name # булевая маска по позициям столбцов
col_positions = final_mask.values # numpy array или Series булевых значений
if not final_mask.any():
print(f"Нет столбцов с '{code}' в первой строке и именем, не начинающимся с '{search_value}'")
return 0
else:
if row_index in df_svodka.index:
# Получаем позицию строки
row_loc = df_svodka.index.get_loc(row_index)
# Извлекаем значения по позициям столбцов
values = df_svodka.iloc[row_loc, col_positions]
# Преобразуем в числовой формат
numeric_values = pd.to_numeric(values, errors='coerce')
# Агрегация данных (NaN игнорируются)
if search_value is None:
return numeric_values
else:
return numeric_values.iloc[0]
else:
return None
def get_svodka_og(self, pm_dict, id, codes, columns, search_value=None):
''' Служебная функция получения данных по одному ОГ '''
result = {}
fact_df = pm_dict['facts'][id]
plan_df = pm_dict['plans'][id]
# Определяем, какие столбцы из какого датафрейма брать
for col in columns:
col_result = {}
if col in ['ПП', 'БП']:
if plan_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных плана для {id}")
else:
for code in codes:
val = self.get_svodka_value(plan_df, code, col, search_value)
col_result[code] = val
elif col in ['ТБ', 'СЭБ', 'НЭБ']:
if fact_df is None:
print(f"❌ Невозможно обработать '{col}': нет данных факта для {id}")
else:
for code in codes:
val = self.get_svodka_value(fact_df, code, col, search_value)
col_result[code] = val
else:
print(f"⚠️ Неизвестный столбец: '{col}'. Пропускаем.")
col_result = {code: None for code in codes}
result[col] = col_result
return result
def get_svodka_total(self, pm_dict, codes, columns, search_value=None):
''' Служебная функция агрегации данные по всем ОГ '''
total_result = {}
for name, og_id in OG_IDS.items():
if og_id == 'BASH':
continue
# print(f"📊 Обработка: {name} ({og_id})")
try:
data = self.get_svodka_og(
pm_dict,
og_id,
codes,
columns,
search_value
)
total_result[og_id] = data
except Exception as e:
print(f"❌ Ошибка при обработке {name} ({og_id}): {e}")
total_result[og_id] = None
return total_result
def get_value(self, df, params):
og_id = params.get("id")
codes = params.get("codes")
columns = params.get("columns")
search = params.get("search")
mode = params.get("mode", "total")
if not isinstance(codes, list):
raise ValueError("Поле 'codes' должно быть списком")
if not isinstance(columns, list):
raise ValueError("Поле 'columns' должно быть списком")
data = None
if mode == "single":
if not og_id:
raise ValueError("Отсутствует идентификатор ОГ")
data = self.get_svodka_og(df, og_id, codes, columns, search)
elif mode == "total":
data = self.get_svodka_total(df, codes, columns, search)
json_result = data_to_json(data)
return json_result

View File

@@ -0,0 +1,181 @@
import re
from functools import lru_cache
import json
import numpy as np
import pandas as pd
OG_IDS = {
"Комсомольский НПЗ": "KNPZ",
"Ангарская НХК": "ANHK",
"Ачинский НПЗ": "AchNPZ",
"ЕНПЗ": "BASH",
"УНПЗ": "UNPZ",
"УНХ": "UNH",
"Новойл": "NOV",
"Новокуйбышевский НПЗ": "NovKuybNPZ",
"Куйбышевский НПЗ": "KuybNPZ",
"Сызранский НПЗ": "CyzNPZ",
"Туапсинский НПЗ": "TuapsNPZ",
"Саратовский НПЗ": "SNPZ",
"Рязанская НПК": "RNPK",
"Нижневартовское НПО": "NVNPO",
"Красноленинский НПЗ": "KLNPZ",
"Пурнефтепереработка": "PurNP",
"ЯНОС": "YANOS",
}
SNPZ_IDS = {
"Висбрекинг": "SNPZ.VISB",
"Изомеризация": "SNPZ.IZOM",
"Л-24/6": "SNPZ.L24-6",
"ЛЧ-35-11/300": "SNPZ.L35-300",
"ЛЧ-35-11/600": "SNPZ.L35-600",
"ОЗФХ т.у.т/сут": "SNPZ.OZPH",
"УПНБ": "SNPZ.UPB",
"УПЭС": "SNPZ.UPES",
"ЭЛОУ АВТ-6": "SNPZ.EAVT6",
"Итого": "SNPZ.TOTAL",
"Норматив по фактическим загрузкам": "SNPZ.TOTAL.FACT",
}
def replace_id_in_path(file_path, new_id):
return file_path.replace('ID', str(new_id))
def get_table_name(exel):
return re.sub(r'^data/(.+)\.(xlsm|xlsx)$', r'\1', exel)
def normalize_and_tokenize(text):
if not isinstance(text, str) or not text.strip():
return set()
cleaned = re.sub(r'[^\w\s]', ' ', text.lower())
cleaned = cleaned.replace('ё', 'е')
words = [word.strip() for word in cleaned.split()]
return set(word for word in words if word)
@lru_cache(maxsize=512)
def get_object_by_name(name):
return get_id_by_name(name, SNPZ_IDS)
@lru_cache(maxsize=512)
def get_og_by_name(name):
return get_id_by_name(name, OG_IDS)
def get_id_by_name(name, dictionary):
if not name or not isinstance(name, str):
return None
query_words = normalize_and_tokenize(name)
if not query_words:
return None
best_match = None
best_score = 0
for full_name, obj_id in dictionary.items():
entry_words = normalize_and_tokenize(full_name)
if not entry_words:
continue
intersection = query_words & entry_words
if not intersection:
continue
# Полное совпадение
if query_words == entry_words:
return obj_id
# Все слова из словаря есть в запросе
if entry_words <= query_words:
score = len(entry_words)
# Хорошее пересечение
elif len(intersection) >= min(2, len(entry_words), len(query_words)):
score = len(intersection) / max(len(query_words), len(entry_words))
# Одно слово (аббревиатура)
elif len(entry_words) == 1 and list(entry_words)[0] in query_words:
score = 1.0
else:
continue
if score > best_score:
best_score = score
best_match = obj_id
return best_match
def data_to_json(data, indent=2, ensure_ascii=False):
"""
Полностью безопасная сериализация данных в JSON.
Корректно обрабатывает:
- np.nan, pd.NA, None → null
- DataFrame, Series, numpy массивы и скаляры
- вложенные структуры
"""
def is_nan_like(obj):
"""Проверяет, является ли объект NaN-подобным."""
if obj is None:
return True
if pd.isna(obj): # Ловит np.nan, pd.NA, pd.NaT, None
return True
return False
def convert_obj(obj):
# --- DataFrame ---
if isinstance(obj, pd.DataFrame):
return [convert_obj(row) for _, row in obj.iterrows()] # каждая строка → dict
# --- Series ---
if isinstance(obj, pd.Series):
# Преобразуем индекс в значения, если нужно
values = [convert_obj(v) for v in obj.values]
# Убираем None (были NaN), но сохраняем структуру, если нужно
return values
# --- numpy скаляры ---
elif isinstance(obj, (np.integer, np.int64, np.int32)):
return int(obj)
elif isinstance(obj, (np.floating, np.float64, np.float32)):
return None if is_nan_like(obj) else float(obj)
elif isinstance(obj, np.ndarray):
return [convert_obj(v) for v in obj]
# --- временные метки ---
elif isinstance(obj, (pd.Timestamp, pd.Timedelta)):
return obj.isoformat() if not pd.isna(obj) else None
elif isinstance(obj, pd._libs.missing.NAType): # pd.NA
return None
# --- рекурсия по dict и list ---
elif isinstance(obj, dict):
return {
key: convert_obj(value)
for key, value in obj.items()
if not is_nan_like(key) # фильтруем NaN в ключах (недопустимы в JSON)
}
elif isinstance(obj, list):
return [convert_obj(item) for item in obj]
# --- None и NaN-подобные ---
elif is_nan_like(obj):
return None
# --- всё остальное ---
else:
try:
return float(obj) if isinstance(obj, (int, float)) else str(obj)
except Exception:
return str(obj) # финальный fallback
try:
cleaned_data = convert_obj(data)
cleaned_data_str = json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii)
return cleaned_data
except Exception as e:
raise ValueError(f"Не удалось сериализовать данные в JSON: {e}")

View File

@@ -0,0 +1,95 @@
"""
Адаптер для хранилища S3/MinIO
"""
import os
import pickle
import io
from typing import Optional
from minio import Minio # boto3
import pandas as pd
from core.ports import StoragePort
class MinIOStorageAdapter(StoragePort):
"""Адаптер для MinIO хранилища"""
def __init__(self):
self.client = Minio(
os.getenv("MINIO_ENDPOINT", "localhost:9000"),
access_key=os.getenv("MINIO_ACCESS_KEY", "minioadmin"),
secret_key=os.getenv("MINIO_SECRET_KEY", "minioadmin"),
secure=os.getenv("MINIO_SECURE", "false").lower() == "true",
cert_check=False
)
self.bucket_name = os.getenv("MINIO_BUCKET", "svodka-data")
self._ensure_bucket_exists()
def _ensure_bucket_exists(self):
"""Проверка существования bucket и создание при необходимости"""
if not self.client.bucket_exists(self.bucket_name):
self.client.make_bucket(self.bucket_name)
print(f"Bucket '{self.bucket_name}' создан")
def save_dataframe(self, df: pd.DataFrame, object_id: str) -> bool:
"""Сохранение DataFrame в MinIO"""
try:
# Сериализуем DataFrame
data = pickle.dumps(df)
# Создаем поток данных
data_stream = io.BytesIO(data)
# Загружаем в MinIO
self.client.put_object(
self.bucket_name,
object_id,
data_stream,
length=len(data),
content_type='application/octet-stream'
)
print(f"DataFrame успешно сохранен в MinIO: {self.bucket_name}/{object_id}")
return True
except Exception as e:
print(f"Ошибка при сохранении в MinIO: {e}")
return False
def load_dataframe(self, object_id: str) -> Optional[pd.DataFrame]:
"""Загрузка DataFrame из MinIO"""
try:
# Получаем объект из MinIO
response = self.client.get_object(self.bucket_name, object_id)
# Читаем данные
data = response.read()
# Десериализуем DataFrame
df = pickle.loads(data)
return df
except Exception as e:
print(f"Ошибка при загрузке данных из MinIO: {e}")
return None
finally:
if 'response' in locals():
response.close()
response.release_conn()
def delete_object(self, object_id: str) -> bool:
"""Удаление объекта из MinIO"""
try:
self.client.remove_object(self.bucket_name, object_id)
print(f"Объект успешно удален из MinIO: {self.bucket_name}/{object_id}")
return True
except Exception as e:
print(f"Ошибка при удалении объекта из MinIO: {e}")
return False
def object_exists(self, object_id: str) -> bool:
"""Проверка существования объекта в MinIO"""
try:
self.client.stat_object(self.bucket_name, object_id)
return True
except Exception:
return False

View File

828
python_parser/app/main.py Normal file
View File

@@ -0,0 +1,828 @@
import os
import multiprocessing
import uvicorn
from typing import Dict, List
from fastapi import FastAPI, File, UploadFile, HTTPException, status
from fastapi.responses import JSONResponse
from adapters.storage import MinIOStorageAdapter
from adapters.parsers import SvodkaPMParser, SvodkaCAParser, MonitoringFuelParser
from core.models import UploadRequest, DataRequest
from core.services import ReportService, PARSERS
from app.schemas import (
ServerInfoResponse,
UploadResponse, UploadErrorResponse,
SvodkaPMTotalOGsRequest, SvodkaPMSingleOGRequest,
SvodkaCARequest,
MonitoringFuelMonthRequest, MonitoringFuelTotalRequest
)
# Парсеры
PARSERS.update({
'svodka_pm': SvodkaPMParser,
'svodka_ca': SvodkaCAParser,
'monitoring_fuel': MonitoringFuelParser,
# 'svodka_plan_sarnpz': SvodkaPlanSarnpzParser,
})
# Адаптеры
storage_adapter = MinIOStorageAdapter()
def get_report_service() -> ReportService:
return ReportService(storage_adapter)
tags_metadata = [
{
"name": "Общее",
"display_name": "Общее",
},
{
"name": SvodkaPMParser.name,
"description": "✅ Ready",
},
{
"name": SvodkaCAParser.name,
"description": "✅ Ready",
"display_name": "Сводка ПМ",
},
{
"name": MonitoringFuelParser.name,
"description": "✅ Ready",
"display_name": "Мониторинг топлива",
},
# {
# "name": MonitoringFuelParser.name,
# "description": "⚠️ WORK IN PROGRESS",
# },
]
app = FastAPI(
title="NIN Excel Parsers API",
description="API для парсинга сводок и работы с данными экселей НиН",
version="1.0.0",
openapi_tags=tags_metadata,
)
@app.get("/", tags=["Общее"])
async def root():
return {"message": "Svodka Parser API", "version": "1.0.0"}
@app.get("/parsers", tags=["Общее"],
summary="Список доступных парсеров",
description="Возвращает список идентификаторов всех доступных парсеров",
response_model=Dict[str, List[str]],
responses={
200: {
"content": {
"application/json": {
"example": {
"parsers": ["monitoring_fuel", "svodka_ca", "svodka_pm"]
}
}
}
}
},)
async def get_available_parsers():
"""Получение списка доступных парсеров"""
parsers = list(PARSERS.keys())
return {"parsers": parsers}
@app.get("/server-info", tags=["Общее"],
summary="Информация о сервере",
response_model=ServerInfoResponse,)
async def server_info():
return {
"process_id": os.getpid(),
"parent_id": os.getppid(),
"cpu_cores": multiprocessing.cpu_count(),
"memory_mb": os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES') / (1024. ** 2)
}
# @app.get("/svodka_pm/schema", tags=[SvodkaPMParser.name])
# async def get_svodka_pm_schema():
# """Получение схемы параметров для парсера сводок ПМ факта и плана"""
# parser = PARSERS['svodka_pm']()
# return parser.get_schema()
# @app.get("/svodka_ca/schema", tags=[SvodkaCAParser.name])
# async def get_svodka_ca_schema():
# """Получение схемы параметров для парсера сводки СА"""
# parser = PARSERS['svodka_ca']()
# return parser.get_schema()
# @app.get("/monitoring_fuel/schema", tags=[MonitoringFuelParser.name])
# async def get_monitoring_fuel_schema():
# """Получение схемы параметров для парсера мониторинга топлива"""
# parser = PARSERS['monitoring_fuel']()
# return parser.get_schema()
@app.post("/svodka_pm/upload-zip", tags=[SvodkaPMParser.name],
summary="Загрузка файлов сводок ПМ одним ZIP-архивом",
response_model=UploadResponse,
responses={
400: {"model": UploadErrorResponse, "description": "Неверный формат архива или файлов"},
500: {"model": UploadErrorResponse, "description": "Внутренняя ошибка сервера"}
},)
async def upload_svodka_pm_zip(
zip_file: UploadFile = File(..., description="ZIP архив с Excel файлами (.zip)")
):
"""Загрузка файлов сводок ПМ (факта и плана) по всем ОГ в **одном ZIP-архиве**
**Шаблоны названий файлов:**
- Факт: `svodka_fact_pm_<OG_ID>.xlsm`
- План: `svodka_plan_pm_<OG_ID>.xlsx`
"""
report_service = get_report_service()
try:
if not zip_file.filename.lower().endswith('.zip'):
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=UploadErrorResponse(
message="Файл должен быть ZIP архивом",
error_code="INVALID_FILE_TYPE",
details={
"expected_formats": [".zip"],
"received_format": zip_file.filename.split('.')[-1] if '.' in zip_file.filename else "unknown"
}
).model_dump()
)
file_content = await zip_file.read()
# Создаем запрос
request = UploadRequest(
report_type='svodka_pm',
file_content=file_content,
file_name=zip_file.filename
)
# Загружаем отчет
result = report_service.upload_report(request)
if result.success:
return UploadResponse(
success=True,
message=result.message,
object_id=result.object_id
)
else:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=UploadErrorResponse(
message=result.message,
error_code="ERR_UPLOAD"
).model_dump(),
)
except HTTPException:
raise
except Exception as e:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=UploadErrorResponse(
message=f"Внутренняя ошибка сервера: {str(e)}",
error_code="INTERNAL_SERVER_ERROR"
).model_dump()
)
# @app.post("/svodka_pm/upload", tags=[SvodkaPMParser.name])
# async def upload_svodka_pm(
# file: UploadFile = File(...)
# ):
# report_service = get_report_service()
# """
# Загрузка отчета сводки факта СарНПЗ
# - file: Excel файл для загрузки
# """
# try:
# # Проверяем тип файла
# if not file.filename.endswith(('.xlsx', '.xlsm', '.xls')):
# raise HTTPException(
# status_code=status.HTTP_400_BAD_REQUEST,
# detail="Поддерживаются только Excel файлы (.xlsx, .xlsm, .xls)"
# )
# # Читаем содержимое файла
# file_content = await file.read()
# # Создаем запрос
# request = UploadRequest(
# report_type='svodka_pm',
# file_content=file_content,
# file_name=file.filename
# )
# # Загружаем отчет
# result = report_service.upload_report(request)
# # print(result)
# if result.success:
# return {
# "success": True,
# "message": result.message,
# "object_id": result.object_id
# }
# else:
# raise HTTPException(status_code=500, detail=result.message)
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/svodka_pm/get_single_og", tags=[SvodkaPMParser.name],
summary="Получение данных по одному ОГ")
async def get_svodka_pm_single_og(
request_data: SvodkaPMSingleOGRequest
):
"""Получение данных из сводок ПМ (факта и плана) по одному ОГ
### Структура параметров:
- `id`: **Идентификатор МА** для запрашиваемого ОГ (обязательный)
- `codes`: **Массив кодов** выбираемых строк (обязательный)
- `columns`: **Массив названий** выбираемых столбцов (обязательный)
- `search`: **Опциональный параметр** для фильтрации ("Итого" или null)
### Пример тела запроса:
```json
{
"id": "SNPZ",
"codes": [78, 79],
"columns": ["ПП", "СЭБ"]
}
```
"""
report_service = get_report_service()
"""
Получение данных из отчета сводки факта СарНПЗ
- id: ID ОГ
- codes: коды выбираемых строк [78, 79]
- columns: выбираемые колонки ["БП", "СЭБ"]
- search: "Итого" не обязательный
"""
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'single'
request = DataRequest(
report_type='svodka_pm',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/svodka_pm/get_total_ogs", tags=[SvodkaPMParser.name],
summary="Получение данных по всем ОГ")
async def get_svodka_pm_total_ogs(
request_data: SvodkaPMTotalOGsRequest
):
"""Получение данных из сводок ПМ (факта и плана) по всем ОГ
### Структура параметров:
- `codes`: **Массив кодов** выбираемых строк (обязательный)
- `columns`: **Массив названий** выбираемых столбцов (обязательный)
- `search`: **Опциональный параметр** для фильтрации ("Итого" или null)
### Пример тела запроса:
```json
{
"codes": [78, 79, 394, 395, 396, 397, 81, 82, 83, 84],
"columns": ["БП", "ПП", "СЭБ"]
}
```
"""
report_service = get_report_service()
"""
Получение данных из отчета сводки факта СарНПЗ
- codes: коды выбираемых строк [78, 79]
- columns: выбираемые колонки ["БП", "СЭБ"]
- search: "Итого"
"""
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'total'
request = DataRequest(
report_type='svodka_pm',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
# @app.post("/svodka_pm/get_data", tags=[SvodkaPMParser.name])
# async def get_svodka_pm_data(
# request_data: dict
# ):
# report_service = get_report_service()
# """
# Получение данных из отчета сводки факта СарНПЗ
# - indicator_id: ID индикатора
# - code: Код для поиска
# - search_value: Опциональное значение для поиска
# """
# try:
# # Создаем запрос
# request = DataRequest(
# report_type='svodka_pm',
# get_params=request_data
# )
# # Получаем данные
# result = report_service.get_data(request)
# if result.success:
# return {
# "success": True,
# "data": result.data
# }
# else:
# raise HTTPException(status_code=404, detail=result.message)
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/svodka_ca/upload", tags=[SvodkaCAParser.name],
summary="Загрузка файла отчета сводки СА",
response_model=UploadResponse,
responses={
400: {"model": UploadErrorResponse, "description": "Неверный формат файла"},
500: {"model": UploadErrorResponse, "description": "Внутренняя ошибка сервера"}
},)
async def upload_svodka_ca(
file: UploadFile = File(..., description="Excel файл сводки СА (.xlsx, .xlsm, .xls)")
):
"""
Загрузка и обработка Excel файла отчета сводки СА
**Поддерживаемые форматы:**
- Excel (.xlsx, .xlsm, .xls)
"""
report_service = get_report_service()
try:
# Проверяем тип файла
if not file.filename.endswith(('.xlsx', '.xlsm', '.xls')):
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=UploadErrorResponse(
message="Поддерживаются только Excel файлы (.xlsx, .xlsm, .xls)",
error_code="INVALID_FILE_TYPE",
details={
"expected_formats": [".xlsx", ".xlsm", ".xls"],
"received_format": file.filename.split('.')[-1] if '.' in file.filename else "unknown"
}
).model_dump()
)
# Читаем содержимое файла
file_content = await file.read()
# Создаем запрос
request = UploadRequest(
report_type='svodka_ca',
file_content=file_content,
file_name=file.filename
)
# Загружаем отчет
result = report_service.upload_report(request)
if result.success:
return UploadResponse(
success=True,
message=result.message,
object_id=result.object_id
)
else:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=UploadErrorResponse(
message=result.message,
error_code="ERR_UPLOAD"
).model_dump(),
)
except HTTPException:
raise
except Exception as e:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=UploadErrorResponse(
message=f"Внутренняя ошибка сервера: {str(e)}",
error_code="INTERNAL_SERVER_ERROR"
).model_dump()
)
@app.post("/svodka_ca/get_data", tags=[SvodkaCAParser.name],
summary="Получение данных из отчета сводки СА")
async def get_svodka_ca_data(
request_data: SvodkaCARequest
):
"""
Получение данных из отчета сводки СА по указанным режимам и таблицам
### Структура параметров:
- `modes`: **Массив кодов** режимов - `plan`, `fact` или `normativ` (обязательный)
- `tables`: **Массив названий** таблиц как есть (обязательный)
### Пример тела запроса:
```json
{
"modes": ["plan", "fact"],
"tables": ["ТиП, %", "Топливо итого, тонн", "Топливо итого, %", "Потери итого, тонн"]
}
```
"""
report_service = get_report_service()
try:
# Создаем запрос
request_dict = request_data.model_dump()
request = DataRequest(
report_type='svodka_ca',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
# @app.post("/monitoring_fuel/upload", tags=[MonitoringFuelParser.name])
# async def upload_monitoring_fuel(
# file: UploadFile = File(...),
# directory_path: str = None
# ):
# report_service = get_report_service()
# """
# Загрузка отчета мониторинга топлива
# - file: Excel файл для загрузки (или архив с файлами)
# - directory_path: Путь к директории с файлами (опционально)
# """
# try:
# # Проверяем тип файла
# if not file.filename.endswith(('.xlsx', '.xlsm', '.xls', '.zip')):
# raise HTTPException(
# status_code=status.HTTP_400_BAD_REQUEST,
# detail="Поддерживаются только Excel файлы (.xlsx, .xlsm, .xls) или архивы (.zip)"
# )
# # Читаем содержимое файла
# file_content = await file.read()
# # Создаем параметры для парсинга
# parse_params = {}
# if directory_path:
# parse_params['directory_path'] = directory_path
# # Создаем запрос
# request = UploadRequest(
# report_type='monitoring_fuel',
# file_content=file_content,
# file_name=file.filename,
# parse_params=parse_params
# )
# # Загружаем отчет
# result = report_service.upload_report(request)
# if result.success:
# return {
# "success": True,
# "message": result.message,
# "object_id": result.object_id
# }
# else:
# raise HTTPException(status_code=500, detail=result.message)
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
# @app.post("/monitoring_fuel/get_data", tags=[MonitoringFuelParser.name])
# async def get_monitoring_fuel_data(
# request_data: dict
# ):
# report_service = get_report_service()
# """
# Получение данных из отчета мониторинга топлива
# - column: Название колонки для агрегации (normativ, total, total_svod)
# """
# try:
# # Создаем запрос
# request = DataRequest(
# report_type='monitoring_fuel',
# get_params=request_data
# )
# # Получаем данные
# result = report_service.get_data(request)
# if result.success:
# return {
# "success": True,
# "data": result.data
# }
# else:
# raise HTTPException(status_code=404, detail=result.message)
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
# @app.post("/monitoring_fuel/upload_directory", tags=[MonitoringFuelParser.name])
# async def upload_monitoring_fuel_directory(
# request_data: dict
# ):
# report_service = get_report_service()
# """
# Загрузка отчета мониторинга топлива из директории
# - directory_path: Путь к директории с файлами monitoring_SNPZ_*.xlsm
# """
# try:
# import os
# import glob
# # Извлекаем directory_path из request_data
# directory_path = request_data.get('directory_path')
# if not directory_path:
# raise HTTPException(
# status_code=status.HTTP_400_BAD_REQUEST,
# detail="Параметр 'directory_path' обязателен"
# )
# # Проверяем существование директории
# if not os.path.exists(directory_path):
# raise HTTPException(
# status_code=status.HTTP_400_BAD_REQUEST,
# detail=f"Директория не найдена: {directory_path}"
# )
# # Проверяем наличие файлов
# file_pattern = os.path.join(directory_path, "monitoring_SNPZ_*.xlsm")
# files = glob.glob(file_pattern)
# if not files:
# raise HTTPException(
# status_code=status.HTTP_400_BAD_REQUEST,
# detail=f"Не найдены файлы по паттерну {file_pattern}"
# )
# # Создаем параметры для парсинга
# parse_params = {
# 'directory_path': directory_path,
# 'sheet_name': 'Мониторинг потребления',
# 'search_value': 'Установка'
# }
# # Создаем запрос (используем пустой файл, так как парсим директорию)
# request = UploadRequest(
# report_type='monitoring_fuel',
# file_content=b'', # Пустой контент, так как парсим директорию
# file_name='directory_upload',
# parse_params=parse_params
# )
# # Загружаем отчет
# result = report_service.upload_report(request)
# if result.success:
# return {
# "success": True,
# "message": result.message,
# "object_id": result.object_id,
# "files_processed": len(files)
# }
# else:
# raise HTTPException(status_code=500, detail=result.message)
# except HTTPException:
# raise
# except Exception as e:
# raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/monitoring_fuel/upload-zip", tags=[MonitoringFuelParser.name],
summary="Загрузка файлов сводок мониторинга топлива одним ZIP-архивом",
response_model=UploadResponse,
responses={
400: {"model": UploadErrorResponse, "description": "Неверный формат архива или файлов"},
500: {"model": UploadErrorResponse, "description": "Внутренняя ошибка сервера"}
},)
async def upload_monitoring_fuel_zip(
zip_file: UploadFile = File(..., description="ZIP архив с Excel файлами (.zip)")
):
"""Загрузка файлов сводок мониторинга топлива по всем ОГ в **одном ZIP-архиве**
**Шаблоны названий файлов:**
- `monitoring_SNPZ_{MM}.xlsm`, `MM` - номер месяца с ведущим 0
"""
report_service = get_report_service()
try:
if not zip_file.filename.lower().endswith('.zip'):
return JSONResponse(
status_code=status.HTTP_400_BAD_REQUEST,
content=UploadErrorResponse(
message="Файл должен быть ZIP архивом",
error_code="INVALID_FILE_TYPE",
details={
"expected_formats": [".zip"],
"received_format": zip_file.filename.split('.')[-1] if '.' in zip_file.filename else "unknown"
}
).model_dump()
)
file_content = await zip_file.read()
# Создаем запрос
request = UploadRequest(
report_type='monitoring_fuel',
file_content=file_content,
file_name=zip_file.filename
)
# Загружаем отчет
result = report_service.upload_report(request)
if result.success:
return UploadResponse(
success=True,
message=result.message,
object_id=result.object_id
)
else:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=UploadErrorResponse(
message=result.message,
error_code="ERR_UPLOAD"
).model_dump(),
)
except HTTPException:
raise
except Exception as e:
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=UploadErrorResponse(
message=f"Внутренняя ошибка сервера: {str(e)}",
error_code="INTERNAL_SERVER_ERROR"
).model_dump()
)
@app.post("/monitoring_fuel/get_total_by_columns", tags=[MonitoringFuelParser.name],
summary="Получение данных по колонкам и расчёт средних значений")
async def get_monitoring_fuel_total_by_columns(
request_data: MonitoringFuelTotalRequest
):
"""Получение данных из сводок мониторинга топлива по колонкам и расчёт средних значений
### Структура параметров:
- `columns`: **Массив названий** выбираемых столбцов (обязательный)
### Пример тела запроса:
```json
{
"columns": ["total", "normativ"]
}
```
"""
report_service = get_report_service()
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'total'
request = DataRequest(
report_type='monitoring_fuel',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
@app.post("/monitoring_fuel/get_month_by_code", tags=[MonitoringFuelParser.name],
summary="Получение данных за месяц")
async def get_monitoring_fuel_month_by_code(
request_data: MonitoringFuelMonthRequest
):
"""Получение данных из сводок мониторинга топлива за указанный номер месяца
### Структура параметров:
- `month`: **Номер месяца строкой с ведущим 0** (обязательный)
### Пример тела запроса:
```json
{
"month": "02"
}
```
"""
report_service = get_report_service()
try:
# Создаем запрос
request_dict = request_data.model_dump()
request_dict['mode'] = 'month'
request = DataRequest(
report_type='monitoring_fuel',
get_params=request_dict
)
# Получаем данные
result = report_service.get_data(request)
if result.success:
return {
"success": True,
"data": result.data
}
else:
raise HTTPException(status_code=404, detail=result.message)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=f"Внутренняя ошибка сервера: {str(e)}")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8080)

View File

@@ -0,0 +1,14 @@
from .monitoring_fuel import MonitoringFuelMonthRequest, MonitoringFuelTotalRequest
from .svodka_ca import SvodkaCARequest
from .svodka_pm import SvodkaPMSingleOGRequest, SvodkaPMTotalOGsRequest
from .server import ServerInfoResponse
from .upload import UploadResponse, UploadErrorResponse
__all__ = [
'MonitoringFuelMonthRequest', 'MonitoringFuelTotalRequest',
'SvodkaCARequest',
'SvodkaPMSingleOGRequest', 'SvodkaPMTotalOGsRequest',
'ServerInfoResponse',
'UploadResponse', 'UploadErrorResponse'
]

View File

@@ -0,0 +1,34 @@
from pydantic import BaseModel, Field
from typing import List
class MonitoringFuelMonthRequest(BaseModel):
month: str = Field(
...,
description="Номер месяца строкой с ведущим 0",
example="02",
pattern="^(0[1-9]|1[0-2])$"
)
class Config:
json_schema_extra = {
"example": {
"month": "02"
}
}
class MonitoringFuelTotalRequest(BaseModel):
columns: List[str] = Field(
...,
description="Массив названий выбираемых столбцов",
example=["total", "normativ"],
min_items=1
)
class Config:
json_schema_extra = {
"example": {
"columns": ["total", "normativ"]
}
}

View File

@@ -0,0 +1,18 @@
from pydantic import BaseModel, Field
class ServerInfoResponse(BaseModel):
process_id: int = Field(..., description="Идентификатор текущего процесса сервера")
parent_id: int = Field(..., description="Идентификатор родительского процесса")
cpu_cores: int = Field(..., description="Количество ядер процессора в системе")
memory_mb: float = Field(..., description="Общий объем оперативной памяти в мегабайтах")
class Config:
json_schema_extra = {
"example": {
"process_id": 12345,
"parent_id": 6789,
"cpu_cores": 8,
"memory_mb": 16384.5
}
}

View File

@@ -0,0 +1,33 @@
from pydantic import BaseModel, Field
from typing import List
from enum import Enum
class ReportMode(str, Enum):
PLAN = "plan"
FACT = "fact"
NORMATIV = "normativ"
class SvodkaCARequest(BaseModel):
modes: List[ReportMode] = Field(
...,
description="Массив кодов режимов: plan, fact или normativ",
example=["plan", "fact"],
min_items=1
)
tables: List[str] = Field(
...,
description="Массив названий таблиц как есть",
example=["ТиП, %", "Топливо итого, тонн", "Топливо итого, %", "Потери итого, тонн"],
min_items=1
)
class Config:
json_schema_extra = {
"example": {
"modes": ["plan", "fact"],
"tables": ["ТиП, %", "Топливо итого, тонн", "Топливо итого, %", "Потери итого, тонн"]
}
}

View File

@@ -0,0 +1,91 @@
from pydantic import Field, BaseModel
from typing import Optional, List
from enum import Enum
class OGID(str, Enum):
"""Доступные идентификаторы ОГ"""
KNPZ = "KNPZ"
ANHK = "ANHK"
AchNPZ = "AchNPZ"
BASH = "BASH"
UNPZ = "UNPZ"
UNH = "UNH"
NOV = "NOV"
NovKuybNPZ = "NovKuybNPZ"
KuybNPZ = "KuybNPZ"
CyzNPZ = "CyzNPZ"
TuapsNPZ = "TuapsNPZ"
SNPZ = "SNPZ"
RNPK = "RNPK"
NVNPO = "NVNPO"
KLNPZ = "KLNPZ"
PurNP = "PurNP"
YANOS = "YANOS"
class SvodkaPMSingleOGRequest(BaseModel):
id: OGID = Field(
...,
description="Идентификатор МА для запрашиваемого ОГ",
example="SNPZ"
)
codes: List[int] = Field(
...,
description="Массив кодов выбираемых строк",
example=[78, 79],
min_items=1
)
columns: List[str] = Field(
...,
description="Массив названий выбираемых столбцов",
example=["ПП", "СЭБ"],
min_items=1
)
search: Optional[str] = Field(
None,
description="Опциональный параметр для фильтрации ('Итого' или null)",
example="Итого"
)
class Config:
json_schema_extra = {
"example": {
"id": "SNPZ",
"codes": [78, 79],
"columns": ["ПП", "СЭБ"]
}
}
class SvodkaPMTotalOGsRequest(BaseModel):
codes: List[int] = Field(
...,
description="Массив кодов выбираемых строк",
example=[78, 79, 394, 395, 396, 397, 81, 82, 83, 84],
min_items=1
)
columns: List[str] = Field(
...,
description="Массив названий выбираемых столбцов",
example=["БП", "ПП", "СЭБ"],
min_items=1
)
search: Optional[str] = Field(
None,
description="Опциональный параметр для фильтрации ('Итого' или null)",
example="Итого"
)
class Config:
json_schema_extra = {
"example": {
"codes": [78, 79, 394, 395, 396, 397, 81, 82, 83, 84],
"columns": ["БП", "ПП", "СЭБ"]
}
}

View File

@@ -0,0 +1,44 @@
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any
from enum import Enum
class UploadStatus(str, Enum):
SUCCESS = "success"
PROCESSING = "processing"
ERROR = "error"
class UploadResponse(BaseModel):
success: bool = Field(..., description="Успешность операции")
message: str = Field(..., description="Сообщение о результате операции")
object_id: Optional[str] = Field(None, description="Идентификатор загруженного объекта")
class Config:
json_schema_extra = {
"example": {
"success": True,
"message": "Файл успешно загружен и обработан",
"object_id": "65a1b2c3d4e5f6a7b8c9d0e1",
}
}
class UploadErrorResponse(BaseModel):
success: bool = Field(False, description="Успешность операции")
message: str = Field(..., description="Сообщение об ошибке")
error_code: Optional[str] = Field(None, description="Код ошибки")
details: Optional[Dict[str, Any]] = Field(None, description="Детали ошибки")
class Config:
json_schema_extra = {
"example": {
"success": False,
"message": "Неверный формат файла",
"error_code": "INVALID_FILE_FORMAT",
"details": {
"expected_formats": [".zip"],
"received_format": ".rar"
}
}
}

View File

View File

@@ -0,0 +1,45 @@
"""
Доменные модели
"""
from dataclasses import dataclass, field
from typing import Optional, Dict, Any
import pandas as pd
@dataclass
class UploadRequest:
"""Запрос на загрузку отчета"""
report_type: str
file_content: bytes
file_name: str
parse_params: Dict[str, Any] = field(default_factory=dict)
@dataclass
class UploadResult:
"""Результат загрузки отчета"""
success: bool
message: str
object_id: Optional[str] = None
@dataclass
class DataRequest:
"""Запрос на получение данных"""
report_type: str
get_params: Dict[str, Any]
@dataclass
class DataResult:
"""Результат получения данных"""
success: bool
data: Optional[Dict[str, Any]] = None
message: Optional[str] = None
@dataclass
class ParsedData:
"""Распарсенные данные"""
dataframe: pd.DataFrame
metadata: Optional[Dict[str, Any]] = None

View File

@@ -0,0 +1,49 @@
"""
Порты (интерфейсы) для hexagonal architecture
"""
from abc import ABC, abstractmethod
from typing import Optional
import pandas as pd
class ParserPort(ABC):
"""Интерфейс для парсеров"""
@abstractmethod
def parse(self, file_path: str, params: dict) -> pd.DataFrame:
"""Парсинг файла и возврат DataFrame"""
pass
@abstractmethod
def get_value(self, df: pd.DataFrame, params: dict):
"""Получение значения из DataFrame по параметрам"""
pass
# @abstractmethod
# def get_schema(self) -> dict:
# """Возвращает схему входных параметров для парсера"""
# pass
class StoragePort(ABC):
"""Интерфейс для хранилища данных"""
@abstractmethod
def save_dataframe(self, df: pd.DataFrame, object_id: str) -> bool:
"""Сохранение DataFrame"""
pass
@abstractmethod
def load_dataframe(self, object_id: str) -> Optional[pd.DataFrame]:
"""Загрузка DataFrame"""
pass
@abstractmethod
def delete_object(self, object_id: str) -> bool:
"""Удаление объекта"""
pass
@abstractmethod
def object_exists(self, object_id: str) -> bool:
"""Проверка существования объекта"""
pass

View File

@@ -0,0 +1,120 @@
"""
Сервисы (бизнес-логика)
"""
import tempfile
import os
from typing import Dict, Type
from core.models import UploadRequest, UploadResult, DataRequest, DataResult
from core.ports import ParserPort, StoragePort
# Глобальный словарь парсеров
PARSERS: Dict[str, Type[ParserPort]] = {}
def get_parser(report_type: str) -> ParserPort:
"""Получение парсера по типу отчета"""
if report_type not in PARSERS:
available_parsers = list(PARSERS.keys())
raise ValueError(f"Неизвестный тип отчета '{report_type}'. Доступные типы: {available_parsers}")
return PARSERS[report_type]()
class ReportService:
"""Сервис для работы с отчетами (только S3)"""
def __init__(self, storage: StoragePort):
self.storage = storage
def upload_report(self, request: UploadRequest) -> UploadResult:
"""Загрузка отчета"""
try:
# Получаем парсер для данного типа отчета
parser = get_parser(request.report_type)
# Сохраняем файл во временную директорию
suff = "." + str(request.file_name.split('.')[-1])
with tempfile.NamedTemporaryFile(delete=False, suffix=suff) as temp_file:
temp_file.write(request.file_content)
temp_file_path = temp_file.name
try:
# Парсим файл
parse_params = request.parse_params or {}
df = parser.parse(temp_file_path, parse_params)
# Генерируем object_id
object_id = f"nin_excel_data_{request.report_type}"
# Удаляем старый объект, если он существует
if self.storage.object_exists(object_id):
self.storage.delete_object(object_id)
print(f"Старый объект удален: {object_id}")
# Сохраняем в хранилище
if self.storage.save_dataframe(df, object_id):
return UploadResult(
success=True,
message="Отчет успешно загружен",
object_id=object_id
)
else:
return UploadResult(
success=False,
message="Ошибка при сохранении в хранилище"
)
finally:
# Удаляем временный файл
os.unlink(temp_file_path)
except Exception as e:
return UploadResult(
success=False,
message=f"Ошибка при обработке отчета: {str(e)}"
)
def get_data(self, request: DataRequest) -> DataResult:
"""Получение данных из отчета"""
try:
# Генерируем object_id
object_id = f"nin_excel_data_{request.report_type}"
# Проверяем существование объекта
if not self.storage.object_exists(object_id):
return DataResult(
success=False,
message=f"Отчет типа '{request.report_type}' не найден"
)
# Загружаем DataFrame из хранилища
df = self.storage.load_dataframe(object_id)
if df is None:
return DataResult(
success=False,
message="Ошибка при загрузке данных из хранилища"
)
# Получаем парсер
parser = get_parser(request.report_type)
# Получаем значение
value = parser.get_value(df, request.get_params)
# Формируем результат
if value is not None:
if hasattr(value, 'to_dict'):
result_data = dict(value)
else:
result_data = {"value": value}
return DataResult(success=True, data=result_data)
else:
return DataResult(success=False, message="Значение не найдено")
except Exception as e:
return DataResult(
success=False,
message=f"Ошибка при получении данных: {str(e)}"
)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,16 @@
version: '3.8'
services:
minio:
image: minio/minio:latest
container_name: svodka_minio
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
ports:
- "9000:9000"
- "9001:9001"
volumes:
- ./minio:/data
command: server /data --console-address ":9001"
restart: unless-stopped

View File

@@ -0,0 +1,17 @@
applications:
- name: nin-python-parser-dev-test
buildpack: python_buildpack
health-check-type: web
services:
- logging-shared-dev
command: python /app/run_stand.py
path: .
disk_quota: 2G
memory: 4G
instances: 1
env:
MINIO_ENDPOINT: s3-region1.ppc-jv-dev.sibintek.ru
MINIO_ACCESS_KEY: 00a70fac02c1208446de
MINIO_SECRET_KEY: 1gk9tVYEEoH9ADRxb4kiAuCo6CCISdV6ie0p6oDO
MINIO_BUCKET: bucket-476684e7-1223-45ac-a101-8b5aeda487d6
MINIO_SECURE: false

View File

@@ -0,0 +1 @@
{"version":"1","format":"xl-single","id":"29118f57-702e-4363-9a41-9f06655e449d","xl":{"version":"3","this":"195a90f4-fc26-46a8-b6d4-0b50b99b1342","sets":[["195a90f4-fc26-46a8-b6d4-0b50b99b1342"]],"distributionAlgo":"SIPMOD+PARITY"}}

Binary file not shown.

Some files were not shown because too many files have changed in this diff Show More