From 9869e72c53963f424f0d87b867f6102b1c784565 Mon Sep 17 00:00:00 2001 From: gumuArnold Date: Thu, 16 Oct 2025 08:26:35 +0000 Subject: [PATCH] Upload files to "/" Alle Alten Versionen des Mapper-Makros --- mapper_macro_1.4.py | 469 ++++++++++++++++++++++++++++++++++++++++ mapper_macro_1.5.py | 508 ++++++++++++++++++++++++++++++++++++++++++++ mapper_macro_2.0.py | 343 ++++++++++++++++++++++++++++++ mapper_macro_2.1.py | 365 +++++++++++++++++++++++++++++++ mapper_macro_2.2.py | 353 ++++++++++++++++++++++++++++++ 5 files changed, 2038 insertions(+) create mode 100644 mapper_macro_1.4.py create mode 100644 mapper_macro_1.5.py create mode 100644 mapper_macro_2.0.py create mode 100644 mapper_macro_2.1.py create mode 100644 mapper_macro_2.2.py diff --git a/mapper_macro_1.4.py b/mapper_macro_1.4.py new file mode 100644 index 0000000..a22c231 --- /dev/null +++ b/mapper_macro_1.4.py @@ -0,0 +1,469 @@ +# -*- coding: utf-8 -*- +# mapper_macro 1.5 - LibreOffice Calc +# Features: Kompositum-Split, Cache, Live-Vorschläge nur auf 'Objektbeschreibung', Logging + +import os +import re +import json +import datetime + +# optional imports (Pandas, Spacy, RapidFuzz) +try: + import pandas as pd + PANDAS_AVAILABLE = True +except Exception: + PANDAS_AVAILABLE = False + +try: + import spacy + nlp = spacy.load("de_core_news_sm") + SPACY_AVAILABLE = True +except Exception: + SPACY_AVAILABLE = False + nlp = None + +try: + from rapidfuzz import fuzz + RAPIDFUZZ_AVAILABLE = True +except Exception: + RAPIDFUZZ_AVAILABLE = False + from difflib import SequenceMatcher + +# ------------------------ +# Konfiguration +# ------------------------ +BASE_DIR = "/home/jarnold/.config/libreoffice/4/user/Scripts/python/NV Abgleich Makro" +NV_MASTER_PATH = os.path.join(BASE_DIR, "NV_MASTER.ods") +CACHE_FILE = os.path.join(BASE_DIR, "mapper_cache.json") +LOG_FILE = os.path.join(BASE_DIR, "mapper_macro.log") + +STOPWORDS = { + "mit","ohne","der","die","das","ein","eine","und","zu","von","im","in","auf","an", + 
"als","bei","für","aus","dem","den","des","eines","einer" +} +CONF_THRESHOLD = 0.75 + +# ------------------------ +# Logging +# ------------------------ +def log(msg, level="INFO"): + ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + line = f"[{ts}] [{level}] {msg}\n" + try: + os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) + with open(LOG_FILE, "a", encoding="utf-8") as f: + f.write(line) + except Exception: + pass + +# ------------------------ +# Cache laden +# ------------------------ +try: + if os.path.exists(CACHE_FILE): + with open(CACHE_FILE, "r", encoding="utf-8") as f: + CACHE = json.load(f) + else: + CACHE = {} +except Exception as e: + CACHE = {} + log(f"Fehler beim Laden des Caches: {e}", level="ERROR") + +# ------------------------ +# Textnormalisierung & Lemma +# ------------------------ +lemma_cache = {} + +def normalize_text(s): + if not s: + return "" + s = str(s).strip().lower() + s = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", s) + s = re.sub(r"\s+", " ", s) + return s + +def lemmatize_term(term): + term_norm = normalize_text(term) + if term_norm in lemma_cache: + return lemma_cache[term_norm] + if SPACY_AVAILABLE and nlp: + try: + doc = nlp(term_norm) + lemma = " ".join([t.lemma_ for t in doc]) + except Exception: + lemma = term_norm + else: + lemma = term_norm + lemma_cache[term_norm] = lemma + return lemma + +# ------------------------ +# Kompositum-Splitting +# ------------------------ +def compound_split(term): + if not term: + return [] + parts = re.findall(r'[A-ZÄÖÜ][a-zäöü]+', term) + if parts: + return parts + parts = [p for p in re.split(r'[-\s]+', term) if p] + return parts or [term] + +# ------------------------ +# NV_MASTER indexieren +# ------------------------ +def build_norm_index(nv_path): + norm_dict = {} + lemma_index = {} + if not PANDAS_AVAILABLE: + log("Pandas nicht verfügbar, NV_MASTER kann nicht gelesen.", level="ERROR") + return norm_dict, lemma_index + try: + sheets = pd.read_excel(nv_path, sheet_name=None, 
engine="odf") + except Exception as e: + log(f"Fehler beim Einlesen von NV_MASTER: {e}", level="ERROR") + return norm_dict, lemma_index + + for sheet_name, df in sheets.items(): + if str(sheet_name).strip().lower() == "master": + continue + df = df.fillna("") + cols = [str(c).strip().lower() for c in df.columns] + id_col = None + word_col = None + for i, c in enumerate(cols): + if "id" in c: + id_col = df.columns[i] + if "wort" in c or "vokabel" in c: + word_col = df.columns[i] + if word_col is None and len(df.columns) >= 1: + word_col = df.columns[-1] + if id_col is None and len(df.columns) >= 1: + id_col = df.columns[0] + + current_parent_id = None + for _, row in df.iterrows(): + id_val = str(row[id_col]).strip() if id_col in df.columns else "" + word_val = str(row[word_col]).strip() if word_col in df.columns else "" + if id_val: + current_parent_id = id_val + if not word_val: + continue + norm_name = normalize_text(word_val) + lemma = lemmatize_term(word_val) + entry = {"Name": word_val.strip(), "ID": current_parent_id or "", "Sheet": sheet_name} + norm_dict.setdefault(norm_name, []).append(entry) + lemma_index.setdefault(lemma, []).append(entry) + + log(f"NV_MASTER geladen. 
Begriffe: {sum(len(v) for v in norm_dict.values())}") + return norm_dict, lemma_index + +# ------------------------ +# Fuzzy / Vorschläge +# ------------------------ +def fuzzy_score(a, b): + if RAPIDFUZZ_AVAILABLE: + try: + return fuzz.token_set_ratio(a, b) / 100.0 + except Exception: + return 0.0 + else: + return SequenceMatcher(None, a.lower(), b.lower()).ratio() + +def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, threshold=CONF_THRESHOLD): + candidates = [] + for key_lemma, entries in lemma_index.items(): + score = fuzzy_score(term_lemma, key_lemma) + if key_lemma.startswith(term_lemma): + score = min(score + 0.1, 1.0) + if score >= threshold: + for e in entries: + candidates.append((score, e["Name"], e["ID"])) + for norm_key, entries in norm_dict.items(): + score = fuzzy_score(term_lemma, norm_key) + if norm_key.startswith(term_lemma): + score = min(score + 0.1, 1.0) + if score >= threshold: + for e in entries: + candidates.append((score, e["Name"], e["ID"])) + candidates.sort(key=lambda t: t[0], reverse=True) + seen = set() + results = [] + for score, name, id_ in candidates: + key = (name, id_) + if key in seen: + continue + seen.add(key) + results.append({"score": score, "name": name, "id": id_}) + return [f'{r["name"]} ({r["id"]})' if r["id"] else r["name"] for r in results] + +# ------------------------ +# Mapping eines Terms (mit Cache) +# ------------------------ +def map_term_with_indexes(term, norm_dict, lemma_index): + term_norm = normalize_text(term) + term_lemma = lemmatize_term(term) + if term_lemma in CACHE: + c = CACHE[term_lemma] + return c.get("hits", []), c.get("suggestions", []), c.get("ids", []) + + hits = [] + suggestions = [] + ids = [] + + if term_norm in norm_dict: + for e in norm_dict[term_norm]: + hits.append(e["Name"]) + if e["ID"]: + ids.append(e["ID"]) + if not hits and term_lemma in lemma_index: + for e in lemma_index[term_lemma]: + hits.append(e["Name"]) + if e["ID"]: + ids.append(e["ID"]) + suggestions = 
get_suggestions_for_term(term_lemma, norm_dict, lemma_index) + + if not hits: + tokens = compound_split(term) + for t in tokens: + t_lemma = lemmatize_term(t) + if t_lemma in lemma_index: + for e in lemma_index[t_lemma]: + hits.append(e["Name"]) + if e["ID"]: + ids.append(e["ID"]) + else: + suggestions.extend(get_suggestions_for_term(t_lemma, norm_dict, lemma_index)) + + def uniq(seq): + seen = set() + out = [] + for x in seq: + if x not in seen: + seen.add(x) + out.append(x) + return out + + hits = uniq(hits) + suggestions = uniq(suggestions) + ids = uniq(ids) + + CACHE[term_lemma] = {"hits": hits, "suggestions": suggestions, "ids": ids} + return hits, suggestions, ids + +# ------------------------ +# Header + Spalten +# ------------------------ +def find_header_and_cols(sheet): + try: + cursor = sheet.createCursor() + cursor.gotoStartOfUsedArea(False) + cursor.gotoEndOfUsedArea(True) + dr = cursor.getRangeAddress() + except Exception: + return None, None, None + header_row = None + objekt_col = None + for r in range(0, min(5, dr.EndRow + 1)): + for c in range(0, dr.EndColumn + 1): + try: + val = str(sheet.getCellByPosition(c, r).String).strip().lower() + except Exception: + val = "" + if val == "objektbeschreibung": + header_row = r + objekt_col = c + break + if objekt_col is not None: + break + + if header_row is None: + return None, None, dr + existing = {} + for c in range(0, dr.EndColumn + 1): + try: + h = str(sheet.getCellByPosition(c, header_row).String).strip() + except Exception: + h = "" + if h == "Norm_Treffer": + existing["Norm_Treffer"] = c + if h == "Norm_Vorschlag": + existing["Norm_Vorschlag"] = c + if h == "Norm_ID": + existing["Norm_ID"] = c + return header_row, objekt_col, dr, existing + +# ------------------------ +# Optimierter Live-Handler (nur Objektbeschreibung) +# ------------------------ +def on_objektbeschreibung_change(oEvent=None): + try: + doc = XSCRIPTCONTEXT.getDocument() + sheet = doc.CurrentController.ActiveSheet + except 
Exception as e: + log(f"Dokumentzugriff fehlgeschlagen: {e}", level="ERROR") + return + + cell = None + try: + if oEvent and hasattr(oEvent, "Range") and oEvent.Range is not None: + cell = oEvent.Range + elif oEvent and hasattr(oEvent, "Source") and oEvent.Source is not None: + cell = oEvent.Source + except Exception: + cell = None + if cell is None: + try: + sel = doc.CurrentSelection + if hasattr(sel, "getCellByPosition"): + cell = sel + else: + cell = sel.getCellByPosition(0, 0) + except Exception as e: + log(f"Keine Selektion: {e}", level="ERROR") + return + + try: + row_index = cell.CellAddress.Row + col_index = cell.CellAddress.Column + except Exception: + return + + try: + header_row, objekt_col, dr, existing = find_header_and_cols(sheet) + if header_row is None or col_index != objekt_col: + return # nur die Objektbeschreibung-Spalte bearbeiten + last_col = dr.EndColumn + if "Norm_Vorschlag" not in existing: + last_col += 1 + existing["Norm_Vorschlag"] = last_col + sheet.getCellByPosition(last_col, header_row).String = "Norm_Vorschlag" + norm_sug_col = existing["Norm_Vorschlag"] + except Exception as e: + log(f"Fehler Spaltenbestimmung: {e}", level="ERROR") + return + + try: + txt = str(cell.String).strip() + if not txt: + sheet.getCellByPosition(norm_sug_col, row_index).String = "" + return + norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH) + suggestions_acc = [] + clauses = [c.strip() for c in re.split(r",", txt) if c.strip()] + for cl in clauses: + parts = [p.strip() for p in re.split(r"\s+", cl) if p.strip()] + for p in parts: + if p.lower() in STOPWORDS or re.fullmatch(r"\d+", p): + continue + for sp in compound_split(p): + _, sugs, _ = map_term_with_indexes(sp, norm_dict, lemma_index) + suggestions_acc.extend(sugs) + + seen = set() + ordered = [] + for s in suggestions_acc: + if s not in seen: + seen.add(s) + ordered.append(s) + sheet.getCellByPosition(norm_sug_col, row_index).String = " | ".join(ordered) + + with open(CACHE_FILE, "w", 
encoding="utf-8") as f: + json.dump(CACHE, f, ensure_ascii=False, indent=2) + + except Exception as e: + log(f"Fehler im Live-Handler: {e}", level="ERROR") + +# ------------------------ +# Batch-Durchlauf +# ------------------------ +def run_mapper_macro(): + log("=== mapper_macro 1.5 gestartet ===", level="INFO") + try: + doc = XSCRIPTCONTEXT.getDocument() + sheet = doc.CurrentController.ActiveSheet + cursor = sheet.createCursor() + cursor.gotoStartOfUsedArea(False) + cursor.gotoEndOfUsedArea(True) + dr = cursor.getRangeAddress() + except Exception as e: + log(f"Dokumentzugriff fehlgeschlagen: {e}", level="ERROR") + return + + header_row, objekt_col, dr, existing = find_header_and_cols(sheet) + if objekt_col is None: + log("Spalte 'Objektbeschreibung' nicht gefunden.", level="ERROR") + return + if "Norm_Treffer" not in existing: + last_col = dr.EndColumn + 1 + existing["Norm_Treffer"] = last_col + sheet.getCellByPosition(last_col, header_row).String = "Norm_Treffer" + if "Norm_Vorschlag" not in existing: + last_col = dr.EndColumn + 2 + existing["Norm_Vorschlag"] = last_col + sheet.getCellByPosition(last_col, header_row).String = "Norm_Vorschlag" + if "Norm_ID" not in existing: + last_col = dr.EndColumn + 3 + existing["Norm_ID"] = last_col + sheet.getCellByPosition(last_col, header_row).String = "Norm_ID" + + norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH) + GREEN, YELLOW, RED = 0xADFF2F, 0xFFA500, 0xCC0000 + + for r in range(header_row + 1, dr.EndRow + 1): + try: + cell = sheet.getCellByPosition(objekt_col, r) + txt = str(cell.String).strip() + if not txt: + continue + clauses = [c.strip() for c in re.split(r",", txt) if c.strip()] + terms = [] + for cl in clauses: + for p in [p.strip() for p in re.split(r"\s+", cl) if p.strip()]: + if p.lower() in STOPWORDS or re.fullmatch(r"\d+", p): + continue + terms.extend([sp.strip() for sp in compound_split(p) if sp.strip()]) + + row_hits, row_sugs, row_ids = [], [], [] + any_unmapped = False + for term in terms: 
+ hits, sugs, ids = map_term_with_indexes(term, norm_dict, lemma_index) + row_hits.extend(hits) + row_sugs.extend(sugs) + row_ids.extend(ids) + if not hits and not sugs: + any_unmapped = True + + def uniq(seq): + seen = set() + out = [] + for x in seq: + if x not in seen: + seen.add(x) + out.append(x) + return out + + row_hits, row_sugs, row_ids = map(uniq, [row_hits, row_sugs, row_ids]) + sheet.getCellByPosition(existing["Norm_Treffer"], r).String = " | ".join(row_hits) + sheet.getCellByPosition(existing["Norm_Vorschlag"], r).String = " | ".join(row_sugs) + sheet.getCellByPosition(existing["Norm_ID"], r).String = " | ".join(row_ids) + + cell.CellBackColor = RED if any_unmapped else 0xFFFFFF + sheet.getCellByPosition(existing["Norm_Treffer"], r).CellBackColor = GREEN if row_hits and not any_unmapped else 0xFFFFFF + sheet.getCellByPosition(existing["Norm_Vorschlag"], r).CellBackColor = YELLOW if row_sugs else 0xFFFFFF + + except Exception as e: + log(f"Fehler in Zeile {r}: {e}", level="ERROR") + continue + + with open(CACHE_FILE, "w", encoding="utf-8") as f: + json.dump(CACHE, f, ensure_ascii=False, indent=2) + log("=== mapper_macro 1.5 fertig ===", level="INFO") + +# ------------------------ +# Export +# ------------------------ +g_exportedScripts = ( + run_mapper_macro, + on_objektbeschreibung_change +) diff --git a/mapper_macro_1.5.py b/mapper_macro_1.5.py new file mode 100644 index 0000000..9ed712f --- /dev/null +++ b/mapper_macro_1.5.py @@ -0,0 +1,508 @@ +# -*- coding: utf-8 -*- +# mapper_macro 1.5 - korrigiert: Logging im Dokumentverzeichnis, stabile Button-Erstellung, +# keine Listener, optimiertes Mapping (ohne Listener-Teil) + +import os +import re +import json +import datetime + +# optionale Module (Pandas, Spacy, RapidFuzz) +try: + import pandas as pd + PANDAS_AVAILABLE = True +except Exception: + PANDAS_AVAILABLE = False + +try: + import spacy + nlp = spacy.load("de_core_news_sm") + SPACY_AVAILABLE = True +except Exception: + SPACY_AVAILABLE = False + 
nlp = None + +try: + from rapidfuzz import fuzz + RAPIDFUZZ_AVAILABLE = True +except Exception: + RAPIDFUZZ_AVAILABLE = False + +from difflib import SequenceMatcher + +# UNO (für Button/Paths) +try: + import uno +except Exception: + uno = None + +# ------------------------ +# Konfiguration (Fallback-BASE_DIR) +# ------------------------ +BASE_DIR = os.path.expanduser("~/.config/libreoffice/4/user/Scripts/python/Vokabular_Abgleich_Makro") +NV_MASTER_FILENAME = "NV_MASTER.ods" +CACHE_FILENAME = "mapper_cache.json" +LOG_FILENAME = "mapper_macro.log" + +STOPWORDS = { + "mit", "ohne", "der", "die", "das", "ein", "eine", "und", "zu", "von", "im", "in", "auf", "an", + "als", "bei", "für", "aus", "dem", "den", "des", "eines", "einer" +} +CONF_THRESHOLD = 0.82 +FUZZY_CUTOFF = 0.88 + +# Per-document paths (initialized by set_paths_from_doc) +DOC_DIR = BASE_DIR +NV_MASTER_PATH = os.path.join(DOC_DIR, NV_MASTER_FILENAME) +CACHE_FILE = os.path.join(DOC_DIR, CACHE_FILENAME) +LOG_FILE = os.path.join(DOC_DIR, LOG_FILENAME) + +# in-memory cache +try: + if os.path.exists(CACHE_FILE): + with open(CACHE_FILE, "r", encoding="utf-8") as f: + CACHE = json.load(f) + else: + CACHE = {} +except Exception: + CACHE = {} + +# ------------------------ +# Pfade im Dokument setzen +# ------------------------ +def set_paths_from_doc(doc): + global DOC_DIR, NV_MASTER_PATH, CACHE_FILE, LOG_FILE + try: + url = getattr(doc, "URL", "") + if url and url.strip(): + # UNO liefert file:///... 
+ try: + system_path = uno.fileUrlToSystemPath(url) + except Exception: + # fallback: try simple unquote + from urllib.parse import unquote, urlparse + parsed = urlparse(url) + if parsed.scheme == "file": + system_path = unquote(parsed.path) + else: + system_path = "" + if system_path: + d = os.path.dirname(system_path) + if os.path.isdir(d): + DOC_DIR = d + except Exception: + DOC_DIR = BASE_DIR + NV_MASTER_PATH = os.path.join(DOC_DIR, NV_MASTER_FILENAME) + CACHE_FILE = os.path.join(DOC_DIR, CACHE_FILENAME) + LOG_FILE = os.path.join(DOC_DIR, LOG_FILENAME) + +# ------------------------ +# Logging (Dokumentdir, robust) +# ------------------------ +def log(msg, level="INFO"): + ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + line = f"[{ts}] [{level}] {msg}\n" + try: + # ensure directory exists + os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) + with open(LOG_FILE, "a", encoding="utf-8") as f: + f.write(line) + except Exception: + # absolute fallback: try writing into BASE_DIR + try: + fallback = os.path.join(BASE_DIR, LOG_FILENAME) + os.makedirs(os.path.dirname(fallback), exist_ok=True) + with open(fallback, "a", encoding="utf-8") as f: + f.write(line) + except Exception: + # last resort: silent + pass + +# ------------------------ +# Textvorbereitung & Helpers +# ------------------------ +lemma_cache = {} + +def normalize_text(s): + if not s: + return "" + s = str(s).strip().lower() + s = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", s) + s = re.sub(r"\s+", " ", s) + return s + +def lemmatize_term(term): + term_norm = normalize_text(term) + if term_norm in lemma_cache: + return lemma_cache[term_norm] + if SPACY_AVAILABLE and nlp: + try: + doc = nlp(term_norm) + lemma = " ".join([t.lemma_ for t in doc]) + except Exception: + lemma = term_norm + else: + lemma = term_norm + lemma_cache[term_norm] = lemma + return lemma + +def compound_split(term): + if not term: + return [] + parts = re.findall(r'[A-ZÄÖÜ][a-zäöü]+', term) + if parts: + return parts + parts 
= [p for p in re.split(r'[-\s]+', term) if p] + return parts or [term] + +# ------------------------ +# NV_MASTER indexieren +# ------------------------ +def build_norm_index(nv_path): + norm_dict = {} + lemma_index = {} + if not PANDAS_AVAILABLE: + log("Pandas nicht verfügbar, NV_MASTER kann nicht gelesen.", level="ERROR") + return norm_dict, lemma_index + try: + sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf") + except Exception as e: + log(f"Fehler beim Einlesen von NV_MASTER: {e}", level="ERROR") + return norm_dict, lemma_index + + for sheet_name, df in sheets.items(): + if str(sheet_name).strip().lower() == "master": + continue + df = df.fillna("") + cols = [str(c).strip().lower() for c in df.columns] + # find id/word columns with fallback + id_col = None + word_col = None + for i, c in enumerate(cols): + if "id" in c: + id_col = df.columns[i] + if "wort" in c or "vokabel" in c: + word_col = df.columns[i] + if word_col is None and len(df.columns) >= 1: + word_col = df.columns[-1] + if id_col is None and len(df.columns) >= 1: + id_col = df.columns[0] + + current_parent_id = None + for _, row in df.iterrows(): + id_val = str(row[id_col]).strip() if id_col in df.columns else "" + word_val = str(row[word_col]).strip() if word_col in df.columns else "" + if id_val: + current_parent_id = id_val + if not word_val: + continue + norm_name = normalize_text(word_val) + lemma = lemmatize_term(word_val) + entry = {"Name": word_val.strip(), "ID": current_parent_id or "", "Sheet": sheet_name} + norm_dict.setdefault(norm_name, []).append(entry) + lemma_index.setdefault(lemma, []).append(entry) + + log(f"NV_MASTER geladen. 
Begriffe: {sum(len(v) for v in norm_dict.values())}", level="INFO") + return norm_dict, lemma_index + +# ------------------------ +# Fuzzy Matching +# ------------------------ +def fuzzy_score(a, b): + a = (a or "").lower() + b = (b or "").lower() + if RAPIDFUZZ_AVAILABLE: + try: + return fuzz.token_sort_ratio(a, b) / 100.0 + except Exception: + return 0.0 + else: + return SequenceMatcher(None, a, b).ratio() + +def get_suggestions(term_lemma, norm_dict, lemma_index, threshold=FUZZY_CUTOFF, max_sugs=6): + candidates = [] + term_norm = term_lemma or "" + for key_lemma, entries in lemma_index.items(): + if not key_lemma: + continue + score = fuzzy_score(term_norm, key_lemma) + if key_lemma.startswith(term_norm): + score = min(score + 0.08, 1.0) + if score >= threshold: + for e in entries: + candidates.append((score, e["Name"], e["ID"])) + # also check normalized names + for norm_key, entries in norm_dict.items(): + score = fuzzy_score(term_norm, norm_key) + if norm_key.startswith(term_norm): + score = min(score + 0.08, 1.0) + if score >= threshold: + for e in entries: + candidates.append((score, e["Name"], e["ID"])) + # sort & dedupe + candidates.sort(key=lambda t: t[0], reverse=True) + seen = set() + out = [] + for score, name, id_ in candidates: + key = (name, id_) + if key in seen: + continue + seen.add(key) + if id_: + out.append(f"{name} ({id_})") + else: + out.append(name) + if len(out) >= max_sugs: + break + return out + +# ------------------------ +# Mapping mit Cache +# ------------------------ +def map_term(term, norm_dict, lemma_index): + term_norm = normalize_text(term) + term_lemma = lemmatize_term(term) + if term_lemma in CACHE: + return CACHE[term_lemma] + + hits = [] + suggestions = [] + ids = [] + + # exact + if term_norm in norm_dict: + for e in norm_dict[term_norm]: + hits.append(e["Name"]) + if e["ID"]: + ids.append(e["ID"]) + + # lemma + if not hits and term_lemma in lemma_index: + for e in lemma_index[term_lemma]: + hits.append(e["Name"]) + if 
e["ID"]: + ids.append(e["ID"]) + + # suggestions only if no hit + if not hits: + suggestions = get_suggestions(term_lemma, norm_dict, lemma_index) + + # remove suggestions that are equal/contain hits + suggestions = [s for s in suggestions if not any(h.lower() in s.lower() for h in hits)] + + result = {"hits": hits, "suggestions": suggestions, "ids": ids} + CACHE[term_lemma] = result + return result + +# ------------------------ +# Button erstellen (sicher) +# ------------------------ +def add_macro_button(sheet): + try: + doc = XSCRIPTCONTEXT.getDocument() + except Exception: + log("add_macro_button: kein Dokument-Kontext", level="WARN") + return + try: + draw_page = sheet.DrawPage + # avoid duplicate + for shape in draw_page: + try: + if getattr(shape, "Name", "") == "MapperStartButton": + return + except Exception: + continue + + # create shape and button model + shape = doc.createInstance("com.sun.star.drawing.ControlShape") + shape.Name = "MapperStartButton" + shape.Position = uno.createUnoStruct("com.sun.star.awt.Point") + shape.Position.X = 1000 + shape.Position.Y = 200 + shape.Size = uno.createUnoStruct("com.sun.star.awt.Size") + shape.Size.Width = 3000 + shape.Size.Height = 1000 + + button_model = doc.createInstance("com.sun.star.form.component.CommandButton") + button_model.Label = "Start Mapping" + button_model.HelpText = "Startet das Mapping (run_mapper_macro)" + # assign macro via ActionCommand is not enough; user must link in UI; we add the control and label + + shape.Control = button_model + draw_page.add(shape) + log("Button 'MapperStartButton' erstellt.", level="INFO") + except Exception as e: + log(f"add_macro_button Fehler: {e}", level="ERROR") + +# ------------------------ +# Hauptlauf (ohne Listener) +# ------------------------ +def run_mapper_macro(): + try: + doc = XSCRIPTCONTEXT.getDocument() + set_paths_from_doc(doc) + log("=== mapper_macro gestartet ===", level="INFO") + sheet = doc.CurrentController.ActiveSheet + add_macro_button(sheet) + 
+ # used area + cursor = sheet.createCursor() + cursor.gotoStartOfUsedArea(False) + cursor.gotoEndOfUsedArea(True) + dr = cursor.getRangeAddress() + + # find header and objekt col + header_row = None + objekt_col = None + for r in range(0, min(10, dr.EndRow + 1)): + for c in range(0, dr.EndColumn + 1): + try: + val = str(sheet.getCellByPosition(c, r).String).strip().lower() + except Exception: + val = "" + if val == "Objektbeschreibung": + header_row = r + objekt_col = c + break + if objekt_col is not None: + break + + if objekt_col is None: + log("run_mapper_macro: 'Objektbeschreibung' Header nicht gefunden.", level="ERROR") + return + + # ensure result cols + existing = {} + last_col = dr.EndColumn + for c in range(0, dr.EndColumn + 1): + try: + h = str(sheet.getCellByPosition(c, header_row).String).strip() + except Exception: + h = "" + if h == "Norm_Treffer": + existing["Norm_Treffer"] = c + if h == "Norm_Vorschlag": + existing["Norm_Vorschlag"] = c + if h == "Norm_ID": + existing["Norm_ID"] = c + + if "Norm_Treffer" not in existing: + last_col += 1 + existing["Norm_Treffer"] = last_col + sheet.getCellByPosition(last_col, header_row).String = "Norm_Treffer" + if "Norm_Vorschlag" not in existing: + last_col += 1 + existing["Norm_Vorschlag"] = last_col + sheet.getCellByPosition(last_col, header_row).String = "Norm_Vorschlag" + if "Norm_ID" not in existing: + last_col += 1 + existing["Norm_ID"] = last_col + sheet.getCellByPosition(last_col, header_row).String = "Norm_ID" + + norm_tr_col = existing["Norm_Treffer"] + norm_sug_col = existing["Norm_Vorschlag"] + norm_id_col = existing["Norm_ID"] + + # build index + norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH) + if not norm_dict and not lemma_index: + log("run_mapper_macro: NV_MASTER leer oder nicht lesbar.", level="ERROR") + return + + GREEN, YELLOW, RED = 0xADFF2F, 0xFFFF66, 0xFF9999 + rows_processed = 0 + + for r in range(header_row + 1, dr.EndRow + 1): + try: + cell = 
sheet.getCellByPosition(objekt_col, r) + txt = str(cell.String).strip() + if not txt: + continue + + # phrase-first: try entire cleaned phrase (remove stopwords) + tokens = [t.strip() for t in re.split(r'\s+', normalize_text(txt)) if t and t not in STOPWORDS] + phrase = " ".join(tokens).strip() + terms = [] + if phrase: + # first try phrase as whole + mapped_phrase = map_term(phrase, norm_dict, lemma_index) + if mapped_phrase["hits"] or mapped_phrase["suggestions"]: + # use phrase result (flatten hits+suggestions for output) + row_hits = mapped_phrase["hits"] + row_sugs = mapped_phrase["suggestions"] + row_ids = mapped_phrase["ids"] + any_unmapped = False if (row_hits or row_sugs) else True + else: + # fallback to token/compound processing + for p in [p for p in re.split(r'[,\s]+', txt) if p.strip()]: + if p.lower() in STOPWORDS or re.fullmatch(r'\d+', p): + continue + for sp in compound_split(p): + if sp and sp.strip(): + terms.append(sp.strip()) + row_hits = [] + row_sugs = [] + row_ids = [] + any_unmapped = False + for term in terms: + mapped = map_term(term, norm_dict, lemma_index) + hits, sugs, ids = mapped["hits"], mapped["suggestions"], mapped["ids"] + if hits: + row_hits.extend(hits) + if sugs: + row_sugs.extend(sugs) + if ids: + row_ids.extend(ids) + if not hits and not sugs: + any_unmapped = True + else: + row_hits, row_sugs, row_ids = [], [], [] + any_unmapped = True + + # dedupe preserving order + def uniq(seq): + seen = set() + out = [] + for x in seq: + if x not in seen: + seen.add(x) + out.append(x) + return out + + row_hits = uniq(row_hits) + row_sugs = uniq(row_sugs) + row_ids = uniq(row_ids) + + # write + sheet.getCellByPosition(norm_tr_col, r).String = " | ".join(row_hits) + sheet.getCellByPosition(norm_sug_col, r).String = " | ".join(row_sugs) + sheet.getCellByPosition(norm_id_col, r).String = " | ".join(row_ids) + + cell.CellBackColor = RED if any_unmapped else 0xFFFFFF + sheet.getCellByPosition(norm_tr_col, r).CellBackColor = GREEN if row_hits 
else 0xFFFFFF + sheet.getCellByPosition(norm_sug_col, r).CellBackColor = YELLOW if row_sugs else 0xFFFFFF + + rows_processed += 1 + except Exception as e: + log(f"Fehler in Zeile {r}: {e}", level="ERROR") + continue + + # persist cache file to DOC_DIR + try: + with open(CACHE_FILE, "w", encoding="utf-8") as f: + json.dump(CACHE, f, ensure_ascii=False, indent=2) + except Exception as e: + log(f"Cache speichern fehlgeschlagen: {e}", level="WARN") + + log(f"=== mapper_macro fertig. Zeilen verarbeitet: {rows_processed} ===", level="INFO") + except Exception as e: + # top-level safety + try: + log(f"run_mapper_macro: Unhandled exception: {e}", level="ERROR") + except Exception: + pass + +# ------------------------ +# Export +# ------------------------ +g_exportedScripts = (run_mapper_macro,) diff --git a/mapper_macro_2.0.py b/mapper_macro_2.0.py new file mode 100644 index 0000000..7c39076 --- /dev/null +++ b/mapper_macro_2.0.py @@ -0,0 +1,343 @@ +# -*- coding: utf-8 -*- +""" +LibreOffice Calc Makro: NV_MASTER-Abgleich (verbessertes semantisches Matching) +Speicherort: /home/jarnold/.config/libreoffice/4/user/Scripts/python/NV Abgleich Makro/mapper_macro.py +""" + +import os +import re +import json +import traceback + +# ------------------------------------------------------------ +# LIBRARIES & MODELS +# ------------------------------------------------------------ +try: + import pandas as pd + PANDAS_AVAILABLE = True +except Exception: + PANDAS_AVAILABLE = False + +try: + import spacy + # Verwende das mittlere Modell für semantische Ähnlichkeit + nlp = spacy.load("de_core_news_md") + SPACY_AVAILABLE = True +except Exception: + SPACY_AVAILABLE = False + nlp = None + +try: + from rapidfuzz import fuzz + RAPIDFUZZ_AVAILABLE = True +except Exception: + RAPIDFUZZ_AVAILABLE = False + from difflib import SequenceMatcher + +# ------------------------------------------------------------ +# KONFIGURATION +# ------------------------------------------------------------ +BASE_DIR = 
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
NV_MASTER_PATH = os.path.join(BASE_DIR, "NV_MASTER.ods")
LOG_FILE = os.path.join(BASE_DIR, "mapper_macro.log")
CACHE_FILE = os.path.join(BASE_DIR, "mapper_cache.json")

STOPWORDS = {"mit","ohne","der","die","das","ein","eine","und","zu","von","im","in","auf","an","als","bei","für","aus","dem","den","des","eines","einer"}
CONF_THRESHOLD = 0.70  # slightly more generous, semantic scores run lower than fuzzy ones

# ------------------------------------------------------------
# LOGGING
# ------------------------------------------------------------
def log(msg):
    """Append one technical log line next to the macro; never raise."""
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(msg.strip() + "\n")
    except Exception:
        pass  # logging must never break the macro


log("Makro gestartet")

# ------------------------------------------------------------
# CACHE
# ------------------------------------------------------------
# persisted term -> {"hits", "suggestions", "ids"} lookup from previous runs
try:
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            CACHE = json.load(f)
    else:
        CACHE = {}
except Exception:
    CACHE = {}

# ------------------------------------------------------------
# TEXT NORMALIZATION & LEMMATIZATION
# ------------------------------------------------------------
def normalize_text(s):
    """Lowercase *s*, strip punctuation and collapse whitespace for lookups."""
    if not s:
        return ""
    s = str(s).strip().lower()
    s = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", s)
    s = re.sub(r"\s+", " ", s)
    return s


lemma_cache = {}


def lemmatize_term(term):
    """Return the (memoized) lemma of *term*.

    Falls back to the normalized text when spaCy is unavailable or the
    pipeline fails, so callers always get a usable key.
    """
    t = normalize_text(term)
    if t in lemma_cache:
        return lemma_cache[t]
    if SPACY_AVAILABLE and nlp:
        try:
            doc = nlp(t)
            lemma = " ".join([token.lemma_ for token in doc])
        except Exception:
            lemma = t
    else:
        lemma = t
    lemma_cache[t] = lemma
    return lemma

# ------------------------------------------------------------
# NV_MASTER LOADING
# ------------------------------------------------------------
def build_norm_index(nv_path):
    """Read NV_MASTER (.ods) and build two lookup tables.

    Returns ``(norm_dict, lemma_index)``: normalized name -> entries and
    lemma -> entries, each entry a dict ``{"Name", "ID", "Sheet"}``.  The
    sheet named 'master' is skipped.  A row with an empty ID cell inherits
    the most recently seen ID (hierarchical child rows).
    """
    norm_dict = {}
    lemma_index = {}

    if not PANDAS_AVAILABLE:
        log("Pandas nicht verfügbar – NV_MASTER kann nicht geladen werden.")
        return norm_dict, lemma_index

    try:
        sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf")
    except Exception as e:
        log(f"Fehler beim Laden von NV_MASTER: {e}")
        return norm_dict, lemma_index

    for sheet_name, df in sheets.items():
        if str(sheet_name).strip().lower() == "master":
            continue
        df = df.fillna("")
        cols = [str(c).strip().lower() for c in df.columns]
        # heuristic column detection: "id" marks the ID column,
        # "wort"/"vokabel" the vocabulary column; fall back to first/last
        id_col = next((df.columns[i] for i, c in enumerate(cols) if "id" in c), df.columns[0])
        word_col = next((df.columns[i] for i, c in enumerate(cols) if "wort" in c or "vokabel" in c), df.columns[-1])

        current_parent_id = None
        for _, row in df.iterrows():
            id_val = str(row[id_col]).strip()
            word_val = str(row[word_col]).strip()
            if id_val:
                current_parent_id = id_val  # child rows inherit this ID
            if not word_val:
                continue
            norm_name = normalize_text(word_val)
            lemma = lemmatize_term(word_val)
            entry = {"Name": word_val, "ID": current_parent_id or "", "Sheet": sheet_name}
            norm_dict.setdefault(norm_name, []).append(entry)
            lemma_index.setdefault(lemma, []).append(entry)

    log(f"NV_MASTER geladen: {sum(len(v) for v in norm_dict.values())} Begriffe.")
    return norm_dict, lemma_index

# ------------------------------------------------------------
# SCORING: FUZZY + SEMANTIC
# ------------------------------------------------------------
def fuzzy_score(a, b):
    """Token-based fuzzy similarity in [0, 1] (rapidfuzz, else difflib)."""
    if RAPIDFUZZ_AVAILABLE:
        try:
            return fuzz.token_set_ratio(a, b) / 100.0
        except Exception:
            return 0.0
    else:
        return SequenceMatcher(None, a.lower(), b.lower()).ratio()


def semantic_similarity(a, b):
    """Word-vector similarity of *a* and *b* in [0, 1]; 0.0 without vectors.

    FIX: the original guard ``hasattr(nlp.vocab, "vectors")`` was always
    true — every spaCy Vocab exposes a ``.vectors`` attribute, even when
    the vector table is empty (e.g. the *_sm models).  Check whether the
    table actually contains vectors instead.
    """
    if not SPACY_AVAILABLE or nlp is None or len(nlp.vocab.vectors) == 0:
        return 0.0
    try:
        doc_a, doc_b = nlp(a), nlp(b)
        # vector_norm is 0 for fully OOV texts; similarity would be meaningless
        if doc_a.vector_norm and doc_b.vector_norm:
            return float(doc_a.similarity(doc_b))
        return 0.0
    except Exception:
        return 0.0


def combined_score(a, b):
    """Best of fuzzy and semantic similarity."""
    return max(fuzzy_score(a, b), semantic_similarity(a, b))
# ------------------------------------------------------------
# MATCHING & SUGGESTIONS
# ------------------------------------------------------------
def _uniq(seq):
    """Order-preserving de-duplication (shared helper; was duplicated
    inline in map_term_with_indexes and run_mapper_macro)."""
    seen = set()
    out = []
    for x in seq:
        if x not in seen:
            seen.add(x)
            out.append(x)
    return out


def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, top_n=3, threshold=CONF_THRESHOLD):
    """Return up to *top_n* formatted fuzzy/semantic suggestions for a lemma.

    Scores every key of both indexes with combined_score, adds a small
    prefix bonus, keeps candidates at or above *threshold* and formats
    them as ``"Name (ID)"`` (or plain name when no ID is known).
    """
    candidates = []
    for key_lemma, entries in lemma_index.items():
        score = combined_score(term_lemma, key_lemma)
        if key_lemma.startswith(term_lemma):
            score = min(score + 0.05, 1.0)  # slight bonus for prefix matches
        if score >= threshold:
            for e in entries:
                candidates.append((score, e["Name"], e["ID"]))
    for norm_key, entries in norm_dict.items():
        score = combined_score(term_lemma, norm_key)
        if norm_key.startswith(term_lemma):
            score = min(score + 0.05, 1.0)
        if score >= threshold:
            for e in entries:
                candidates.append((score, e["Name"], e["ID"]))
    candidates.sort(key=lambda x: x[0], reverse=True)
    seen, results = set(), []
    for score, name, id_ in candidates:
        key = (name.lower(), id_.lower() if id_ else "")
        if key in seen:
            continue
        seen.add(key)
        results.append({"score": score, "name": name, "id": id_})
        if len(results) >= top_n:
            break
    return [f'{r["name"]} ({r["id"]})' if r["id"] else r["name"] for r in results]


def map_term_with_indexes(term, norm_dict, lemma_index):
    """Map one term; returns (hits, suggestions, ids), memoized in CACHE."""
    term_norm = normalize_text(term)
    term_lemma = lemmatize_term(term)

    if term_lemma in CACHE:
        cached = CACHE[term_lemma]
        # FIX: use .get() — cache files written by other macro versions may
        # lack individual keys (e.g. "ids"), which raised KeyError here.
        return cached.get("hits", []), cached.get("suggestions", []), cached.get("ids", [])

    hits, suggestions, ids = [], [], []

    # exact match on the normalized surface form first ...
    if term_norm in norm_dict:
        for e in norm_dict[term_norm]:
            hits.append(e["Name"])
            if e["ID"]:
                ids.append(e["ID"])

    # ... then fall back to the lemma index
    if not hits and term_lemma in lemma_index:
        for e in lemma_index[term_lemma]:
            hits.append(e["Name"])
            if e["ID"]:
                ids.append(e["ID"])

    # fuzzy/semantic suggestions, minus anything already hit directly
    suggs = get_suggestions_for_term(term_lemma, norm_dict, lemma_index, top_n=3, threshold=CONF_THRESHOLD)
    hit_norms = [normalize_text(h) for h in hits]  # hoisted out of the loop
    suggestions = [s for s in suggs if normalize_text(s.split(" (")[0]) not in hit_norms]

    hits, suggestions, ids = _uniq(hits), _uniq(suggestions), _uniq(ids)
    CACHE[term_lemma] = {"hits": hits, "suggestions": suggestions, "ids": ids}

    log(f"TERM: {term} | HITS: {hits} | SUGGS: {suggestions}")
    return hits, suggestions, ids

# ------------------------------------------------------------
# MAIN MACRO
# ------------------------------------------------------------
def run_mapper_macro():
    """Entry point: map every term of the 'Objektbeschreibung' column in the
    active sheet against NV_MASTER and write hits/suggestions/IDs beside it."""
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
    except Exception as e:
        log(f"Fehler beim Zugriff auf Dokument: {e}")
        return

    norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
    if not norm_dict:
        log("Fehler: NV_MASTER leer oder nicht gefunden.")
        return

    try:
        cursor = sheet.createCursor()
        cursor.gotoStartOfUsedArea(False)
        cursor.gotoEndOfUsedArea(True)
        used = cursor.getRangeAddress()
    except Exception as e:
        log(f"Cursor-Fehler: {e}")
        return

    # locate the 'Objektbeschreibung' column in the first row
    header_row = 0
    objekt_col = None
    for c in range(0, used.EndColumn + 1):
        val = str(sheet.getCellByPosition(c, header_row).String).strip().lower()
        if val == "objektbeschreibung":
            objekt_col = c
            break
    if objekt_col is None:
        log("Keine Spalte 'Objektbeschreibung' gefunden.")
        return

    # reuse existing result columns or append new ones after the used area
    existing = {}
    for c in range(0, used.EndColumn + 1):
        h = str(sheet.getCellByPosition(c, header_row).String).strip()
        if h == "Norm_Treffer":
            existing["Norm_Treffer"] = c
        if h == "Norm_Vorschlag":
            existing["Norm_Vorschlag"] = c
        if h == "Norm_ID":
            existing["Norm_ID"] = c

    last_col = used.EndColumn
    for name in ["Norm_Treffer", "Norm_Vorschlag", "Norm_ID"]:
        if name not in existing:
            last_col += 1
            existing[name] = last_col
            sheet.getCellByPosition(last_col, header_row).String = name

    GREEN, YELLOW, RED = 0xADFF2F, 0xFFD700, 0xCC0000
    norm_tr_col = existing["Norm_Treffer"]
    norm_sug_col = existing["Norm_Vorschlag"]
    norm_id_col = existing["Norm_ID"]

    rows = 0
    for r in range(header_row + 1, used.EndRow + 1):
        txt = str(sheet.getCellByPosition(objekt_col, r).String).strip()
        if not txt:
            continue
        # split on commas/whitespace and drop stopwords
        terms = [t.strip() for t in re.split(r",|\s+", txt) if t.strip() and t.lower() not in STOPWORDS]
        row_hits, row_sugs, row_ids, any_unmapped = [], [], [], False
        for term in terms:
            hits, sugs, ids = map_term_with_indexes(term, norm_dict, lemma_index)
            if hits:
                row_hits.extend(hits)
            if sugs:
                row_sugs.extend(sugs)
            if ids:
                row_ids.extend(ids)
            if not hits and not sugs:
                any_unmapped = True

        row_hits, row_sugs, row_ids = _uniq(row_hits), _uniq(row_sugs), _uniq(row_ids)
        sheet.getCellByPosition(norm_tr_col, r).String = " | ".join(row_hits)
        sheet.getCellByPosition(norm_sug_col, r).String = " | ".join(row_sugs)
        sheet.getCellByPosition(norm_id_col, r).String = " | ".join(row_ids)

        obj_cell = sheet.getCellByPosition(objekt_col, r)
        sug_cell = sheet.getCellByPosition(norm_sug_col, r)
        tr_cell = sheet.getCellByPosition(norm_tr_col, r)

        # colour rule: red = some term completely unmapped, green = direct hits
        if any_unmapped:
            obj_cell.CellBackColor = RED
        elif row_hits:
            tr_cell.CellBackColor = GREEN
        if row_sugs:
            sug_cell.CellBackColor = YELLOW

        rows += 1

    with open(CACHE_FILE, "w", encoding="utf-8") as f:
        json.dump(CACHE, f, ensure_ascii=False, indent=2)
    log(f"Makro abgeschlossen, {rows} Zeilen verarbeitet.")


g_exportedScripts = (run_mapper_macro,)

# ============================================================
# (next file in this upload: mapper_macro_2.1.py)
# ============================================================
import os
+import re +import json +import traceback + +# UNO-Context wird zur Laufzeit zur Verfügung gestellt (XSCRIPTCONTEXT) +try: + import pandas as pd + PANDAS_AVAILABLE = True +except Exception: + PANDAS_AVAILABLE = False + +try: + import spacy + nlp = spacy.load("de_core_news_sm") + SPACY_AVAILABLE = True +except Exception: + SPACY_AVAILABLE = False + nlp = None + +try: + from rapidfuzz import fuzz + RAPIDFUZZ_AVAILABLE = True +except Exception: + RAPIDFUZZ_AVAILABLE = False + from difflib import SequenceMatcher + +# ------------------------ +# Konfiguration +# ------------------------ +BASE_DIR = "/home/jarnold/.config/libreoffice/4/user/Scripts/python/Vokabular_Abgleich_Makro" +NV_MASTER_PATH = os.path.join(BASE_DIR, "NV_MASTER.ods") +LOG_FILE = os.path.join(BASE_DIR, "mapper_macro_2.1.log") +CACHE_FILE = os.path.join(BASE_DIR, "mapper_cache_2.1.json") + +STOPWORDS = {"mit","ohne","der","die","das","ein","eine","und","zu","von","im","in","auf","an","als","bei","für","aus","dem","den","des","eines","einer"} +CONF_THRESHOLD = 0.75 # Basis-Schwelle für Vorschläge + +# ------------------------ +# Utilities: Logging & safe I/O +# ------------------------ +def log(msg): + try: + with open(LOG_FILE, "a", encoding="utf-8") as f: + f.write(msg + "\n") + except Exception: + pass + +# ------------------------ +# Cache laden +# ------------------------ +try: + if os.path.exists(CACHE_FILE): + with open(CACHE_FILE, "r", encoding="utf-8") as f: + CACHE = json.load(f) + else: + CACHE = {} +except Exception: + CACHE = {} + +# ------------------------ +# Text-Normalisierung & Lemma +# ------------------------ +def normalize_text(s): + if not s: + return "" + s = str(s).strip().lower() + s = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", s) + s = re.sub(r"\s+", " ", s) + return s + +lemma_cache = {} +def lemmatize_term(term): + term_norm = normalize_text(term) + if term_norm in lemma_cache: + return lemma_cache[term_norm] + if SPACY_AVAILABLE and nlp: + try: + doc = nlp(term_norm) + lemma = " 
".join([token.lemma_ for token in doc]) + except Exception: + lemma = term_norm + else: + lemma = term_norm + lemma_cache[term_norm] = lemma + return lemma + +# ------------------------ +# NV_MASTER robust laden (pandas + odf) +# ------------------------ +def build_norm_index(nv_path): + norm_dict = {} # normalized_name -> list of entries (Name, ID, Sheet) + lemma_index = {} # lemma -> list of entries + if not PANDAS_AVAILABLE: + log("Pandas nicht verfügbar. NV_MASTER kann nicht zuverlässig gelesen werden.") + return norm_dict, lemma_index + + try: + sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf") + except Exception as e: + log(f"Fehler beim Einlesen von NV_MASTER mit pandas: {e}") + return norm_dict, lemma_index + + for sheet_name, df in sheets.items(): + if str(sheet_name).strip().lower() == "master": + continue + df = df.fillna("") + cols = [str(c).strip().lower() for c in df.columns] + id_col = None + word_col = None + for i, c in enumerate(cols): + if "id" in c: + id_col = df.columns[i] + if "wort" in c or "vokabel" in c: + word_col = df.columns[i] + if word_col is None and len(df.columns) >= 1: + word_col = df.columns[-1] + if id_col is None and len(df.columns) >= 1: + id_col = df.columns[0] + + current_parent_id = None + for _, row in df.iterrows(): + id_val = str(row[id_col]).strip() if id_col in df.columns else "" + word_val = str(row[word_col]).strip() if word_col in df.columns else "" + if id_val: + current_parent_id = id_val + if not word_val: + continue + norm_name = normalize_text(word_val) + lemma = lemmatize_term(word_val) + entry = {"Name": word_val.strip(), "ID": current_parent_id or "", "Sheet": sheet_name} + norm_dict.setdefault(norm_name, []).append(entry) + lemma_index.setdefault(lemma, []).append(entry) + + log(f"NV_MASTER geladen ({NV_MASTER_PATH}). 
Begriffe: {sum(len(v) for v in norm_dict.values())}") + return norm_dict, lemma_index + +# ------------------------ +# Matching: exakter Treffer, Lemma-Treffer, Fuzzy-Vorschläge +# ------------------------ +def fuzzy_score(a, b): + if RAPIDFUZZ_AVAILABLE: + try: + return fuzz.token_set_ratio(a, b) / 100.0 + except Exception: + return 0.0 + else: + try: + return SequenceMatcher(None, a.lower(), b.lower()).ratio() + except Exception: + return 0.0 + +def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, threshold=CONF_THRESHOLD): + candidates = [] + for key_lemma, entries in lemma_index.items(): + score = fuzzy_score(term_lemma, key_lemma) + if key_lemma.startswith(term_lemma): + score = min(score + 0.1, 1.0) + if score >= threshold: + for e in entries: + candidates.append((score, e["Name"], e["ID"])) + for norm_key, entries in norm_dict.items(): + score = fuzzy_score(term_lemma, norm_key) + if norm_key.startswith(term_lemma): + score = min(score + 0.1, 1.0) + if score >= threshold: + for e in entries: + candidates.append((score, e["Name"], e["ID"])) + candidates.sort(key=lambda t: t[0], reverse=True) + seen = set() + results = [] + for score, name, id_ in candidates: + key = (name, id_) + if key in seen: + continue + seen.add(key) + results.append({"score": score, "name": name, "id": id_}) + return [f'{r["name"]} ({r["id"]})' if r["id"] else r["name"] for r in results] + +def map_term_with_indexes(term, norm_dict, lemma_index): + term_norm = normalize_text(term) + term_lemma = lemmatize_term(term) + if term_lemma in CACHE: + return CACHE[term_lemma]["hits"], CACHE[term_lemma]["suggestions"] + + hits = [] + suggestions = [] + + if term_norm in norm_dict: + for e in norm_dict[term_norm]: + hits.append(f'{e["Name"]} ({e["ID"]})' if e["ID"] else e["Name"]) + + if not hits and term_lemma in lemma_index: + for e in lemma_index[term_lemma]: + hits.append(f'{e["Name"]} ({e["ID"]})' if e["ID"] else e["Name"]) + + if not hits: + suggestions = 
get_suggestions_for_term(term_lemma, norm_dict, lemma_index) + + def unique_preserve(seq): + seen = set() + out = [] + for x in seq: + if x not in seen: + seen.add(x) + out.append(x) + return out + + hits = unique_preserve(hits) + suggestions = unique_preserve(suggestions) + + CACHE[term_lemma] = {"hits": hits, "suggestions": suggestions} + return hits, suggestions + +# ------------------------ +# Haupt-Makro +# ------------------------ +def run_mapper_macro(): + try: + doc = XSCRIPTCONTEXT.getDocument() + sheet = doc.CurrentController.ActiveSheet + cursor = sheet.createCursor() + cursor.gotoStartOfUsedArea(False) + cursor.gotoEndOfUsedArea(True) + data_range = cursor.getRangeAddress() + except Exception as e: + log("Fehler: konnte Dokument/Sheet nicht öffnen: " + str(e)) + return + + header_row = None + objekt_col = None + max_col = data_range.EndColumn + for r in range(0, min(5, data_range.EndRow+1)): + for c in range(0, max_col+1): + try: + val = str(sheet.getCellByPosition(c, r).String).strip().lower() + except Exception: + val = "" + if val == "objektbeschreibung": + header_row = r + objekt_col = c + break + if objekt_col is not None: + break + + if objekt_col is None: + log("Spalte 'Objektbeschreibung' nicht gefunden. 
Abbruch.") + return + + # Prüfen/Anlegen der Ergebnis-Spalten + existing = {} + for c in range(0, data_range.EndColumn+1): + try: + h = str(sheet.getCellByPosition(c, header_row).String).strip() + except Exception: + h = "" + if h == "Norm_Treffer": + existing["Norm_Treffer"] = c + if h == "Norm_Vorschlag": + existing["Norm_Vorschlag"] = c + + last_col = data_range.EndColumn + if "Norm_Treffer" not in existing: + last_col += 1 + existing["Norm_Treffer"] = last_col + sheet.getCellByPosition(last_col, header_row).String = "Norm_Treffer" + if "Norm_Vorschlag" not in existing: + last_col += 1 + existing["Norm_Vorschlag"] = last_col + sheet.getCellByPosition(last_col, header_row).String = "Norm_Vorschlag" + + norm_tr_col = existing["Norm_Treffer"] + norm_sug_col = existing["Norm_Vorschlag"] + + norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH) + if not norm_dict and not lemma_index: + log("NV_MASTER leer oder nicht lesbar. Abbruch.") + return + + GREEN = 0xADFF2F + YELLOW = 0xFFA500 + RED = 0xCC0000 + WHITE = 0xFFFFFF + + rows_processed = 0 + for r in range(header_row + 1, data_range.EndRow + 1): + try: + cell = sheet.getCellByPosition(objekt_col, r) + txt = str(cell.String).strip() + if not txt: + continue + + clauses = [c.strip() for c in re.split(r",", txt) if c.strip()] + terms = [] + for cl in clauses: + parts = [p.strip() for p in re.split(r"\s+", cl) if p.strip()] + for p in parts: + if p.lower() in STOPWORDS: + continue + if re.fullmatch(r"\d+", p): + continue + terms.append(p) + + row_hits = [] + row_sugs = [] + unmapped_terms = [] + + for term in terms: + hits, sugs = map_term_with_indexes(term, norm_dict, lemma_index) + if hits: + row_hits.extend(hits) + else: + unmapped_terms.append(term) + if sugs: + row_sugs.extend(sugs) + + def uniq(seq): + seen = set() + out = [] + for x in seq: + if x not in seen: + seen.add(x) + out.append(x) + return out + + row_hits = uniq(row_hits) + row_sugs = uniq(row_sugs) + + # Farb-Logik für Objektbeschreibung + if 
terms and not unmapped_terms and row_hits: + cell.CellBackColor = GREEN + row_sugs = [] + elif row_hits: + cell.CellBackColor = YELLOW + else: + cell.CellBackColor = RED + + # Ergebniszellen + tr_cell = sheet.getCellByPosition(norm_tr_col, r) + tr_cell.String = " | ".join(row_hits) + tr_cell.CellBackColor = GREEN if row_hits else WHITE + + sug_cell = sheet.getCellByPosition(norm_sug_col, r) + sug_cell.String = " | ".join(row_sugs) + sug_cell.CellBackColor = YELLOW if row_sugs else WHITE + + rows_processed += 1 + + except Exception as e: + log(f"Fehler in Zeile {r}: {e}\n{traceback.format_exc()}") + + try: + with open(CACHE_FILE, "w", encoding="utf-8") as f: + json.dump(CACHE, f, ensure_ascii=False, indent=2) + except Exception: + pass + + log(f"run_mapper_macro fertig. Zeilen verarbeitet: {rows_processed}") + +# Export für LibreOffice +g_exportedScripts = (run_mapper_macro,) diff --git a/mapper_macro_2.2.py b/mapper_macro_2.2.py new file mode 100644 index 0000000..dbbe503 --- /dev/null +++ b/mapper_macro_2.2.py @@ -0,0 +1,353 @@ +# -*- coding: utf-8 -*- +# LibreOffice / Excel macro: NV_MASTER-Abgleich, Pandas+odf, Cache, Farben +# Version: 2.2 +# Speicherort (Linux/Windows automatisch erkannt) + +""" +Mapper Macro 2.2 +================ +Dieses Makro liest die Spalte 'Objektbeschreibung' im aktiven Sheet und versucht, +jedes Wort einem Eintrag im Normvokabular zuzuordnen. 
"""
Mapper Macro 2.2
================
Reads the 'Objektbeschreibung' column of the active sheet and tries to map
every word to an entry of the controlled vocabulary (NV_MASTER).

Features:
- direct hits are listed under "Norm_Treffer" (with the ID in parentheses)
- fuzzy-matching suggestions are listed under "Norm_Vorschlag"
- colour rules:
    * green:  every term in the row has a direct hit
    * yellow: at least one term has a hit, but not all
    * red:    no hit for any term
- logging of all steps to mapper_macro_2.2.log (same directory as the macro)
- cache for already matched terms
- OS detection (Linux/Windows) with automatic path selection
- supports LibreOffice and Excel (pandas for .ods/.xlsx)
"""

import os
import re
import json
import traceback
import platform

# ------------------------
# OS-dependent paths
# ------------------------
if platform.system().lower().startswith("win"):
    # FIX: os.environ["APPDATA"] raised KeyError when the variable is unset,
    # crashing the macro at import time; fall back to the home directory.
    _appdata = os.environ.get("APPDATA") or os.path.expanduser("~")
    BASE_DIR = os.path.join(_appdata, "LibreOffice", "4", "user", "Scripts", "python", "Vokabular_Abgleich_Makro")
else:
    BASE_DIR = os.path.expanduser("~/.config/libreoffice/4/user/Scripts/python/Vokabular_Abgleich_Makro")

NV_MASTER_PATH = os.path.join(BASE_DIR, "NV_MASTER.ods")
LOG_FILE = os.path.join(BASE_DIR, "mapper_macro_2.2.log")
CACHE_FILE = os.path.join(BASE_DIR, "mapper_cache_2.2.json")

# create the macro directory if needed
os.makedirs(BASE_DIR, exist_ok=True)

# ------------------------
# Optional dependencies
# ------------------------
try:
    import pandas as pd
    PANDAS_AVAILABLE = True
except Exception:
    PANDAS_AVAILABLE = False

try:
    import spacy
    nlp = spacy.load("de_core_news_sm")
    SPACY_AVAILABLE = True
except Exception:
    SPACY_AVAILABLE = False
    nlp = None

try:
    from rapidfuzz import fuzz
    RAPIDFUZZ_AVAILABLE = True
except Exception:
    RAPIDFUZZ_AVAILABLE = False
    from difflib import SequenceMatcher

# ------------------------
# Configuration
# ------------------------
STOPWORDS = {"mit","ohne","der","die","das","ein","eine","und","zu","von","im","in","auf","an","als","bei","für","aus","dem","den","des","eines","einer"}
CONF_THRESHOLD = 0.75  # base threshold for fuzzy suggestions

# ------------------------
# Logging (best effort, never raises)
# ------------------------
def log(msg):
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(msg + "\n")
    except Exception:
        pass

# ------------------------
# Load cache: term -> {"hits": [...], "suggestions": [...]}
# ------------------------
try:
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            CACHE = json.load(f)
    else:
        CACHE = {}
except Exception:
    CACHE = {}

# ------------------------
# Text normalization & lemmatization
# ------------------------
def normalize_text(s):
    """Lowercase, strip punctuation and collapse whitespace for lookups."""
    if not s:
        return ""
    s = str(s).strip().lower()
    s = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", s)
    s = re.sub(r"\s+", " ", s)
    return s


lemma_cache = {}


def lemmatize_term(term):
    """Memoized lemma of *term*; falls back to the normalized text when
    spaCy is unavailable or fails."""
    term_norm = normalize_text(term)
    if term_norm in lemma_cache:
        return lemma_cache[term_norm]
    if SPACY_AVAILABLE and nlp:
        try:
            doc = nlp(term_norm)
            lemma = " ".join([token.lemma_ for token in doc])
        except Exception:
            lemma = term_norm
    else:
        lemma = term_norm
    lemma_cache[term_norm] = lemma
    return lemma

# ------------------------
# NV_MASTER loading
# ------------------------
def build_norm_index(nv_path):
    """Read NV_MASTER and return (norm_dict, lemma_index).

    norm_dict:   normalized name -> [{"Name", "ID", "Sheet"}, ...]
    lemma_index: lemma           -> same entries
    The sheet named 'master' is skipped; rows with an empty ID cell inherit
    the last seen ID (hierarchical child rows).
    """
    norm_dict = {}
    lemma_index = {}
    if not PANDAS_AVAILABLE:
        log("Pandas nicht verfügbar. NV_MASTER kann nicht gelesen werden.")
        return norm_dict, lemma_index
    try:
        sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf")
    except Exception as e:
        log(f"Fehler beim Einlesen von NV_MASTER: {e}")
        return norm_dict, lemma_index
    for sheet_name, df in sheets.items():
        if str(sheet_name).strip().lower() == "master":
            continue
        df = df.fillna("")
        cols = [str(c).strip().lower() for c in df.columns]
        # heuristic column detection with first/last column fallbacks
        id_col = None
        word_col = None
        for i, c in enumerate(cols):
            if "id" in c:
                id_col = df.columns[i]
            if "wort" in c or "vokabel" in c:
                word_col = df.columns[i]
        if word_col is None and len(df.columns) >= 1:
            word_col = df.columns[-1]
        if id_col is None and len(df.columns) >= 1:
            id_col = df.columns[0]
        current_parent_id = None
        for _, row in df.iterrows():
            id_val = str(row[id_col]).strip() if id_col in df.columns else ""
            word_val = str(row[word_col]).strip() if word_col in df.columns else ""
            if id_val:
                current_parent_id = id_val  # children inherit this ID
            if not word_val:
                continue
            norm_name = normalize_text(word_val)
            lemma = lemmatize_term(word_val)
            entry = {"Name": word_val.strip(), "ID": current_parent_id or "", "Sheet": sheet_name}
            norm_dict.setdefault(norm_name, []).append(entry)
            lemma_index.setdefault(lemma, []).append(entry)
    log(f"NV_MASTER geladen ({NV_MASTER_PATH}). Begriffe: {sum(len(v) for v in norm_dict.values())}")
    return norm_dict, lemma_index

# ------------------------
# Matching
# ------------------------
def fuzzy_score(a, b):
    """Fuzzy similarity in [0, 1]; rapidfuzz when available, else difflib."""
    if RAPIDFUZZ_AVAILABLE:
        try:
            return fuzz.token_set_ratio(a, b) / 100.0
        except Exception:
            return 0.0
    else:
        try:
            return SequenceMatcher(None, a.lower(), b.lower()).ratio()
        except Exception:
            return 0.0


def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, threshold=CONF_THRESHOLD, top_n=None):
    """Collect fuzzy candidates >= *threshold* over both indexes.

    Results are formatted as "Name (ID)" (plain name without ID), best
    score first.  *top_n* optionally caps the number of suggestions; the
    default ``None`` keeps the previous unlimited behaviour.
    """
    candidates = []
    for key_lemma, entries in lemma_index.items():
        score = fuzzy_score(term_lemma, key_lemma)
        if key_lemma.startswith(term_lemma):
            score = min(score + 0.1, 1.0)  # prefix bonus
        if score >= threshold:
            for e in entries:
                candidates.append((score, e["Name"], e["ID"]))
    for norm_key, entries in norm_dict.items():
        score = fuzzy_score(term_lemma, norm_key)
        if norm_key.startswith(term_lemma):
            score = min(score + 0.1, 1.0)
        if score >= threshold:
            for e in entries:
                candidates.append((score, e["Name"], e["ID"]))
    candidates.sort(key=lambda t: t[0], reverse=True)
    seen = set()
    results = []
    for score, name, id_ in candidates:
        key = (name, id_)
        if key in seen:
            continue
        seen.add(key)
        results.append({"score": score, "name": name, "id": id_})
        if top_n is not None and len(results) >= top_n:
            break
    return [f'{r["name"]} ({r["id"]})' if r["id"] else r["name"] for r in results]


def map_term_with_indexes(term, norm_dict, lemma_index):
    """Map a single term; returns (hits, suggestions), memoized in CACHE.

    Cache entries are read with .get() so files written by other macro
    versions (different key sets) cannot raise KeyError.
    """
    term_norm = normalize_text(term)
    term_lemma = lemmatize_term(term)
    if term_lemma in CACHE:
        cache_entry = CACHE[term_lemma]
        return cache_entry.get("hits", []), cache_entry.get("suggestions", [])

    hits = []
    suggestions = []

    # exact surface-form hit first, lemma hit as fallback
    if term_norm in norm_dict:
        for e in norm_dict[term_norm]:
            hits.append(f'{e["Name"]} ({e["ID"]})' if e["ID"] else e["Name"])
    if not hits and term_lemma in lemma_index:
        for e in lemma_index[term_lemma]:
            hits.append(f'{e["Name"]} ({e["ID"]})' if e["ID"] else e["Name"])

    # fuzzy suggestions only when nothing matched directly
    if not hits:
        suggestions = get_suggestions_for_term(term_lemma, norm_dict, lemma_index)

    # order-preserving de-duplication
    hits = list(dict.fromkeys(hits))
    suggestions = list(dict.fromkeys(suggestions))
    CACHE[term_lemma] = {"hits": hits, "suggestions": suggestions}
    return hits, suggestions

# ------------------------
# Main macro
# ------------------------
def run_mapper_macro():
    """Entry point: map every term of the 'Objektbeschreibung' column in the
    active sheet against NV_MASTER and write hits/suggestions beside it."""
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
        cursor = sheet.createCursor()
        cursor.gotoStartOfUsedArea(False)
        cursor.gotoEndOfUsedArea(True)
        data_range = cursor.getRangeAddress()
    except Exception as e:
        log("Fehler: konnte Dokument/Sheet nicht öffnen: " + str(e))
        return

    # locate the 'Objektbeschreibung' header within the first 5 rows
    header_row = None
    objekt_col = None
    max_col = data_range.EndColumn
    for r in range(0, min(5, data_range.EndRow + 1)):
        for c in range(0, max_col + 1):
            try:
                val = str(sheet.getCellByPosition(c, r).String).strip().lower()
            except Exception:
                val = ""
            if val == "objektbeschreibung":
                header_row = r
                objekt_col = c
                break
        if objekt_col is not None:
            break
    if objekt_col is None:
        log("Spalte 'Objektbeschreibung' nicht gefunden. Abbruch.")
        return

    # find or create the result columns
    existing = {}
    for c in range(0, data_range.EndColumn + 1):
        try:
            h = str(sheet.getCellByPosition(c, header_row).String).strip()
        except Exception:
            h = ""
        if h == "Norm_Treffer":
            existing["Norm_Treffer"] = c
        if h == "Norm_Vorschlag":
            existing["Norm_Vorschlag"] = c

    last_col = data_range.EndColumn
    if "Norm_Treffer" not in existing:
        last_col += 1
        existing["Norm_Treffer"] = last_col
        sheet.getCellByPosition(last_col, header_row).String = "Norm_Treffer"
    if "Norm_Vorschlag" not in existing:
        last_col += 1
        existing["Norm_Vorschlag"] = last_col
        sheet.getCellByPosition(last_col, header_row).String = "Norm_Vorschlag"

    norm_tr_col = existing["Norm_Treffer"]
    norm_sug_col = existing["Norm_Vorschlag"]

    norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
    if not norm_dict and not lemma_index:
        log("NV_MASTER leer oder nicht lesbar. Abbruch.")
        return

    GREEN = 0xADFF2F
    YELLOW = 0xFFA500
    RED = 0xCC0000
    WHITE = 0xFFFFFF

    rows_processed = 0
    for r in range(header_row + 1, data_range.EndRow + 1):
        try:
            cell = sheet.getCellByPosition(objekt_col, r)
            txt = str(cell.String).strip()
            if not txt:
                continue

            # split on commas first, then whitespace; drop stopwords and
            # pure numbers
            clauses = [c.strip() for c in re.split(r",", txt) if c.strip()]
            terms = []
            for cl in clauses:
                for p in [p.strip() for p in re.split(r"\s+", cl) if p.strip()]:
                    if p.lower() in STOPWORDS:
                        continue
                    if re.fullmatch(r"\d+", p):
                        continue
                    terms.append(p)

            row_hits = []
            row_sugs = []
            unmapped_terms = []

            for term in terms:
                hits, sugs = map_term_with_indexes(term, norm_dict, lemma_index)
                if hits:
                    row_hits.extend(hits)
                else:
                    unmapped_terms.append(term)
                if sugs:
                    row_sugs.extend(sugs)

            row_hits = list(dict.fromkeys(row_hits))
            row_sugs = list(dict.fromkeys(row_sugs))

            # colour rule: green = every term hit, yellow = partial, red = none
            if terms and not unmapped_terms and row_hits:
                cell.CellBackColor = GREEN
                row_sugs = []  # fully mapped rows need no suggestions
            elif row_hits:
                cell.CellBackColor = YELLOW
            else:
                cell.CellBackColor = RED

            tr_cell = sheet.getCellByPosition(norm_tr_col, r)
            tr_cell.String = " | ".join(row_hits)
            tr_cell.CellBackColor = GREEN if row_hits else WHITE

            sug_cell = sheet.getCellByPosition(norm_sug_col, r)
            sug_cell.String = " | ".join(row_sugs)
            sug_cell.CellBackColor = YELLOW if row_sugs else WHITE

            rows_processed += 1

        except Exception as e:
            log(f"Fehler in Zeile {r}: {e}\n{traceback.format_exc()}")

    try:
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(CACHE, f, ensure_ascii=False, indent=2)
    except Exception:
        pass

    log(f"run_mapper_macro 2.2 fertig. Zeilen verarbeitet: {rows_processed}")


# Export for LibreOffice
g_exportedScripts = (run_mapper_macro,)