# -*- coding: utf-8 -*-
"""
LibreOffice Calc macro: NV_MASTER matching (improved semantic matching)
Location: /home/jarnold/.config/libreoffice/4/user/Scripts/python/NV Abgleich Makro/mapper_macro.py
"""
|
||
|
||
import os
|
||
import re
|
||
import json
|
||
import traceback
|
||
|
||
# ------------------------------------------------------------
# LIBRARIES & MODELS
# ------------------------------------------------------------
# Every optional dependency is imported defensively. A *_AVAILABLE flag
# records whether the import succeeded so later code can fall back.
try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except Exception:
    PANDAS_AVAILABLE = False

try:
    import spacy
    # Use the medium-sized model for semantic similarity
    nlp = spacy.load("de_core_news_md")
    SPACY_AVAILABLE = True
except Exception:
    # Either spaCy itself or the German model could not be loaded.
    SPACY_AVAILABLE = False
    nlp = None

try:
    from rapidfuzz import fuzz
    RAPIDFUZZ_AVAILABLE = True
except Exception:
    RAPIDFUZZ_AVAILABLE = False
    # Standard-library replacement used for fuzzy scoring without rapidfuzz.
    from difflib import SequenceMatcher
|
||
|
||
# ------------------------------------------------------------
# CONFIGURATION
# ------------------------------------------------------------
# The master workbook, the log file and the cache file all live in the
# directory that contains this script.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
NV_MASTER_PATH = os.path.join(BASE_DIR, "NV_MASTER.ods")
LOG_FILE = os.path.join(BASE_DIR, "mapper_macro.log")
CACHE_FILE = os.path.join(BASE_DIR, "mapper_cache.json")

# German function words that are not treated as terms to be matched.
STOPWORDS = {"mit","ohne","der","die","das","ein","eine","und","zu","von","im","in","auf","an","als","bei","für","aus","dem","den","des","eines","einer"}
CONF_THRESHOLD = 0.70 # slightly more generous for semantic matching
|
||
|
||
# ------------------------------------------------------------
# LOGGING
# ------------------------------------------------------------
|
||
def log(msg):
    """Append one line to the technical log file in the macro directory.

    Logging is best effort: any failure while formatting or writing the
    message is ignored so that a logging problem never aborts the macro.

    Args:
        msg: Message to record. Non-string values (for example exception
            objects) are converted with ``str`` before being written, so
            they are no longer silently dropped.
    """
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(str(msg).strip() + "\n")
    except Exception:
        # Deliberately silent: there is no other channel to report this on.
        pass
|
||
|
||
# Module-level call: writes a start marker to the log when this module is executed.
log("Makro gestartet")
|
||
|
||
# ------------------------------------------------------------
# CACHE
# ------------------------------------------------------------
# Load previously computed mapping results from CACHE_FILE. CACHE is always
# a dict afterwards: a missing file, an unreadable file, invalid JSON or a
# JSON document that is not an object all yield an empty cache.
try:
    with open(CACHE_FILE, "r", encoding="utf-8") as f:
        CACHE = json.load(f)
except FileNotFoundError:
    # No cache has been written yet - nothing to report.
    CACHE = {}
except (OSError, ValueError) as e:
    # ValueError covers both invalid JSON and undecodable bytes.
    log(f"Cache konnte nicht geladen werden: {e}")
    CACHE = {}
else:
    if not isinstance(CACHE, dict):
        # Later code assigns CACHE[key] = ..., which needs a mapping.
        log("Cache-Datei hat ein unerwartetes Format und wird ignoriert.")
        CACHE = {}
|
||
|
||
# ------------------------------------------------------------
# TEXT NORMALIZATION & LEMMATIZATION
# ------------------------------------------------------------
|
||
def normalize_text(s):
    """Return a canonical lower-case form of *s* for index lookups.

    The value is converted to ``str`` and lower-cased, the punctuation
    characters ``( ) [ ] " ' \\ ; : ? ! , .`` are removed, and runs of
    whitespace are collapsed to a single space.

    Args:
        s: Any value; falsy values (``None``, ``""``, ``0``) yield ``""``.

    Returns:
        The normalized string without leading or trailing whitespace.
    """
    if not s:
        return ""
    text = str(s).lower()
    text = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", text)
    # Trim last: removing punctuation can expose whitespace at either end
    # (e.g. "Vase ." or "( Vase )"), which would break exact-match lookups.
    return re.sub(r"\s+", " ", text).strip()
|
||
|
||
# In-memory memo: normalized term -> lemmatized term.
lemma_cache = {}


def lemmatize_term(term):
    """Return the lemmatized form of *term*, memoized in ``lemma_cache``.

    The term is normalized with ``normalize_text`` first. When spaCy and its
    model are available, the result is the space-joined ``lemma_`` of every
    token of the normalized text; otherwise, or if spaCy raises, the
    normalized text itself is returned.
    """
    key = normalize_text(term)
    try:
        return lemma_cache[key]
    except KeyError:
        pass

    # Default to the normalized text; only replaced when spaCy succeeds.
    result = key
    if SPACY_AVAILABLE and nlp:
        try:
            result = " ".join(tok.lemma_ for tok in nlp(key))
        except Exception:
            result = key

    lemma_cache[key] = result
    return result
|
||
|
||
# ------------------------------------------------------------
# LOAD NV_MASTER
# ------------------------------------------------------------
|
||
def build_norm_index(nv_path):
    """Load the NV_MASTER workbook and build two lookup indexes.

    Every sheet except one named "master" (case-insensitive) is read. In each
    sheet the ID column is the first column whose header contains "id"
    (falling back to the first column) and the word column is the first
    column whose header contains "wort" or "vokabel" (falling back to the
    last column). A row with an empty ID cell inherits the most recent
    non-empty ID seen earlier in the same sheet.

    Args:
        nv_path: Path of the ODS workbook to read.

    Returns:
        A tuple ``(norm_dict, lemma_index)``. Both map a string key to a list
        of entry dicts with the keys ``"Name"``, ``"ID"`` and ``"Sheet"``;
        ``norm_dict`` is keyed by the normalized word and ``lemma_index`` by
        its lemma. Both are empty when pandas is unavailable or the workbook
        cannot be read (the reason is logged).
    """
    norm_dict = {}
    lemma_index = {}

    if not PANDAS_AVAILABLE:
        log("Pandas nicht verfügbar – NV_MASTER kann nicht geladen werden.")
        return norm_dict, lemma_index

    try:
        sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf")
    except Exception as e:
        log(f"Fehler beim Laden von NV_MASTER: {e}")
        return norm_dict, lemma_index

    for sheet_name, df in sheets.items():
        if str(sheet_name).strip().lower() == "master":
            continue
        if len(df.columns) == 0:
            # A sheet without any columns has nothing to index; without this
            # guard the column fallbacks below would raise IndexError.
            continue

        df = df.fillna("")
        cols = [str(c).strip().lower() for c in df.columns]
        id_col = next(
            (col for col, low in zip(df.columns, cols) if "id" in low),
            df.columns[0],
        )
        word_col = next(
            (col for col, low in zip(df.columns, cols)
             if "wort" in low or "vokabel" in low),
            df.columns[-1],
        )

        current_parent_id = None
        for _, row in df.iterrows():
            id_val = str(row[id_col]).strip()
            word_val = str(row[word_col]).strip()
            if id_val:
                # Remember the ID so following rows without one inherit it.
                current_parent_id = id_val
            if not word_val:
                continue
            entry = {
                "Name": word_val,
                "ID": current_parent_id or "",
                "Sheet": sheet_name,
            }
            norm_dict.setdefault(normalize_text(word_val), []).append(entry)
            lemma_index.setdefault(lemmatize_term(word_val), []).append(entry)

    log(f"NV_MASTER geladen: {sum(len(v) for v in norm_dict.values())} Begriffe.")
    return norm_dict, lemma_index
|
||
|
||
# ------------------------------------------------------------
# SCORING: FUZZY + SEMANTIC
# ------------------------------------------------------------
|
||
def fuzzy_score(a, b):
    """Return a case-insensitive string similarity of *a* and *b* in [0.0, 1.0].

    Uses RapidFuzz ``token_set_ratio`` (scaled from 0-100) when RapidFuzz is
    installed, otherwise ``difflib.SequenceMatcher.ratio``.

    Both inputs are lower-cased in both branches. Newer RapidFuzz releases
    apply no preprocessing by default, so without this the score would be
    case-sensitive with RapidFuzz but case-insensitive with the fallback.

    Returns:
        The similarity, or ``0.0`` if the RapidFuzz branch raises.
    """
    if RAPIDFUZZ_AVAILABLE:
        try:
            return fuzz.token_set_ratio(a.lower(), b.lower()) / 100.0
        except Exception:
            return 0.0
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()
|
||
|
||
def semantic_similarity(a, b):
    """Return the spaCy document similarity of *a* and *b*.

    Yields ``0.0`` when spaCy is unavailable, when the vocabulary has no
    ``vectors`` attribute, when either document has a zero vector norm, or
    when spaCy raises.
    """
    if not (SPACY_AVAILABLE and hasattr(nlp.vocab, "vectors")):
        return 0.0
    try:
        left = nlp(a)
        right = nlp(b)
        # Only compare when both documents actually carry a vector.
        both_have_vectors = left.vector_norm and right.vector_norm
        return float(left.similarity(right)) if both_have_vectors else 0.0
    except Exception:
        return 0.0
|
||
|
||
def combined_score(a, b):
    """Return the larger of the fuzzy and the semantic score for *a* and *b*."""
    return max(fuzzy_score(a, b), semantic_similarity(a, b))
|
||
|
||
# ------------------------------------------------------------
# MATCHING & SUGGESTIONS
# ------------------------------------------------------------
|
||
def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, top_n=3, threshold=CONF_THRESHOLD):
    """Return up to *top_n* best-scoring index entries for *term_lemma*.

    Every key of ``lemma_index`` and then of ``norm_dict`` is scored with
    ``combined_score``. Keys that start with *term_lemma* get a bonus of
    0.05 (capped at 1.0). Entries of keys scoring at least *threshold*
    become candidates; they are ranked by score (ties keep scan order) and
    de-duplicated case-insensitively on (name, id).

    Args:
        term_lemma: Lemmatized search term.
        norm_dict: Index keyed by normalized word.
        lemma_index: Index keyed by lemma.
        top_n: Maximum number of suggestions; values <= 0 yield ``[]``.
        threshold: Minimum score for a candidate.

    Returns:
        A list of strings, ``"Name (ID)"`` when the entry has an ID and
        ``"Name"`` otherwise.
    """
    if top_n <= 0:
        # The limit below is only checked after appending, so without this
        # guard a non-positive top_n would still return one suggestion.
        return []

    def scored_entries(index):
        # Yield (score, name, id) for every entry whose key passes the threshold.
        for key, entries in index.items():
            score = combined_score(term_lemma, key)
            if key.startswith(term_lemma):
                score = min(score + 0.05, 1.0)
            if score >= threshold:
                for entry in entries:
                    yield score, entry["Name"], entry["ID"]

    candidates = [*scored_entries(lemma_index), *scored_entries(norm_dict)]
    # list.sort is stable, so equally scored candidates keep their scan order.
    candidates.sort(key=lambda cand: cand[0], reverse=True)

    seen = set()
    results = []
    for _score, name, id_ in candidates:
        key = (name.lower(), id_.lower() if id_ else "")
        if key in seen:
            continue
        seen.add(key)
        results.append(f"{name} ({id_})" if id_ else name)
        if len(results) >= top_n:
            break
    return results
|
||
|
||
def map_term_with_indexes(term, norm_dict, lemma_index):
    """Map one term to exact hits, fuzzy suggestions and IDs.

    Results are memoized in the module-level ``CACHE`` under the term's
    lemma. An exact hit is looked up by normalized text first and by lemma
    only if that finds nothing. Suggestions come from
    ``get_suggestions_for_term``; any whose name normalizes to the same
    text as a hit is dropped.

    Args:
        term: Raw term taken from the sheet.
        norm_dict: Index keyed by normalized word.
        lemma_index: Index keyed by lemma.

    Returns:
        A tuple ``(hits, suggestions, ids)`` of de-duplicated lists of
        strings, each in first-seen order.
    """
    term_norm = normalize_text(term)
    term_lemma = lemmatize_term(term)

    if term_lemma in CACHE:
        cached = CACHE[term_lemma]
        try:
            return cached["hits"], cached["suggestions"], cached["ids"]
        except (KeyError, TypeError):
            # The cache is loaded from disk; a damaged or differently shaped
            # entry is recomputed instead of aborting the macro.
            log(f"Ungültiger Cache-Eintrag für '{term_lemma}' – wird neu berechnet.")

    # Exact match on the normalized text first, lemma match as fallback.
    entries = norm_dict.get(term_norm) or lemma_index.get(term_lemma, [])
    hits = [e["Name"] for e in entries]
    ids = [e["ID"] for e in entries if e["ID"]]

    suggs = get_suggestions_for_term(term_lemma, norm_dict, lemma_index, top_n=3, threshold=CONF_THRESHOLD)
    # Normalize the hits once instead of once per suggestion.
    hit_norms = {normalize_text(h) for h in hits}
    suggestions = [
        s for s in suggs
        # A suggestion is "Name (ID)" or "Name"; compare on the name part.
        if normalize_text(s.split(" (")[0]) not in hit_norms
    ]

    # dict.fromkeys removes duplicates while keeping first-seen order.
    hits = list(dict.fromkeys(hits))
    suggestions = list(dict.fromkeys(suggestions))
    ids = list(dict.fromkeys(ids))
    CACHE[term_lemma] = {"hits": hits, "suggestions": suggestions, "ids": ids}

    log(f"TERM: {term} | HITS: {hits} | SUGGS: {suggestions}")
    return hits, suggestions, ids
|
||
|
||
# ------------------------------------------------------------
# MAIN MACRO
# ------------------------------------------------------------
|
||
def run_mapper_macro():
    """Match the "Objektbeschreibung" column of the active sheet against NV_MASTER.

    Steps:
      1. Get the active sheet of the current document and load the indexes.
      2. Find the header cell (row 0) equal to "objektbeschreibung",
         ignoring case.
      3. Reuse or append the output columns "Norm_Treffer",
         "Norm_Vorschlag" and "Norm_ID".
      4. For every non-empty data row, split the text on commas and
         whitespace, drop stop words, map each term, and write the joined
         hits, suggestions and IDs into the output columns.
      5. Colour cells: the source cell red if any term had neither hit nor
         suggestion, otherwise the hit cell green if there are hits; the
         suggestion cell yellow whenever there are suggestions.
      6. Save the cache to CACHE_FILE.

    Every failure is written to the log and ends the macro early; nothing is
    raised to the caller and nothing is returned.
    """
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
    except Exception as e:
        log(f"Fehler beim Zugriff auf Dokument: {e}")
        return

    norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
    if not norm_dict:
        log("Fehler: NV_MASTER leer oder nicht gefunden.")
        return

    try:
        cursor = sheet.createCursor()
        cursor.gotoStartOfUsedArea(False)
        cursor.gotoEndOfUsedArea(True)
        used = cursor.getRangeAddress()
    except Exception as e:
        log(f"Cursor-Fehler: {e}")
        return

    header_row = 0
    # Read the header row once; both column searches below work on this list.
    headers = [
        str(sheet.getCellByPosition(c, header_row).String).strip()
        for c in range(used.EndColumn + 1)
    ]

    objekt_col = next(
        (c for c, h in enumerate(headers) if h.lower() == "objektbeschreibung"),
        None,
    )
    if objekt_col is None:
        log("Keine Spalte 'Objektbeschreibung' gefunden.")
        return

    output_names = ["Norm_Treffer", "Norm_Vorschlag", "Norm_ID"]
    # If a header occurs more than once, the right-most column wins.
    existing = {h: c for c, h in enumerate(headers) if h in output_names}

    # Append any missing output column directly after the used area.
    last_col = used.EndColumn
    for name in output_names:
        if name not in existing:
            last_col += 1
            existing[name] = last_col
            sheet.getCellByPosition(last_col, header_row).String = name

    # Values assigned to CellBackColor below.
    GREEN, YELLOW, RED = 0xADFF2F, 0xFFD700, 0xCC0000
    norm_tr_col = existing["Norm_Treffer"]
    norm_sug_col = existing["Norm_Vorschlag"]
    norm_id_col = existing["Norm_ID"]

    def uniq(seq):
        # Remove duplicates while keeping first-seen order.
        return list(dict.fromkeys(seq))

    rows = 0
    for r in range(header_row + 1, used.EndRow + 1):
        obj_cell = sheet.getCellByPosition(objekt_col, r)
        txt = str(obj_cell.String).strip()
        if not txt:
            continue

        terms = [
            t.strip()
            for t in re.split(r",|\s+", txt)
            if t.strip() and t.lower() not in STOPWORDS
        ]

        row_hits, row_sugs, row_ids, any_unmapped = [], [], [], False
        for term in terms:
            hits, sugs, ids = map_term_with_indexes(term, norm_dict, lemma_index)
            row_hits.extend(hits)
            row_sugs.extend(sugs)
            row_ids.extend(ids)
            if not hits and not sugs:
                any_unmapped = True

        row_hits, row_sugs, row_ids = uniq(row_hits), uniq(row_sugs), uniq(row_ids)

        tr_cell = sheet.getCellByPosition(norm_tr_col, r)
        sug_cell = sheet.getCellByPosition(norm_sug_col, r)
        tr_cell.String = " | ".join(row_hits)
        sug_cell.String = " | ".join(row_sugs)
        sheet.getCellByPosition(norm_id_col, r).String = " | ".join(row_ids)

        if any_unmapped:
            obj_cell.CellBackColor = RED
        elif row_hits:
            tr_cell.CellBackColor = GREEN
        if row_sugs:
            sug_cell.CellBackColor = YELLOW

        rows += 1

    # The sheet is already updated at this point; a cache that cannot be
    # saved is logged instead of raising out of the macro.
    try:
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(CACHE, f, ensure_ascii=False, indent=2)
    except (OSError, TypeError, ValueError) as e:
        log(f"Cache konnte nicht gespeichert werden: {e}")
    log(f"Makro abgeschlossen, {rows} Zeilen verarbeitet.")
|
||
|
||
# Functions of this module offered as macros; presumably read by the
# LibreOffice Python scripting framework - confirm against its documentation.
g_exportedScripts = (run_mapper_macro,)
|