From 94e62cc26732ff8ad5e669c5b7e48fee20f5f904 Mon Sep 17 00:00:00 2001 From: gumuArnold Date: Thu, 16 Oct 2025 13:34:54 +0000 Subject: [PATCH] Delete mapper_macro_1.5.py --- mapper_macro_1.5.py | 508 -------------------------------------------- 1 file changed, 508 deletions(-) delete mode 100644 mapper_macro_1.5.py diff --git a/mapper_macro_1.5.py b/mapper_macro_1.5.py deleted file mode 100644 index 9ed712f..0000000 --- a/mapper_macro_1.5.py +++ /dev/null @@ -1,508 +0,0 @@ -# -*- coding: utf-8 -*- -# mapper_macro 1.5 - korrigiert: Logging im Dokumentverzeichnis, stabile Button-Erstellung, -# keine Listener, optimiertes Mapping (ohne Listener-Teil) - -import os -import re -import json -import datetime - -# optionale Module (Pandas, Spacy, RapidFuzz) -try: - import pandas as pd - PANDAS_AVAILABLE = True -except Exception: - PANDAS_AVAILABLE = False - -try: - import spacy - nlp = spacy.load("de_core_news_sm") - SPACY_AVAILABLE = True -except Exception: - SPACY_AVAILABLE = False - nlp = None - -try: - from rapidfuzz import fuzz - RAPIDFUZZ_AVAILABLE = True -except Exception: - RAPIDFUZZ_AVAILABLE = False - -from difflib import SequenceMatcher - -# UNO (für Button/Paths) -try: - import uno -except Exception: - uno = None - -# ------------------------ -# Konfiguration (Fallback-BASE_DIR) -# ------------------------ -BASE_DIR = os.path.expanduser("~/.config/libreoffice/4/user/Scripts/python/Vokabular_Abgleich_Makro") -NV_MASTER_FILENAME = "NV_MASTER.ods" -CACHE_FILENAME = "mapper_cache.json" -LOG_FILENAME = "mapper_macro.log" - -STOPWORDS = { - "mit", "ohne", "der", "die", "das", "ein", "eine", "und", "zu", "von", "im", "in", "auf", "an", - "als", "bei", "für", "aus", "dem", "den", "des", "eines", "einer" -} -CONF_THRESHOLD = 0.82 -FUZZY_CUTOFF = 0.88 - -# Per-document paths (initialized by set_paths_from_doc) -DOC_DIR = BASE_DIR -NV_MASTER_PATH = os.path.join(DOC_DIR, NV_MASTER_FILENAME) -CACHE_FILE = os.path.join(DOC_DIR, CACHE_FILENAME) -LOG_FILE = os.path.join(DOC_DIR, LOG_FILENAME) - -# in-memory cache -try: - if os.path.exists(CACHE_FILE): - with open(CACHE_FILE, "r", encoding="utf-8") as f: - CACHE = json.load(f) - else: - CACHE = {} -except Exception: - CACHE = {} - -# ------------------------ -# Pfade im Dokument setzen -# ------------------------ -def set_paths_from_doc(doc): - global DOC_DIR, NV_MASTER_PATH, CACHE_FILE, LOG_FILE - try: - url = getattr(doc, "URL", "") - if url and url.strip(): - # UNO liefert file:///... 
- try: - system_path = uno.fileUrlToSystemPath(url) - except Exception: - # fallback: try simple unquote - from urllib.parse import unquote, urlparse - parsed = urlparse(url) - if parsed.scheme == "file": - system_path = unquote(parsed.path) - else: - system_path = "" - if system_path: - d = os.path.dirname(system_path) - if os.path.isdir(d): - DOC_DIR = d - except Exception: - DOC_DIR = BASE_DIR - NV_MASTER_PATH = os.path.join(DOC_DIR, NV_MASTER_FILENAME) - CACHE_FILE = os.path.join(DOC_DIR, CACHE_FILENAME) - LOG_FILE = os.path.join(DOC_DIR, LOG_FILENAME) - -# ------------------------ -# Logging (Dokumentdir, robust) -# ------------------------ -def log(msg, level="INFO"): - ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - line = f"[{ts}] [{level}] {msg}\n" - try: - # ensure directory exists - os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) - with open(LOG_FILE, "a", encoding="utf-8") as f: - f.write(line) - except Exception: - # absolute fallback: try writing into BASE_DIR - try: - fallback = os.path.join(BASE_DIR, LOG_FILENAME) - os.makedirs(os.path.dirname(fallback), exist_ok=True) - with open(fallback, "a", encoding="utf-8") as f: - f.write(line) - except Exception: - # last resort: silent - pass - -# ------------------------ -# Textvorbereitung & Helpers -# ------------------------ -lemma_cache = {} - -def normalize_text(s): - if not s: - return "" - s = str(s).strip().lower() - s = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", s) - s = re.sub(r"\s+", " ", s) - return s - -def lemmatize_term(term): - term_norm = normalize_text(term) - if term_norm in lemma_cache: - return lemma_cache[term_norm] - if SPACY_AVAILABLE and nlp: - try: - doc = nlp(term_norm) - lemma = " ".join([t.lemma_ for t in doc]) - except Exception: - lemma = term_norm - else: - lemma = term_norm - lemma_cache[term_norm] = lemma - return lemma - -def compound_split(term): - if not term: - return [] - parts = re.findall(r'[A-ZÄÖÜ][a-zäöü]+', term) - if parts: - return parts - parts = [p for p in re.split(r'[-\s]+', term) if p] - return parts or [term] - -# ------------------------ -# NV_MASTER indexieren -# ------------------------ -def build_norm_index(nv_path): - norm_dict = {} - lemma_index = {} - if not PANDAS_AVAILABLE: - log("Pandas nicht verfügbar, NV_MASTER kann nicht gelesen.", level="ERROR") - return norm_dict, lemma_index - try: - sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf") - except Exception as e: - log(f"Fehler beim Einlesen von NV_MASTER: {e}", level="ERROR") - return norm_dict, lemma_index - - for sheet_name, df in sheets.items(): - if str(sheet_name).strip().lower() == "master": - continue - df = df.fillna("") - cols = [str(c).strip().lower() for c in df.columns] - # find id/word columns with fallback - id_col = None - word_col = None - for i, c in enumerate(cols): - if "id" in c: - id_col = df.columns[i] - if "wort" in c or "vokabel" in c: - word_col = df.columns[i] - if word_col is None and len(df.columns) >= 1: - word_col = df.columns[-1] - if id_col is None and len(df.columns) >= 1: - id_col = df.columns[0] - - current_parent_id = None - for _, row in df.iterrows(): - id_val = str(row[id_col]).strip() if id_col in df.columns else "" - word_val = str(row[word_col]).strip() if word_col in df.columns else "" - if id_val: - current_parent_id = id_val - if not word_val: - continue - norm_name = normalize_text(word_val) - lemma = lemmatize_term(word_val) - entry = {"Name": word_val.strip(), "ID": current_parent_id or "", "Sheet": sheet_name} - norm_dict.setdefault(norm_name, 
[]).append(entry) - lemma_index.setdefault(lemma, []).append(entry) - - log(f"NV_MASTER geladen. Begriffe: {sum(len(v) for v in norm_dict.values())}", level="INFO") - return norm_dict, lemma_index - -# ------------------------ -# Fuzzy Matching -# ------------------------ -def fuzzy_score(a, b): - a = (a or "").lower() - b = (b or "").lower() - if RAPIDFUZZ_AVAILABLE: - try: - return fuzz.token_sort_ratio(a, b) / 100.0 - except Exception: - return 0.0 - else: - return SequenceMatcher(None, a, b).ratio() - -def get_suggestions(term_lemma, norm_dict, lemma_index, threshold=FUZZY_CUTOFF, max_sugs=6): - candidates = [] - term_norm = term_lemma or "" - for key_lemma, entries in lemma_index.items(): - if not key_lemma: - continue - score = fuzzy_score(term_norm, key_lemma) - if key_lemma.startswith(term_norm): - score = min(score + 0.08, 1.0) - if score >= threshold: - for e in entries: - candidates.append((score, e["Name"], e["ID"])) - # also check normalized names - for norm_key, entries in norm_dict.items(): - score = fuzzy_score(term_norm, norm_key) - if norm_key.startswith(term_norm): - score = min(score + 0.08, 1.0) - if score >= threshold: - for e in entries: - candidates.append((score, e["Name"], e["ID"])) - # sort & dedupe - candidates.sort(key=lambda t: t[0], reverse=True) - seen = set() - out = [] - for score, name, id_ in candidates: - key = (name, id_) - if key in seen: - continue - seen.add(key) - if id_: - out.append(f"{name} ({id_})") - else: - out.append(name) - if len(out) >= max_sugs: - break - return out - -# ------------------------ -# Mapping mit Cache -# ------------------------ -def map_term(term, norm_dict, lemma_index): - term_norm = normalize_text(term) - term_lemma = lemmatize_term(term) - if term_lemma in CACHE: - return CACHE[term_lemma] - - hits = [] - suggestions = [] - ids = [] - - # exact - if term_norm in norm_dict: - for e in norm_dict[term_norm]: - hits.append(e["Name"]) - if e["ID"]: - ids.append(e["ID"]) - - # lemma - if not hits and term_lemma in lemma_index: - for e in lemma_index[term_lemma]: - hits.append(e["Name"]) - if e["ID"]: - ids.append(e["ID"]) - - # suggestions only if no hit - if not hits: - suggestions = get_suggestions(term_lemma, norm_dict, lemma_index) - - # remove suggestions that are equal/contain hits - suggestions = [s for s in suggestions if not any(h.lower() in s.lower() for h in hits)] - - result = {"hits": hits, "suggestions": suggestions, "ids": ids} - CACHE[term_lemma] = result - return result - -# ------------------------ -# Button erstellen (sicher) -# ------------------------ -def add_macro_button(sheet): - try: - doc = XSCRIPTCONTEXT.getDocument() - except Exception: - log("add_macro_button: kein Dokument-Kontext", level="WARN") - return - try: - draw_page = sheet.DrawPage - # avoid duplicate - for shape in draw_page: - try: - if getattr(shape, "Name", "") == "MapperStartButton": - return - except Exception: - continue - - # create shape and button model - shape = doc.createInstance("com.sun.star.drawing.ControlShape") - shape.Name = "MapperStartButton" - shape.Position = uno.createUnoStruct("com.sun.star.awt.Point") - shape.Position.X = 1000 - shape.Position.Y = 200 - shape.Size = uno.createUnoStruct("com.sun.star.awt.Size") - shape.Size.Width = 3000 - shape.Size.Height = 1000 - - button_model = doc.createInstance("com.sun.star.form.component.CommandButton") - button_model.Label = "Start Mapping" - button_model.HelpText = "Startet das Mapping (run_mapper_macro)" - # assign macro via ActionCommand is not enough; user must link 
in UI; we add the control and label - - shape.Control = button_model - draw_page.add(shape) - log("Button 'MapperStartButton' erstellt.", level="INFO") - except Exception as e: - log(f"add_macro_button Fehler: {e}", level="ERROR") - -# ------------------------ -# Hauptlauf (ohne Listener) -# ------------------------ -def run_mapper_macro(): - try: - doc = XSCRIPTCONTEXT.getDocument() - set_paths_from_doc(doc) - log("=== mapper_macro gestartet ===", level="INFO") - sheet = doc.CurrentController.ActiveSheet - add_macro_button(sheet) - - # used area - cursor = sheet.createCursor() - cursor.gotoStartOfUsedArea(False) - cursor.gotoEndOfUsedArea(True) - dr = cursor.getRangeAddress() - - # find header and objekt col - header_row = None - objekt_col = None - for r in range(0, min(10, dr.EndRow + 1)): - for c in range(0, dr.EndColumn + 1): - try: - val = str(sheet.getCellByPosition(c, r).String).strip().lower() - except Exception: - val = "" - if val == "Objektbeschreibung": - header_row = r - objekt_col = c - break - if objekt_col is not None: - break - - if objekt_col is None: - log("run_mapper_macro: 'Objektbeschreibung' Header nicht gefunden.", level="ERROR") - return - - # ensure result cols - existing = {} - last_col = dr.EndColumn - for c in range(0, dr.EndColumn + 1): - try: - h = str(sheet.getCellByPosition(c, header_row).String).strip() - except Exception: - h = "" - if h == "Norm_Treffer": - existing["Norm_Treffer"] = c - if h == "Norm_Vorschlag": - existing["Norm_Vorschlag"] = c - if h == "Norm_ID": - existing["Norm_ID"] = c - - if "Norm_Treffer" not in existing: - last_col += 1 - existing["Norm_Treffer"] = last_col - sheet.getCellByPosition(last_col, header_row).String = "Norm_Treffer" - if "Norm_Vorschlag" not in existing: - last_col += 1 - existing["Norm_Vorschlag"] = last_col - sheet.getCellByPosition(last_col, header_row).String = "Norm_Vorschlag" - if "Norm_ID" not in existing: - last_col += 1 - existing["Norm_ID"] = last_col - sheet.getCellByPosition(last_col, header_row).String = "Norm_ID" - - norm_tr_col = existing["Norm_Treffer"] - norm_sug_col = existing["Norm_Vorschlag"] - norm_id_col = existing["Norm_ID"] - - # build index - norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH) - if not norm_dict and not lemma_index: - log("run_mapper_macro: NV_MASTER leer oder nicht lesbar.", level="ERROR") - return - - GREEN, YELLOW, RED = 0xADFF2F, 0xFFFF66, 0xFF9999 - rows_processed = 0 - - for r in range(header_row + 1, dr.EndRow + 1): - try: - cell = sheet.getCellByPosition(objekt_col, r) - txt = str(cell.String).strip() - if not txt: - continue - - # phrase-first: try entire cleaned phrase (remove stopwords) - tokens = [t.strip() for t in re.split(r'\s+', normalize_text(txt)) if t and t not in STOPWORDS] - phrase = " ".join(tokens).strip() - terms = [] - if phrase: - # first try phrase as whole - mapped_phrase = map_term(phrase, norm_dict, lemma_index) - if mapped_phrase["hits"] or mapped_phrase["suggestions"]: - # use phrase result (flatten hits+suggestions for output) - row_hits = mapped_phrase["hits"] - row_sugs = mapped_phrase["suggestions"] - row_ids = mapped_phrase["ids"] - any_unmapped = False if (row_hits or row_sugs) else True - else: - # fallback to token/compound processing - for p in [p for p in re.split(r'[,\s]+', txt) if p.strip()]: - if p.lower() in STOPWORDS or re.fullmatch(r'\d+', p): - continue - for sp in compound_split(p): - if sp and sp.strip(): - terms.append(sp.strip()) - row_hits = [] - row_sugs = [] - row_ids = [] - any_unmapped = False - for term in 
terms: - mapped = map_term(term, norm_dict, lemma_index) - hits, sugs, ids = mapped["hits"], mapped["suggestions"], mapped["ids"] - if hits: - row_hits.extend(hits) - if sugs: - row_sugs.extend(sugs) - if ids: - row_ids.extend(ids) - if not hits and not sugs: - any_unmapped = True - else: - row_hits, row_sugs, row_ids = [], [], [] - any_unmapped = True - - # dedupe preserving order - def uniq(seq): - seen = set() - out = [] - for x in seq: - if x not in seen: - seen.add(x) - out.append(x) - return out - - row_hits = uniq(row_hits) - row_sugs = uniq(row_sugs) - row_ids = uniq(row_ids) - - # write - sheet.getCellByPosition(norm_tr_col, r).String = " | ".join(row_hits) - sheet.getCellByPosition(norm_sug_col, r).String = " | ".join(row_sugs) - sheet.getCellByPosition(norm_id_col, r).String = " | ".join(row_ids) - - cell.CellBackColor = RED if any_unmapped else 0xFFFFFF - sheet.getCellByPosition(norm_tr_col, r).CellBackColor = GREEN if row_hits else 0xFFFFFF - sheet.getCellByPosition(norm_sug_col, r).CellBackColor = YELLOW if row_sugs else 0xFFFFFF - - rows_processed += 1 - except Exception as e: - log(f"Fehler in Zeile {r}: {e}", level="ERROR") - continue - - # persist cache file to DOC_DIR - try: - with open(CACHE_FILE, "w", encoding="utf-8") as f: - json.dump(CACHE, f, ensure_ascii=False, indent=2) - except Exception as e: - log(f"Cache speichern fehlgeschlagen: {e}", level="WARN") - - log(f"=== mapper_macro fertig. Zeilen verarbeitet: {rows_processed} ===", level="INFO") - except Exception as e: - # top-level safety - try: - log(f"run_mapper_macro: Unhandled exception: {e}", level="ERROR") - except Exception: - pass - -# ------------------------ -# Export -# ------------------------ -g_exportedScripts = (run_mapper_macro,)
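
Note for anyone still depending on this macro: the deletion above removes the only copy of the matching logic from the repository. If the vocabulary matching is still needed outside LibreOffice, its core (normalize the term, try an exact lookup against the NV_MASTER index, fall back to fuzzy suggestions) is small enough to keep as a plain Python module. The sketch below is reconstructed from the deleted normalize_text, fuzzy_score and map_term functions; it keeps their cutoff and the optional RapidFuzz/difflib fallback, but drops the UNO, spaCy lemmatization and on-disk cache parts, and the sample vocabulary entry at the bottom is invented purely for illustration.

# Minimal sketch of the removed matching core (no UNO, no spaCy, no cache).
import re
from difflib import SequenceMatcher

try:
    from rapidfuzz import fuzz
    RAPIDFUZZ_AVAILABLE = True
except Exception:
    RAPIDFUZZ_AVAILABLE = False

FUZZY_CUTOFF = 0.88  # same suggestion cutoff the deleted module used


def normalize_text(s):
    """Lower-case, strip basic punctuation and collapse whitespace."""
    if not s:
        return ""
    s = str(s).strip().lower()
    s = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", s)
    return re.sub(r"\s+", " ", s)


def fuzzy_score(a, b):
    """Similarity in 0..1: RapidFuzz token_sort_ratio if available, difflib otherwise."""
    a, b = (a or "").lower(), (b or "").lower()
    if RAPIDFUZZ_AVAILABLE:
        return fuzz.token_sort_ratio(a, b) / 100.0
    return SequenceMatcher(None, a, b).ratio()


def map_term(term, vocabulary):
    """Exact lookup first, fuzzy suggestions as a fallback.

    `vocabulary` maps a normalized name to a list of entries shaped like
    {"Name": ..., "ID": ..., "Sheet": ...}, i.e. the norm_dict that
    build_norm_index() derived from NV_MASTER.ods in the deleted script.
    """
    term_norm = normalize_text(term)
    if term_norm in vocabulary:
        entries = vocabulary[term_norm]
        return {"hits": [e["Name"] for e in entries],
                "ids": [e["ID"] for e in entries if e.get("ID")],
                "suggestions": []}
    scored = []
    for key, entries in vocabulary.items():
        score = fuzzy_score(term_norm, key)
        if score >= FUZZY_CUTOFF:
            for e in entries:
                label = f'{e["Name"]} ({e["ID"]})' if e.get("ID") else e["Name"]
                scored.append((score, label))
    scored.sort(key=lambda t: t[0], reverse=True)
    return {"hits": [], "ids": [], "suggestions": [label for _, label in scored]}


if __name__ == "__main__":
    # Placeholder vocabulary entry; "NV-0001" and the sheet name are made up.
    vocab = {"armreif": [{"Name": "Armreif", "ID": "NV-0001", "Sheet": "Schmuck"}]}
    print(map_term("Armreif", vocab))               # exact hit
    print(fuzzy_score("Halskette", "Halsketten"))   # roughly 0.95 with either backend

Keeping the RapidFuzz/difflib switch mirrors the deleted script's approach of degrading gracefully when optional dependencies are missing, so the helper stays usable on a bare Python install.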