# -*- coding: utf-8 -*-
"""
LibreOffice Calc macro: NV_MASTER reconciliation (improved semantic matching)
Location: /home/jarnold/.config/libreoffice/4/user/Scripts/python/NV Abgleich Makro/mapper_macro.py
"""
import os
import re
import json
import traceback
# ------------------------------------------------------------
# LIBRARIES & MODELS
# ------------------------------------------------------------
try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except Exception:
    PANDAS_AVAILABLE = False
try:
    import spacy
    # Use the medium German model for semantic similarity
    nlp = spacy.load("de_core_news_md")
    SPACY_AVAILABLE = True
except Exception:
    SPACY_AVAILABLE = False
    nlp = None
try:
    from rapidfuzz import fuzz
    RAPIDFUZZ_AVAILABLE = True
except Exception:
    RAPIDFUZZ_AVAILABLE = False
from difflib import SequenceMatcher
# ------------------------------------------------------------
# CONFIGURATION
# ------------------------------------------------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
NV_MASTER_PATH = os.path.join(BASE_DIR, "NV_MASTER.ods")
LOG_FILE = os.path.join(BASE_DIR, "mapper_macro.log")
CACHE_FILE = os.path.join(BASE_DIR, "mapper_cache.json")
STOPWORDS = {"mit","ohne","der","die","das","ein","eine","und","zu","von","im","in","auf","an","als","bei","für","aus","dem","den","des","eines","einer"}
CONF_THRESHOLD = 0.70 # slightly more generous to accommodate semantic matching
# ------------------------------------------------------------
# LOGGING
# ------------------------------------------------------------
def log(msg):
    """Appends technical log messages to a file in the macro directory."""
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(msg.strip() + "\n")
    except Exception:
        pass
log("Makro gestartet")
# ------------------------------------------------------------
# CACHE
# ------------------------------------------------------------
try:
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            CACHE = json.load(f)
    else:
        CACHE = {}
except Exception:
    CACHE = {}
# ------------------------------------------------------------
# TEXT NORMALIZATION & LEMMATIZATION
# ------------------------------------------------------------
def normalize_text(s):
    """Lowercase, strip punctuation and collapse whitespace."""
    if not s:
        return ""
    s = str(s).strip().lower()
    s = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", s)
    s = re.sub(r"\s+", " ", s)
    return s
lemma_cache = {}
def lemmatize_term(term):
    """Normalize a term and lemmatize it with spaCy; falls back to the normalized form."""
    t = normalize_text(term)
    if t in lemma_cache:
        return lemma_cache[t]
    if SPACY_AVAILABLE and nlp:
        try:
            doc = nlp(t)
            lemma = " ".join([token.lemma_ for token in doc])
        except Exception:
            lemma = t
    else:
        lemma = t
    lemma_cache[t] = lemma
    return lemma
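# The intent of the lemmatization step is to map inflected forms (e.g. a plural in
# an object description) onto the same lemma as the singular NV_MASTER entry;
# without spaCy the normalized string is returned unchanged and only exact or
# fuzzy matches remain possible.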
# ------------------------------------------------------------
# LOAD NV_MASTER
# ------------------------------------------------------------
def build_norm_index(nv_path):
    """Load NV_MASTER.ods and build two lookup indexes: normalized name -> entries, lemma -> entries."""
    norm_dict = {}
    lemma_index = {}
    if not PANDAS_AVAILABLE:
        log("pandas not available; NV_MASTER cannot be loaded.")
        return norm_dict, lemma_index
    try:
        sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf")
    except Exception as e:
        log(f"Error loading NV_MASTER: {e}")
        return norm_dict, lemma_index
    for sheet_name, df in sheets.items():
        if str(sheet_name).strip().lower() == "master":
            continue
        df = df.fillna("")
        cols = [str(c).strip().lower() for c in df.columns]
        id_col = next((df.columns[i] for i, c in enumerate(cols) if "id" in c), df.columns[0])
        word_col = next((df.columns[i] for i, c in enumerate(cols) if "wort" in c or "vokabel" in c), df.columns[-1])
        current_parent_id = None
        for _, row in df.iterrows():
            id_val = str(row[id_col]).strip()
            word_val = str(row[word_col]).strip()
            if id_val:
                current_parent_id = id_val
            if not word_val:
                continue
            norm_name = normalize_text(word_val)
            lemma = lemmatize_term(word_val)
            entry = {"Name": word_val, "ID": current_parent_id or "", "Sheet": sheet_name}
            norm_dict.setdefault(norm_name, []).append(entry)
            lemma_index.setdefault(lemma, []).append(entry)
    log(f"NV_MASTER loaded: {sum(len(v) for v in norm_dict.values())} terms.")
    return norm_dict, lemma_index
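# Assumed NV_MASTER layout, inferred from the loop above: every sheet except
# "master" has an ID column and a word/vocabulary column ("wort"/"vokabel"); the ID
# is typically only filled on the first row of a group and is carried forward, so
# subsequent terms inherit the last seen parent ID.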
# ------------------------------------------------------------
# SCORING: FUZZY + SEMANTIC
# ------------------------------------------------------------
def fuzzy_score(a, b):
    """Fuzzy string similarity in [0, 1]; prefers rapidfuzz, falls back to difflib."""
    if RAPIDFUZZ_AVAILABLE:
        try:
            return fuzz.token_set_ratio(a, b) / 100.0
        except Exception:
            return 0.0
    else:
        return SequenceMatcher(None, a.lower(), b.lower()).ratio()
def semantic_similarity(a, b):
    """Vector-based similarity via spaCy; returns 0.0 if no vectors are available."""
    if not SPACY_AVAILABLE or not hasattr(nlp.vocab, "vectors"):
        return 0.0
    try:
        doc_a, doc_b = nlp(a), nlp(b)
        if doc_a.vector_norm and doc_b.vector_norm:
            return float(doc_a.similarity(doc_b))
        return 0.0
    except Exception:
        return 0.0
def combined_score(a, b):
    """Final score: the better of fuzzy and semantic similarity."""
    sf = fuzzy_score(a, b)
    ss = semantic_similarity(a, b)
    return max(sf, ss)
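# combined_score deliberately takes the better of the two signals: near-identical
# spellings pass via the fuzzy ratio even without word vectors, while true synonyms
# (which share little surface form) can only clear CONF_THRESHOLD through the
# semantic similarity.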
# ------------------------------------------------------------
# MATCHING & SUGGESTIONS
# ------------------------------------------------------------
def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, top_n=3, threshold=CONF_THRESHOLD):
    """Return up to top_n formatted suggestions ("Name (ID)") whose score reaches the threshold."""
    candidates = []
    for key_lemma, entries in lemma_index.items():
        score = combined_score(term_lemma, key_lemma)
        if key_lemma.startswith(term_lemma):
            score = min(score + 0.05, 1.0)
        if score >= threshold:
            for e in entries:
                candidates.append((score, e["Name"], e["ID"]))
    for norm_key, entries in norm_dict.items():
        score = combined_score(term_lemma, norm_key)
        if norm_key.startswith(term_lemma):
            score = min(score + 0.05, 1.0)
        if score >= threshold:
            for e in entries:
                candidates.append((score, e["Name"], e["ID"]))
    candidates.sort(key=lambda x: x[0], reverse=True)
    seen, results = set(), []
    for score, name, id_ in candidates:
        key = (name.lower(), id_.lower() if id_ else "")
        if key in seen:
            continue
        seen.add(key)
        results.append({"score": score, "name": name, "id": id_})
        if len(results) >= top_n:
            break
    return [f'{r["name"]} ({r["id"]})' if r["id"] else r["name"] for r in results]
def map_term_with_indexes(term, norm_dict, lemma_index):
    """Map a single term: exact hits via normalized form or lemma, plus fuzzy/semantic suggestions."""
    term_norm = normalize_text(term)
    term_lemma = lemmatize_term(term)
    if term_lemma in CACHE:
        return CACHE[term_lemma]["hits"], CACHE[term_lemma]["suggestions"], CACHE[term_lemma]["ids"]
    hits, suggestions, ids = [], [], []
    if term_norm in norm_dict:
        for e in norm_dict[term_norm]:
            hits.append(e["Name"])
            if e["ID"]:
                ids.append(e["ID"])
    if not hits and term_lemma in lemma_index:
        for e in lemma_index[term_lemma]:
            hits.append(e["Name"])
            if e["ID"]:
                ids.append(e["ID"])
    suggs = get_suggestions_for_term(term_lemma, norm_dict, lemma_index, top_n=3, threshold=CONF_THRESHOLD)
    # Drop suggestions that merely repeat an exact hit
    filtered_suggs = []
    for s in suggs:
        s_clean = normalize_text(s.split(" (")[0])
        if s_clean not in [normalize_text(h) for h in hits]:
            filtered_suggs.append(s)
    suggestions = filtered_suggs
    def uniq(seq):
        seen = set()
        out = []
        for x in seq:
            if x not in seen:
                seen.add(x)
                out.append(x)
        return out
    hits, suggestions, ids = uniq(hits), uniq(suggestions), uniq(ids)
    CACHE[term_lemma] = {"hits": hits, "suggestions": suggestions, "ids": ids}
    log(f"TERM: {term} | HITS: {hits} | SUGGS: {suggestions}")
    return hits, suggestions, ids
# ------------------------------------------------------------
# MAIN MACRO
# ------------------------------------------------------------
def run_mapper_macro():
    """Entry point called from LibreOffice: maps the 'Objektbeschreibung' column of the active sheet."""
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
    except Exception as e:
        log(f"Error accessing the document: {e}")
        return
    norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
    if not norm_dict:
        log("Error: NV_MASTER empty or not found.")
        return
    try:
        cursor = sheet.createCursor()
        cursor.gotoStartOfUsedArea(False)
        cursor.gotoEndOfUsedArea(True)
        used = cursor.getRangeAddress()
    except Exception as e:
        log(f"Cursor error: {e}")
        return
    header_row = 0
    objekt_col = None
    for c in range(0, used.EndColumn + 1):
        val = str(sheet.getCellByPosition(c, header_row).String).strip().lower()
        if val == "objektbeschreibung":
            objekt_col = c
            break
    if objekt_col is None:
        log("No 'Objektbeschreibung' column found.")
        return
    # Reuse existing result columns if present, otherwise append them after the used area
    existing = {}
    for c in range(0, used.EndColumn + 1):
        h = str(sheet.getCellByPosition(c, header_row).String).strip()
        if h == "Norm_Treffer": existing["Norm_Treffer"] = c
        if h == "Norm_Vorschlag": existing["Norm_Vorschlag"] = c
        if h == "Norm_ID": existing["Norm_ID"] = c
    last_col = used.EndColumn
    for name in ["Norm_Treffer", "Norm_Vorschlag", "Norm_ID"]:
        if name not in existing:
            last_col += 1
            existing[name] = last_col
            sheet.getCellByPosition(last_col, header_row).String = name
    # Cell background colors: hit, suggestion only, unmapped term
    GREEN, YELLOW, RED = 0xADFF2F, 0xFFD700, 0xCC0000
    norm_tr_col, norm_sug_col, norm_id_col = existing["Norm_Treffer"], existing["Norm_Vorschlag"], existing["Norm_ID"]
    def uniq(seq):
        seen = set()
        out = []
        for x in seq:
            if x not in seen:
                seen.add(x)
                out.append(x)
        return out
    rows = 0
    for r in range(header_row + 1, used.EndRow + 1):
        txt = str(sheet.getCellByPosition(objekt_col, r).String).strip()
        if not txt:
            continue
        terms = [t.strip() for t in re.split(r",|\s+", txt) if t.strip() and t.lower() not in STOPWORDS]
        row_hits, row_sugs, row_ids, any_unmapped = [], [], [], False
        for term in terms:
            hits, sugs, ids = map_term_with_indexes(term, norm_dict, lemma_index)
            if hits: row_hits.extend(hits)
            if sugs: row_sugs.extend(sugs)
            if ids: row_ids.extend(ids)
            if not hits and not sugs: any_unmapped = True
        row_hits, row_sugs, row_ids = uniq(row_hits), uniq(row_sugs), uniq(row_ids)
        sheet.getCellByPosition(norm_tr_col, r).String = " | ".join(row_hits)
        sheet.getCellByPosition(norm_sug_col, r).String = " | ".join(row_sugs)
        sheet.getCellByPosition(norm_id_col, r).String = " | ".join(row_ids)
        obj_cell = sheet.getCellByPosition(objekt_col, r)
        sug_cell = sheet.getCellByPosition(norm_sug_col, r)
        tr_cell = sheet.getCellByPosition(norm_tr_col, r)
        if any_unmapped:
            obj_cell.CellBackColor = RED
        elif row_hits:
            tr_cell.CellBackColor = GREEN
        if row_sugs:
            sug_cell.CellBackColor = YELLOW
        rows += 1
    with open(CACHE_FILE, "w", encoding="utf-8") as f:
        json.dump(CACHE, f, ensure_ascii=False, indent=2)
    log(f"Macro finished, {rows} rows processed.")
g_exportedScripts = (run_mapper_macro,)
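# Minimal smoke test for running the module outside LibreOffice (no UNO context
# required). It only exercises the matching pipeline against NV_MASTER.ods; the
# sample terms below are placeholders, not known NV_MASTER entries.
if __name__ == "__main__":
    test_norm, test_lemmas = build_norm_index(NV_MASTER_PATH)
    for sample_term in ["Tisch", "Stuhl"]:
        sample_hits, sample_suggs, sample_ids = map_term_with_indexes(sample_term, test_norm, test_lemmas)
        print(f"{sample_term}: hits={sample_hits} suggestions={sample_suggs} ids={sample_ids}")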