Files
python_parser/python_parser/adapters/pconfig.py
2025-09-02 07:15:16 +03:00

241 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
from functools import lru_cache
import json
import numpy as np
import pandas as pd
import os
OG_IDS = {
"Комсомольский НПЗ": "KNPZ",
"Ангарская НХК": "ANHK",
"Ачинский НПЗ": "AchNPZ",
"ЕНПЗ": "BASH",
"УНПЗ": "UNPZ",
"УНХ": "UNH",
"Новойл": "NOV",
"Новокуйбышевский НПЗ": "NovKuybNPZ",
"Куйбышевский НПЗ": "KuybNPZ",
"Сызранский НПЗ": "CyzNPZ",
"Туапсинский НПЗ": "TuapsNPZ",
"Саратовский НПЗ": "SNPZ",
"Рязанская НПК": "RNPK",
"Нижневартовское НПО": "NVNPO",
"Красноленинский НПЗ": "KLNPZ",
"Пурнефтепереработка": "PurNP",
"ЯНОС": "YANOS",
"Уфанефтехим": "UNH",
"РНПК": "RNPK",
"КмсНПЗ": "KNPZ",
"АНХК": "ANHK",
"НК НПЗ": "NovKuybNPZ",
"КНПЗ": "KuybNPZ",
"СНПЗ": "CyzNPZ",
"Нижневаторское НПО": "NVNPO",
"ПурНП": "PurNP",
}
SINGLE_OGS = [
"KNPZ",
"ANHK",
"AchNPZ",
"BASH",
"UNPZ",
"UNH",
"NOV",
"NovKuybNPZ",
"KuybNPZ",
"CyzNPZ",
"TuapsNPZ",
"SNPZ",
"RNPK",
"NVNPO",
"KLNPZ",
"PurNP",
"YANOS",
]
SNPZ_IDS = {
"Висбрекинг": "SNPZ.VISB",
"Изомеризация": "SNPZ.IZOM",
"Л-24/6": "SNPZ.L24-6",
"ЛЧ-35-11/300": "SNPZ.L35-300",
"ЛЧ-35-11/600": "SNPZ.L35-600",
"ОЗФХ т.у.т/сут": "SNPZ.OZPH",
"УПНБ": "SNPZ.UPB",
"УПЭС": "SNPZ.UPES",
"ЭЛОУ АВТ-6": "SNPZ.EAVT6",
"Итого": "SNPZ.TOTAL",
"Норматив по фактическим загрузкам": "SNPZ.TOTAL.FACT",
}
def replace_id_in_path(file_path, new_id):
# Заменяем 'ID' на новое значение
modified_path = file_path.replace('ID', str(new_id)) + '.xlsx'
# Проверяем, существует ли файл
if not os.path.exists(modified_path):
# Меняем расширение на .xlsm
directory, filename = os.path.split(modified_path)
name, ext = os.path.splitext(filename)
new_filename = name + '.xlsm'
modified_path = os.path.join(directory, new_filename)
return modified_path
def get_table_name(exel):
return re.sub(r'^data/(.+)\.(xlsm|xlsx)$', r'\1', exel)
def normalize_and_tokenize(text):
if not isinstance(text, str) or not text.strip():
return set()
cleaned = re.sub(r'[^\w\s]', ' ', text.lower())
cleaned = cleaned.replace('ё', 'е')
words = [word.strip() for word in cleaned.split()]
return set(word for word in words if word)
@lru_cache(maxsize=512)
def get_object_by_name(name):
return get_id_by_name(name, SNPZ_IDS)
@lru_cache(maxsize=512)
def get_og_by_name(name):
return get_id_by_name(name, OG_IDS)
def get_id_by_name(name, dictionary):
if not name or not isinstance(name, str):
return None
query_words = normalize_and_tokenize(name)
if not query_words:
return None
best_match = None
best_score = 0
for full_name, obj_id in dictionary.items():
entry_words = normalize_and_tokenize(full_name)
if not entry_words:
continue
intersection = query_words & entry_words
if not intersection:
continue
# Полное совпадение
if query_words == entry_words:
return obj_id
# Все слова из словаря есть в запросе
if entry_words <= query_words:
score = len(entry_words)
# Хорошее пересечение
elif len(intersection) >= min(2, len(entry_words), len(query_words)):
score = len(intersection) / max(len(query_words), len(entry_words))
# Одно слово (аббревиатура)
elif len(entry_words) == 1 and list(entry_words)[0] in query_words:
score = 1.0
else:
continue
if score > best_score:
best_score = score
best_match = obj_id
return best_match
def find_header_row(file, sheet, search_value="Итого", max_rows=50):
''' Определения индекса заголовка в exel по ключевому слову '''
# Читаем первые max_rows строк без заголовков
df_temp = pd.read_excel(
file,
sheet_name=sheet,
header=None,
nrows=max_rows
)
# Ищем строку, где хотя бы в одном столбце встречается искомое значение
for idx, row in df_temp.iterrows():
if row.astype(str).str.strip().str.contains(f"^{search_value}$", case=False, regex=True).any():
print(f"Заголовок найден в строке {idx} (Excel: {idx + 1})")
return idx # 0-based index — то, что нужно для header=
raise ValueError(f"Не найдена строка с заголовком '{search_value}' в первых {max_rows} строках.")
def data_to_json(data, indent=2, ensure_ascii=False):
"""
Полностью безопасная сериализация данных в JSON.
Корректно обрабатывает:
- np.nan, pd.NA, None → null
- DataFrame, Series, numpy массивы и скаляры
- вложенные структуры
"""
def is_nan_like(obj):
"""Проверяет, является ли объект NaN-подобным."""
if obj is None:
return True
if pd.isna(obj): # Ловит np.nan, pd.NA, pd.NaT, None
return True
return False
def convert_obj(obj):
# --- DataFrame ---
if isinstance(obj, pd.DataFrame):
return [convert_obj(row) for _, row in obj.iterrows()] # каждая строка → dict
# --- Series ---
if isinstance(obj, pd.Series):
# Преобразуем индекс в значения, если нужно
values = [convert_obj(v) for v in obj.values]
# Убираем None (были NaN), но сохраняем структуру, если нужно
return values
# --- numpy скаляры ---
elif isinstance(obj, (np.integer, np.int64, np.int32)):
return int(obj)
elif isinstance(obj, (np.floating, np.float64, np.float32)):
return None if is_nan_like(obj) else float(obj)
elif isinstance(obj, np.ndarray):
return [convert_obj(v) for v in obj]
# --- временные метки ---
elif isinstance(obj, (pd.Timestamp, pd.Timedelta)):
return obj.isoformat() if not pd.isna(obj) else None
elif isinstance(obj, pd._libs.missing.NAType): # pd.NA
return None
# --- рекурсия по dict и list ---
elif isinstance(obj, dict):
return {
key: convert_obj(value)
for key, value in obj.items()
if not is_nan_like(key) # фильтруем NaN в ключах (недопустимы в JSON)
}
elif isinstance(obj, list):
return [convert_obj(item) for item in obj]
# --- None и NaN-подобные ---
elif is_nan_like(obj):
return None
# --- всё остальное ---
else:
try:
return float(obj) if isinstance(obj, (int, float)) else str(obj)
except Exception:
return str(obj) # финальный fallback
try:
cleaned_data = convert_obj(data)
return json.dumps(cleaned_data, indent=indent, ensure_ascii=ensure_ascii)
except Exception as e:
raise ValueError(f"Не удалось сериализовать данные в JSON: {e}")