# -*- coding: utf-8 -*-
"""
LibreOffice Calc macro: NV_MASTER reconciliation (improved semantic matching)
Location: /home/jarnold/.config/libreoffice/4/user/Scripts/python/NV Abgleich Makro/mapper_macro.py
"""
import os
import re
import json
import traceback

# ------------------------------------------------------------
# LIBRARIES & MODELS
# ------------------------------------------------------------
try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except Exception:
    PANDAS_AVAILABLE = False

try:
    import spacy
    # Use the medium German model for semantic similarity (word vectors)
    nlp = spacy.load("de_core_news_md")
    SPACY_AVAILABLE = True
except Exception:
    SPACY_AVAILABLE = False
    nlp = None

try:
    from rapidfuzz import fuzz
    RAPIDFUZZ_AVAILABLE = True
except Exception:
    RAPIDFUZZ_AVAILABLE = False

from difflib import SequenceMatcher

# ------------------------------------------------------------
# CONFIGURATION
# ------------------------------------------------------------
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
NV_MASTER_PATH = os.path.join(BASE_DIR, "NV_MASTER.ods")
LOG_FILE = os.path.join(BASE_DIR, "mapper_macro.log")
CACHE_FILE = os.path.join(BASE_DIR, "mapper_cache.json")

STOPWORDS = {"mit", "ohne", "der", "die", "das", "ein", "eine", "und", "zu", "von", "im", "in", "auf", "an", "als", "bei", "für", "aus", "dem", "den", "des", "eines", "einer"}

CONF_THRESHOLD = 0.70  # slightly more lenient to give semantic matches a chance

# ------------------------------------------------------------
# LOGGING
# ------------------------------------------------------------
def log(msg):
    """Append technical log messages to a file in the macro directory."""
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(msg.strip() + "\n")
    except Exception:
        pass

log("Makro gestartet")

# ------------------------------------------------------------
# CACHE
# ------------------------------------------------------------
try:
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            CACHE = json.load(f)
    else:
        CACHE = {}
except Exception:
    CACHE = {}

# ------------------------------------------------------------
# TEXT NORMALIZATION & LEMMATIZATION
# ------------------------------------------------------------
def normalize_text(s):
    if not s:
        return ""
    s = str(s).strip().lower()
    s = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", s)
    s = re.sub(r"\s+", " ", s)
    return s

lemma_cache = {}

def lemmatize_term(term):
    t = normalize_text(term)
    if t in lemma_cache:
        return lemma_cache[t]
    if SPACY_AVAILABLE and nlp:
        try:
            doc = nlp(t)
            lemma = " ".join([token.lemma_ for token in doc])
        except Exception:
            lemma = t
    else:
        lemma = t
    lemma_cache[t] = lemma
    return lemma

# ------------------------------------------------------------
# LOAD NV_MASTER
# ------------------------------------------------------------
def build_norm_index(nv_path):
    norm_dict = {}
    lemma_index = {}
    if not PANDAS_AVAILABLE:
        log("Pandas nicht verfügbar – NV_MASTER kann nicht geladen werden.")
        return norm_dict, lemma_index
    try:
        sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf")
    except Exception as e:
        log(f"Fehler beim Laden von NV_MASTER: {e}")
        return norm_dict, lemma_index
    for sheet_name, df in sheets.items():
        if str(sheet_name).strip().lower() == "master":
            continue
        df = df.fillna("")
        # Heuristic column detection: first header containing "id" (fallback:
        # first column), first header containing "wort"/"vokabel" (fallback: last column)
        cols = [str(c).strip().lower() for c in df.columns]
        id_col = next((df.columns[i] for i, c in enumerate(cols) if "id" in c), df.columns[0])
        word_col = next((df.columns[i] for i, c in enumerate(cols) if "wort" in c or "vokabel" in c), df.columns[-1])
        current_parent_id = None
        for _, row in df.iterrows():
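            # Carry the parent ID forward: a row with an ID starts a new parent
            # block, and following rows without an ID inherit that ID.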
            id_val = str(row[id_col]).strip()
            word_val = str(row[word_col]).strip()
            if id_val:
                current_parent_id = id_val
            if not word_val:
                continue
            norm_name = normalize_text(word_val)
            lemma = lemmatize_term(word_val)
            entry = {"Name": word_val, "ID": current_parent_id or "", "Sheet": sheet_name}
            norm_dict.setdefault(norm_name, []).append(entry)
            lemma_index.setdefault(lemma, []).append(entry)
    log(f"NV_MASTER geladen: {sum(len(v) for v in norm_dict.values())} Begriffe.")
    return norm_dict, lemma_index

# ------------------------------------------------------------
# SCORING: FUZZY + SEMANTIC
# ------------------------------------------------------------
def fuzzy_score(a, b):
    if RAPIDFUZZ_AVAILABLE:
        try:
            return fuzz.token_set_ratio(a, b) / 100.0
        except Exception:
            return 0.0
    else:
        return SequenceMatcher(None, a.lower(), b.lower()).ratio()

def semantic_similarity(a, b):
    if not SPACY_AVAILABLE or not hasattr(nlp.vocab, "vectors"):
        return 0.0
    try:
        doc_a, doc_b = nlp(a), nlp(b)
        if doc_a.vector_norm and doc_b.vector_norm:
            return float(doc_a.similarity(doc_b))
        return 0.0
    except Exception:
        return 0.0

def combined_score(a, b):
    # The better of fuzzy string similarity and vector-based similarity wins
    sf = fuzzy_score(a, b)
    ss = semantic_similarity(a, b)
    return max(sf, ss)

# ------------------------------------------------------------
# MATCHING & SUGGESTIONS
# ------------------------------------------------------------
def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, top_n=3, threshold=CONF_THRESHOLD):
    candidates = []
    for key_lemma, entries in lemma_index.items():
        score = combined_score(term_lemma, key_lemma)
        if key_lemma.startswith(term_lemma):
            score = min(score + 0.05, 1.0)  # small bonus for prefix matches
        if score >= threshold:
            for e in entries:
                candidates.append((score, e["Name"], e["ID"]))
    for norm_key, entries in norm_dict.items():
        score = combined_score(term_lemma, norm_key)
        if norm_key.startswith(term_lemma):
            score = min(score + 0.05, 1.0)
        if score >= threshold:
            for e in entries:
                candidates.append((score, e["Name"], e["ID"]))
    candidates.sort(key=lambda x: x[0], reverse=True)
    seen, results = set(), []
    for score, name, id_ in candidates:
        key = (name.lower(), id_.lower() if id_ else "")
        if key in seen:
            continue
        seen.add(key)
        results.append({"score": score, "name": name, "id": id_})
        if len(results) >= top_n:
            break
    return [f'{r["name"]} ({r["id"]})' if r["id"] else r["name"] for r in results]

def map_term_with_indexes(term, norm_dict, lemma_index):
    term_norm = normalize_text(term)
    term_lemma = lemmatize_term(term)
    if term_lemma in CACHE:
        return CACHE[term_lemma]["hits"], CACHE[term_lemma]["suggestions"], CACHE[term_lemma]["ids"]
    hits, suggestions, ids = [], [], []
    if term_norm in norm_dict:
        for e in norm_dict[term_norm]:
            hits.append(e["Name"])
            if e["ID"]:
                ids.append(e["ID"])
    if not hits and term_lemma in lemma_index:
        for e in lemma_index[term_lemma]:
            hits.append(e["Name"])
            if e["ID"]:
                ids.append(e["ID"])
    suggs = get_suggestions_for_term(term_lemma, norm_dict, lemma_index, top_n=3, threshold=CONF_THRESHOLD)
    # Drop suggestions that merely repeat a direct hit
    filtered_suggs = []
    for s in suggs:
        s_clean = normalize_text(s.split(" (")[0])
        if s_clean not in [normalize_text(h) for h in hits]:
            filtered_suggs.append(s)
    suggestions = filtered_suggs

    def uniq(seq):
        seen = set()
        out = []
        for x in seq:
            if x not in seen:
                seen.add(x)
                out.append(x)
        return out

    hits, suggestions, ids = uniq(hits), uniq(suggestions), uniq(ids)
    CACHE[term_lemma] = {"hits": hits, "suggestions": suggestions, "ids": ids}
    log(f"TERM: {term} | HITS: {hits} | SUGGS: {suggestions}")
    return hits, suggestions, ids

# ------------------------------------------------------------
# MAIN MACRO
# ------------------------------------------------------------
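# How a single term is resolved (map_term_with_indexes above):
#   1. exact match on the normalized term (norm_dict)
#   2. otherwise exact match on the lemmatized term (lemma_index)
#   3. plus up to three fuzzy/semantic suggestions scoring >= CONF_THRESHOLD
# run_mapper_macro() applies this to every term in the 'Objektbeschreibung'
# column and writes the results into Norm_Treffer / Norm_Vorschlag / Norm_ID.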
def run_mapper_macro():
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
    except Exception as e:
        log(f"Fehler beim Zugriff auf Dokument: {e}")
        return

    norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
    if not norm_dict:
        log("Fehler: NV_MASTER leer oder nicht gefunden.")
        return

    # Determine the used area of the active sheet
    try:
        cursor = sheet.createCursor()
        cursor.gotoStartOfUsedArea(False)
        cursor.gotoEndOfUsedArea(True)
        used = cursor.getRangeAddress()
    except Exception as e:
        log(f"Cursor-Fehler: {e}")
        return

    # Locate the 'Objektbeschreibung' column in the header row
    header_row = 0
    objekt_col = None
    for c in range(0, used.EndColumn + 1):
        val = str(sheet.getCellByPosition(c, header_row).String).strip().lower()
        if val == "objektbeschreibung":
            objekt_col = c
            break
    if objekt_col is None:
        log("Keine Spalte 'Objektbeschreibung' gefunden.")
        return

    # Reuse existing output columns or append missing ones after the used range
    existing = {}
    for c in range(0, used.EndColumn + 1):
        h = str(sheet.getCellByPosition(c, header_row).String).strip()
        if h == "Norm_Treffer":
            existing["Norm_Treffer"] = c
        if h == "Norm_Vorschlag":
            existing["Norm_Vorschlag"] = c
        if h == "Norm_ID":
            existing["Norm_ID"] = c
    last_col = used.EndColumn
    for name in ["Norm_Treffer", "Norm_Vorschlag", "Norm_ID"]:
        if name not in existing:
            last_col += 1
            existing[name] = last_col
            sheet.getCellByPosition(last_col, header_row).String = name

    GREEN, YELLOW, RED = 0xADFF2F, 0xFFD700, 0xCC0000
    norm_tr_col, norm_sug_col, norm_id_col = existing["Norm_Treffer"], existing["Norm_Vorschlag"], existing["Norm_ID"]

    rows = 0
    for r in range(header_row + 1, used.EndRow + 1):
        txt = str(sheet.getCellByPosition(objekt_col, r).String).strip()
        if not txt:
            continue
        # Split the description into terms and drop stopwords
        terms = [t.strip() for t in re.split(r",|\s+", txt) if t.strip() and t.lower() not in STOPWORDS]
        row_hits, row_sugs, row_ids, any_unmapped = [], [], [], False
        for term in terms:
            hits, sugs, ids = map_term_with_indexes(term, norm_dict, lemma_index)
            if hits:
                row_hits.extend(hits)
            if sugs:
                row_sugs.extend(sugs)
            if ids:
                row_ids.extend(ids)
            if not hits and not sugs:
                any_unmapped = True

        def uniq(seq):
            seen = set()
            out = []
            for x in seq:
                if x not in seen:
                    seen.add(x)
                    out.append(x)
            return out

        row_hits, row_sugs, row_ids = uniq(row_hits), uniq(row_sugs), uniq(row_ids)
        sheet.getCellByPosition(norm_tr_col, r).String = " | ".join(row_hits)
        sheet.getCellByPosition(norm_sug_col, r).String = " | ".join(row_sugs)
        sheet.getCellByPosition(norm_id_col, r).String = " | ".join(row_ids)

        # Color coding: red = at least one unmapped term, green = direct hits,
        # yellow = fuzzy/semantic suggestions present
        obj_cell = sheet.getCellByPosition(objekt_col, r)
        sug_cell = sheet.getCellByPosition(norm_sug_col, r)
        tr_cell = sheet.getCellByPosition(norm_tr_col, r)
        if any_unmapped:
            obj_cell.CellBackColor = RED
        elif row_hits:
            tr_cell.CellBackColor = GREEN
        if row_sugs:
            sug_cell.CellBackColor = YELLOW
        rows += 1

    # Persist the term cache for the next run
    with open(CACHE_FILE, "w", encoding="utf-8") as f:
        json.dump(CACHE, f, ensure_ascii=False, indent=2)
    log(f"Makro abgeschlossen, {rows} Zeilen verarbeitet.")

g_exportedScripts = (run_mapper_macro,)
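
# ------------------------------------------------------------
# OPTIONAL COMMAND-LINE SMOKE TEST
# ------------------------------------------------------------
# Minimal sketch for exercising the index build and term mapping outside
# LibreOffice (no UNO context required). It assumes pandas + odfpy are
# installed and that NV_MASTER.ods sits next to this script; the sample
# terms below are hypothetical placeholders, not guaranteed NV_MASTER
# entries. Inside LibreOffice this block never runs, because the script
# provider imports the module and __name__ is not "__main__".
if __name__ == "__main__":
    test_norm_dict, test_lemma_index = build_norm_index(NV_MASTER_PATH)
    print(f"Index entries: {sum(len(v) for v in test_norm_dict.values())}")
    for sample in ["Tisch", "Fensterglas"]:  # hypothetical sample terms
        hits, suggestions, ids = map_term_with_indexes(sample, test_norm_dict, test_lemma_index)
        print(f"{sample!r}: hits={hits} | suggestions={suggestions} | ids={ids}")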