From 41ad23e8df2d320ebe766558e0c150f41fd03665 Mon Sep 17 00:00:00 2001 From: gumuArnold Date: Thu, 16 Oct 2025 13:34:49 +0000 Subject: [PATCH] Delete mapper_macro_1.4.py --- mapper_macro_1.4.py | 469 -------------------------------------------- 1 file changed, 469 deletions(-) delete mode 100644 mapper_macro_1.4.py diff --git a/mapper_macro_1.4.py b/mapper_macro_1.4.py deleted file mode 100644 index a22c231..0000000 --- a/mapper_macro_1.4.py +++ /dev/null @@ -1,469 +0,0 @@ -# -*- coding: utf-8 -*- -# mapper_macro 1.5 - LibreOffice Calc -# Features: Kompositum-Split, Cache, Live-Vorschläge nur auf 'Objektbeschreibung', Logging - -import os -import re -import json -import datetime - -# optional imports (Pandas, Spacy, RapidFuzz) -try: - import pandas as pd - PANDAS_AVAILABLE = True -except Exception: - PANDAS_AVAILABLE = False - -try: - import spacy - nlp = spacy.load("de_core_news_sm") - SPACY_AVAILABLE = True -except Exception: - SPACY_AVAILABLE = False - nlp = None - -try: - from rapidfuzz import fuzz - RAPIDFUZZ_AVAILABLE = True -except Exception: - RAPIDFUZZ_AVAILABLE = False - from difflib import SequenceMatcher - -# ------------------------ -# Konfiguration -# ------------------------ -BASE_DIR = "/home/jarnold/.config/libreoffice/4/user/Scripts/python/NV Abgleich Makro" -NV_MASTER_PATH = os.path.join(BASE_DIR, "NV_MASTER.ods") -CACHE_FILE = os.path.join(BASE_DIR, "mapper_cache.json") -LOG_FILE = os.path.join(BASE_DIR, "mapper_macro.log") - -STOPWORDS = { - "mit","ohne","der","die","das","ein","eine","und","zu","von","im","in","auf","an", - "als","bei","für","aus","dem","den","des","eines","einer" -} -CONF_THRESHOLD = 0.75 - -# ------------------------ -# Logging -# ------------------------ -def log(msg, level="INFO"): - ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - line = f"[{ts}] [{level}] {msg}\n" - try: - os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True) - with open(LOG_FILE, "a", encoding="utf-8") as f: - f.write(line) - except Exception: - pass - -# ------------------------ -# Cache laden -# ------------------------ -try: - if os.path.exists(CACHE_FILE): - with open(CACHE_FILE, "r", encoding="utf-8") as f: - CACHE = json.load(f) - else: - CACHE = {} -except Exception as e: - CACHE = {} - log(f"Fehler beim Laden des Caches: {e}", level="ERROR") - -# ------------------------ -# Textnormalisierung & Lemma -# ------------------------ -lemma_cache = {} - -def normalize_text(s): - if not s: - return "" - s = str(s).strip().lower() - s = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", s) - s = re.sub(r"\s+", " ", s) - return s - -def lemmatize_term(term): - term_norm = normalize_text(term) - if term_norm in lemma_cache: - return lemma_cache[term_norm] - if SPACY_AVAILABLE and nlp: - try: - doc = nlp(term_norm) - lemma = " ".join([t.lemma_ for t in doc]) - except Exception: - lemma = term_norm - else: - lemma = term_norm - lemma_cache[term_norm] = lemma - return lemma - -# ------------------------ -# Kompositum-Splitting -# ------------------------ -def compound_split(term): - if not term: - return [] - parts = re.findall(r'[A-ZÄÖÜ][a-zäöü]+', term) - if parts: - return parts - parts = [p for p in re.split(r'[-\s]+', term) if p] - return parts or [term] - -# ------------------------ -# NV_MASTER indexieren -# ------------------------ -def build_norm_index(nv_path): - norm_dict = {} - lemma_index = {} - if not PANDAS_AVAILABLE: - log("Pandas nicht verfügbar, NV_MASTER kann nicht gelesen.", level="ERROR") - return norm_dict, lemma_index - try: - sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf") - except Exception as e: - log(f"Fehler beim Einlesen von NV_MASTER: {e}", level="ERROR") - return norm_dict, lemma_index - - for sheet_name, df in sheets.items(): - if str(sheet_name).strip().lower() == "master": - continue - df = df.fillna("") - cols = [str(c).strip().lower() for c in df.columns] - id_col = None - word_col = None - for i, c in enumerate(cols): - if "id" in c: - id_col = df.columns[i] - if "wort" in c or "vokabel" in c: - word_col = df.columns[i] - if word_col is None and len(df.columns) >= 1: - word_col = df.columns[-1] - if id_col is None and len(df.columns) >= 1: - id_col = df.columns[0] - - current_parent_id = None - for _, row in df.iterrows(): - id_val = str(row[id_col]).strip() if id_col in df.columns else "" - word_val = str(row[word_col]).strip() if word_col in df.columns else "" - if id_val: - current_parent_id = id_val - if not word_val: - continue - norm_name = normalize_text(word_val) - lemma = lemmatize_term(word_val) - entry = {"Name": word_val.strip(), "ID": current_parent_id or "", "Sheet": sheet_name} - norm_dict.setdefault(norm_name, []).append(entry) - lemma_index.setdefault(lemma, []).append(entry) - - log(f"NV_MASTER geladen. Begriffe: {sum(len(v) for v in norm_dict.values())}") - return norm_dict, lemma_index - -# ------------------------ -# Fuzzy / Vorschläge -# ------------------------ -def fuzzy_score(a, b): - if RAPIDFUZZ_AVAILABLE: - try: - return fuzz.token_set_ratio(a, b) / 100.0 - except Exception: - return 0.0 - else: - return SequenceMatcher(None, a.lower(), b.lower()).ratio() - -def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, threshold=CONF_THRESHOLD): - candidates = [] - for key_lemma, entries in lemma_index.items(): - score = fuzzy_score(term_lemma, key_lemma) - if key_lemma.startswith(term_lemma): - score = min(score + 0.1, 1.0) - if score >= threshold: - for e in entries: - candidates.append((score, e["Name"], e["ID"])) - for norm_key, entries in norm_dict.items(): - score = fuzzy_score(term_lemma, norm_key) - if norm_key.startswith(term_lemma): - score = min(score + 0.1, 1.0) - if score >= threshold: - for e in entries: - candidates.append((score, e["Name"], e["ID"])) - candidates.sort(key=lambda t: t[0], reverse=True) - seen = set() - results = [] - for score, name, id_ in candidates: - key = (name, id_) - if key in seen: - continue - seen.add(key) - results.append({"score": score, "name": name, "id": id_}) - return [f'{r["name"]} ({r["id"]})' if r["id"] else r["name"] for r in results] - -# ------------------------ -# Mapping eines Terms (mit Cache) -# ------------------------ -def map_term_with_indexes(term, norm_dict, lemma_index): - term_norm = normalize_text(term) - term_lemma = lemmatize_term(term) - if term_lemma in CACHE: - c = CACHE[term_lemma] - return c.get("hits", []), c.get("suggestions", []), c.get("ids", []) - - hits = [] - suggestions = [] - ids = [] - - if term_norm in norm_dict: - for e in norm_dict[term_norm]: - hits.append(e["Name"]) - if e["ID"]: - ids.append(e["ID"]) - if not hits and term_lemma in lemma_index: - for e in lemma_index[term_lemma]: - hits.append(e["Name"]) - if e["ID"]: - ids.append(e["ID"]) - suggestions = get_suggestions_for_term(term_lemma, norm_dict, lemma_index) - - if not hits: - tokens = compound_split(term) - for t in tokens: - t_lemma = lemmatize_term(t) - if t_lemma in lemma_index: - for e in lemma_index[t_lemma]: - hits.append(e["Name"]) - if e["ID"]: - ids.append(e["ID"]) - else: - suggestions.extend(get_suggestions_for_term(t_lemma, norm_dict, lemma_index)) - - def uniq(seq): - seen = set() - out = [] - for x in seq: - if x not in seen: - seen.add(x) - out.append(x) - return out - - hits = uniq(hits) - suggestions = uniq(suggestions) - ids = uniq(ids) - - CACHE[term_lemma] = {"hits": hits, "suggestions": suggestions, "ids": ids} - return hits, suggestions, ids - -# ------------------------ -# Header + Spalten -# ------------------------ -def find_header_and_cols(sheet): - try: - cursor = sheet.createCursor() - cursor.gotoStartOfUsedArea(False) - cursor.gotoEndOfUsedArea(True) - dr = cursor.getRangeAddress() - except Exception: - return None, None, None - header_row = None - objekt_col = None - for r in range(0, min(5, dr.EndRow + 1)): - for c in range(0, dr.EndColumn + 1): - try: - val = str(sheet.getCellByPosition(c, r).String).strip().lower() - except Exception: - val = "" - if val == "objektbeschreibung": - header_row = r - objekt_col = c - break - if objekt_col is not None: - break - - if header_row is None: - return None, None, dr - existing = {} - for c in range(0, dr.EndColumn + 1): - try: - h = str(sheet.getCellByPosition(c, header_row).String).strip() - except Exception: - h = "" - if h == "Norm_Treffer": - existing["Norm_Treffer"] = c - if h == "Norm_Vorschlag": - existing["Norm_Vorschlag"] = c - if h == "Norm_ID": - existing["Norm_ID"] = c - return header_row, objekt_col, dr, existing - -# ------------------------ -# Optimierter Live-Handler (nur Objektbeschreibung) -# ------------------------ -def on_objektbeschreibung_change(oEvent=None): - try: - doc = XSCRIPTCONTEXT.getDocument() - sheet = doc.CurrentController.ActiveSheet - except Exception as e: - log(f"Dokumentzugriff fehlgeschlagen: {e}", level="ERROR") - return - - cell = None - try: - if oEvent and hasattr(oEvent, "Range") and oEvent.Range is not None: - cell = oEvent.Range - elif oEvent and hasattr(oEvent, "Source") and oEvent.Source is not None: - cell = oEvent.Source - except Exception: - cell = None - if cell is None: - try: - sel = doc.CurrentSelection - if hasattr(sel, "getCellByPosition"): - cell = sel - else: - cell = sel.getCellByPosition(0, 0) - except Exception as e: - log(f"Keine Selektion: {e}", level="ERROR") - return - - try: - row_index = cell.CellAddress.Row - col_index = cell.CellAddress.Column - except Exception: - return - - try: - header_row, objekt_col, dr, existing = find_header_and_cols(sheet) - if header_row is None or col_index != objekt_col: - return # nur die Objektbeschreibung-Spalte bearbeiten - last_col = dr.EndColumn - if "Norm_Vorschlag" not in existing: - last_col += 1 - existing["Norm_Vorschlag"] = last_col - sheet.getCellByPosition(last_col, header_row).String = "Norm_Vorschlag" - norm_sug_col = existing["Norm_Vorschlag"] - except Exception as e: - log(f"Fehler Spaltenbestimmung: {e}", level="ERROR") - return - - try: - txt = str(cell.String).strip() - if not txt: - sheet.getCellByPosition(norm_sug_col, row_index).String = "" - return - norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH) - suggestions_acc = [] - clauses = [c.strip() for c in re.split(r",", txt) if c.strip()] - for cl in clauses: - parts = [p.strip() for p in re.split(r"\s+", cl) if p.strip()] - for p in parts: - if p.lower() in STOPWORDS or re.fullmatch(r"\d+", p): - continue - for sp in compound_split(p): - _, sugs, _ = map_term_with_indexes(sp, norm_dict, lemma_index) - suggestions_acc.extend(sugs) - - seen = set() - ordered = [] - for s in suggestions_acc: - if s not in seen: - seen.add(s) - ordered.append(s) - sheet.getCellByPosition(norm_sug_col, row_index).String = " | ".join(ordered) - - with open(CACHE_FILE, "w", encoding="utf-8") as f: - json.dump(CACHE, f, ensure_ascii=False, indent=2) - - except Exception as e: - log(f"Fehler im Live-Handler: {e}", level="ERROR") - -# ------------------------ -# Batch-Durchlauf -# ------------------------ -def run_mapper_macro(): - log("=== mapper_macro 1.5 gestartet ===", level="INFO") - try: - doc = XSCRIPTCONTEXT.getDocument() - sheet = doc.CurrentController.ActiveSheet - cursor = sheet.createCursor() - cursor.gotoStartOfUsedArea(False) - cursor.gotoEndOfUsedArea(True) - dr = cursor.getRangeAddress() - except Exception as e: - log(f"Dokumentzugriff fehlgeschlagen: {e}", level="ERROR") - return - - header_row, objekt_col, dr, existing = find_header_and_cols(sheet) - if objekt_col is None: - log("Spalte 'Objektbeschreibung' nicht gefunden.", level="ERROR") - return - if "Norm_Treffer" not in existing: - last_col = dr.EndColumn + 1 - existing["Norm_Treffer"] = last_col - sheet.getCellByPosition(last_col, header_row).String = "Norm_Treffer" - if "Norm_Vorschlag" not in existing: - last_col = dr.EndColumn + 2 - existing["Norm_Vorschlag"] = last_col - sheet.getCellByPosition(last_col, header_row).String = "Norm_Vorschlag" - if "Norm_ID" not in existing: - last_col = dr.EndColumn + 3 - existing["Norm_ID"] = last_col - sheet.getCellByPosition(last_col, header_row).String = "Norm_ID" - - norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH) - GREEN, YELLOW, RED = 0xADFF2F, 0xFFA500, 0xCC0000 - - for r in range(header_row + 1, dr.EndRow + 1): - try: - cell = sheet.getCellByPosition(objekt_col, r) - txt = str(cell.String).strip() - if not txt: - continue - clauses = [c.strip() for c in re.split(r",", txt) if c.strip()] - terms = [] - for cl in clauses: - for p in [p.strip() for p in re.split(r"\s+", cl) if p.strip()]: - if p.lower() in STOPWORDS or re.fullmatch(r"\d+", p): - continue - terms.extend([sp.strip() for sp in compound_split(p) if sp.strip()]) - - row_hits, row_sugs, row_ids = [], [], [] - any_unmapped = False - for term in terms: - hits, sugs, ids = map_term_with_indexes(term, norm_dict, lemma_index) - row_hits.extend(hits) - row_sugs.extend(sugs) - row_ids.extend(ids) - if not hits and not sugs: - any_unmapped = True - - def uniq(seq): - seen = set() - out = [] - for x in seq: - if x not in seen: - seen.add(x) - out.append(x) - return out - - row_hits, row_sugs, row_ids = map(uniq, [row_hits, row_sugs, row_ids]) - sheet.getCellByPosition(existing["Norm_Treffer"], r).String = " | ".join(row_hits) - sheet.getCellByPosition(existing["Norm_Vorschlag"], r).String = " | ".join(row_sugs) - sheet.getCellByPosition(existing["Norm_ID"], r).String = " | ".join(row_ids) - - cell.CellBackColor = RED if any_unmapped else 0xFFFFFF - sheet.getCellByPosition(existing["Norm_Treffer"], r).CellBackColor = GREEN if row_hits and not any_unmapped else 0xFFFFFF - sheet.getCellByPosition(existing["Norm_Vorschlag"], r).CellBackColor = YELLOW if row_sugs else 0xFFFFFF - - except Exception as e: - log(f"Fehler in Zeile {r}: {e}", level="ERROR") - continue - - with open(CACHE_FILE, "w", encoding="utf-8") as f: - json.dump(CACHE, f, ensure_ascii=False, indent=2) - log("=== mapper_macro 1.5 fertig ===", level="INFO") - -# ------------------------ -# Export -# ------------------------ -g_exportedScripts = ( - run_mapper_macro, - on_objektbeschreibung_change -)