Upload files to "/"
Alle Alten Versionen des Mapper-Makros
This commit is contained in:
commit
9869e72c53
469
mapper_macro_1.4.py
Normal file
469
mapper_macro_1.4.py
Normal file
@ -0,0 +1,469 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# mapper_macro 1.5 - LibreOffice Calc
|
||||
# Features: Kompositum-Split, Cache, Live-Vorschläge nur auf 'Objektbeschreibung', Logging
|
||||
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import datetime
|
||||
|
||||
# optional imports (Pandas, Spacy, RapidFuzz)
|
||||
try:
|
||||
import pandas as pd
|
||||
PANDAS_AVAILABLE = True
|
||||
except Exception:
|
||||
PANDAS_AVAILABLE = False
|
||||
|
||||
try:
|
||||
import spacy
|
||||
nlp = spacy.load("de_core_news_sm")
|
||||
SPACY_AVAILABLE = True
|
||||
except Exception:
|
||||
SPACY_AVAILABLE = False
|
||||
nlp = None
|
||||
|
||||
try:
|
||||
from rapidfuzz import fuzz
|
||||
RAPIDFUZZ_AVAILABLE = True
|
||||
except Exception:
|
||||
RAPIDFUZZ_AVAILABLE = False
|
||||
from difflib import SequenceMatcher
|
||||
|
||||
# ------------------------
|
||||
# Konfiguration
|
||||
# ------------------------
|
||||
# Directory that holds the vocabulary workbook, the persisted cache and the log.
BASE_DIR = "/home/jarnold/.config/libreoffice/4/user/Scripts/python/NV Abgleich Makro"
NV_MASTER_PATH, CACHE_FILE, LOG_FILE = (
    os.path.join(BASE_DIR, file_name)
    for file_name in ("NV_MASTER.ods", "mapper_cache.json", "mapper_macro.log")
)

# German function words that are skipped when a description is tokenised.
STOPWORDS = set(
    "mit ohne der die das ein eine und zu von im in auf an "
    "als bei für aus dem den des eines einer".split()
)

# Default minimum fuzzy score for a vocabulary entry to be offered as suggestion.
CONF_THRESHOLD = 0.75
|
||||
|
||||
# ------------------------
|
||||
# Logging
|
||||
# ------------------------
|
||||
def log(msg, level="INFO"):
    """Append one timestamped line to LOG_FILE.

    Best effort: any failure while creating the directory or writing the
    file is swallowed, so logging never interrupts the macro.
    """
    stamp = f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}"
    entry = "[{}] [{}] {}\n".format(stamp, level, msg)
    try:
        os.makedirs(os.path.dirname(LOG_FILE), exist_ok=True)
        with open(LOG_FILE, "a", encoding="utf-8") as handle:
            handle.write(entry)
    except Exception:
        pass
|
||||
|
||||
# ------------------------
|
||||
# Cache laden
|
||||
# ------------------------
|
||||
# Load the persisted term cache. CACHE maps a lemma to a dict with the keys
# "hits", "suggestions" and "ids" (the shape written by map_term_with_indexes).
CACHE = {}
try:
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            _loaded = json.load(f)
        # A syntactically valid file whose top level is not an object, or whose
        # entries are not objects, would break the later "in CACHE" / ".get()"
        # usage, so only well-formed entries are kept.
        if isinstance(_loaded, dict):
            CACHE = {k: v for k, v in _loaded.items() if isinstance(v, dict)}
        else:
            log("Fehler beim Laden des Caches: Inhalt ist kein JSON-Objekt", level="ERROR")
except Exception as e:
    CACHE = {}
    log(f"Fehler beim Laden des Caches: {e}", level="ERROR")
|
||||
|
||||
# ------------------------
|
||||
# Textnormalisierung & Lemma
|
||||
# ------------------------
|
||||
# Memo of lemmas already computed by lemmatize_term, keyed by the normalised term.
lemma_cache = {}
|
||||
|
||||
def normalize_text(s):
    """Return *s* trimmed, lower-cased and stripped of punctuation.

    Falsy input yields "". The characters ( ) [ ] " ' \\ ; : ? ! , . are
    removed and every run of whitespace is collapsed to a single space.
    """
    if not s:
        return ""
    lowered = str(s).strip().lower()
    without_punct = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", lowered)
    return re.sub(r"\s+", " ", without_punct)
|
||||
|
||||
def lemmatize_term(term):
    """Return the lemma form of *term*, memoised in ``lemma_cache``.

    The term is normalised first. With spaCy loaded, the lemmas of all tokens
    are joined by single spaces; without spaCy, or when spaCy raises, the
    normalised text itself serves as the lemma.
    """
    key = normalize_text(term)
    try:
        return lemma_cache[key]
    except KeyError:
        pass

    result = key
    if SPACY_AVAILABLE and nlp:
        try:
            result = " ".join(token.lemma_ for token in nlp(key))
        except Exception:
            result = key
    lemma_cache[key] = result
    return result
|
||||
|
||||
# ------------------------
|
||||
# Kompositum-Splitting
|
||||
# ------------------------
|
||||
def compound_split(term):
    """Split *term* into its component words.

    A term containing capitalised runs such as "HolzKiste" is split at every
    upper-case letter (-> ["Holz", "Kiste"]). When no capitalised run is
    found, the term is split on hyphens and whitespace instead. A falsy term
    yields []; otherwise the result has at least one element.
    """
    if not term:
        return []
    # "ß" exists only in lower case and must count as part of the word body,
    # otherwise "Straße" would be truncated to "Stra".
    parts = re.findall(r"[A-ZÄÖÜ][a-zäöüß]+", term)
    if parts:
        return parts
    return [p for p in re.split(r"[-\s]+", term) if p] or [term]
|
||||
|
||||
# ------------------------
|
||||
# NV_MASTER indexieren
|
||||
# ------------------------
|
||||
def build_norm_index(nv_path):
    """Read the NV_MASTER workbook and build two lookup tables.

    Returns ``(norm_dict, lemma_index)``. Both map a key (normalised name
    resp. lemma) to a list of entry dicts with the keys "Name", "ID" and
    "Sheet". Both stay empty when pandas is missing or the file cannot be read.
    """
    by_norm, by_lemma = {}, {}
    if not PANDAS_AVAILABLE:
        log("Pandas nicht verfügbar, NV_MASTER kann nicht gelesen.", level="ERROR")
        return by_norm, by_lemma
    try:
        workbook = pd.read_excel(nv_path, sheet_name=None, engine="odf")
    except Exception as e:
        log(f"Fehler beim Einlesen von NV_MASTER: {e}", level="ERROR")
        return by_norm, by_lemma

    for sheet_name, frame in workbook.items():
        # The sheet called "master" is not indexed.
        if str(sheet_name).strip().lower() == "master":
            continue
        frame = frame.fillna("")

        # Pick the ID / word columns by header text; the last match wins.
        id_col = word_col = None
        for column in frame.columns:
            header = str(column).strip().lower()
            if "id" in header:
                id_col = column
            if "wort" in header or "vokabel" in header:
                word_col = column
        # Fallbacks: the last column holds the word, the first one the ID.
        if len(frame.columns) >= 1:
            if word_col is None:
                word_col = frame.columns[-1]
            if id_col is None:
                id_col = frame.columns[0]

        has_id = id_col in frame.columns
        has_word = word_col in frame.columns
        # Rows without an ID of their own inherit the most recent ID above them.
        parent_id = None
        for _, row in frame.iterrows():
            id_text = str(row[id_col]).strip() if has_id else ""
            word_text = str(row[word_col]).strip() if has_word else ""
            if id_text:
                parent_id = id_text
            if not word_text:
                continue
            entry = {"Name": word_text, "ID": parent_id or "", "Sheet": sheet_name}
            by_norm.setdefault(normalize_text(word_text), []).append(entry)
            by_lemma.setdefault(lemmatize_term(word_text), []).append(entry)

    total = sum(map(len, by_norm.values()))
    log(f"NV_MASTER geladen. Begriffe: {total}")
    return by_norm, by_lemma
|
||||
|
||||
# ------------------------
|
||||
# Fuzzy / Vorschläge
|
||||
# ------------------------
|
||||
def fuzzy_score(a, b):
    """Return a similarity score for two strings, scaled to 0.0 .. 1.0.

    Uses RapidFuzz ``token_set_ratio`` when available and difflib's
    ``SequenceMatcher`` otherwise. Both inputs are lower-cased up front so
    that the two backends treat case the same way; ``None`` counts as "".
    A RapidFuzz failure yields 0.0.
    """
    a = (a or "").lower()
    b = (b or "").lower()
    if RAPIDFUZZ_AVAILABLE:
        try:
            return fuzz.token_set_ratio(a, b) / 100.0
        except Exception:
            return 0.0
    return SequenceMatcher(None, a, b).ratio()
|
||||
|
||||
def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, threshold=CONF_THRESHOLD):
    """Return display labels of vocabulary entries that fuzzily match *term_lemma*.

    Every key of ``lemma_index`` and ``norm_dict`` is scored with
    ``fuzzy_score``; keys that start with the term get a bonus of 0.1 (capped
    at 1.0). Entries of keys scoring at least *threshold* are returned best
    first, without duplicates, formatted as "Name (ID)" or just "Name" when
    the entry has no ID.
    """
    scored = []
    # The lemma index is scanned before the normalised-name index; the stable
    # sort below keeps that order among equally scored candidates.
    for index in (lemma_index, norm_dict):
        for key, entries in index.items():
            score = fuzzy_score(term_lemma, key)
            if key.startswith(term_lemma):
                score = min(score + 0.1, 1.0)
            if score < threshold:
                continue
            scored.extend((score, e["Name"], e["ID"]) for e in entries)
    scored.sort(key=lambda item: item[0], reverse=True)

    labels = []
    emitted = set()
    for _, name, id_ in scored:
        pair = (name, id_)
        if pair in emitted:
            continue
        emitted.add(pair)
        labels.append(f'{name} ({id_})' if id_ else name)
    return labels
|
||||
|
||||
# ------------------------
|
||||
# Mapping eines Terms (mit Cache)
|
||||
# ------------------------
|
||||
def map_term_with_indexes(term, norm_dict, lemma_index):
    """Map one term onto the vocabulary and return ``(hits, suggestions, ids)``.

    Lookup order: cached result for the term's lemma, exact normalised name,
    exact lemma. Fuzzy suggestions for the lemma are always collected. If
    there is still no hit, the term is split with ``compound_split`` and each
    part is looked up by lemma, contributing hits or further suggestions.
    All three lists are de-duplicated in order and stored in CACHE.
    """
    term_norm = normalize_text(term)
    term_lemma = lemmatize_term(term)

    try:
        cached = CACHE[term_lemma]
    except KeyError:
        pass
    else:
        return cached.get("hits", []), cached.get("suggestions", []), cached.get("ids", [])

    hits, ids = [], []

    def take(entries):
        # Record the names of the entries and every non-empty ID.
        for e in entries:
            hits.append(e["Name"])
            if e["ID"]:
                ids.append(e["ID"])

    if term_norm in norm_dict:
        take(norm_dict[term_norm])
    if not hits and term_lemma in lemma_index:
        take(lemma_index[term_lemma])
    suggestions = get_suggestions_for_term(term_lemma, norm_dict, lemma_index)

    if not hits:
        for piece in compound_split(term):
            piece_lemma = lemmatize_term(piece)
            if piece_lemma in lemma_index:
                take(lemma_index[piece_lemma])
            else:
                suggestions += get_suggestions_for_term(piece_lemma, norm_dict, lemma_index)

    # dict.fromkeys drops duplicates while keeping first-seen order.
    result = {
        "hits": list(dict.fromkeys(hits)),
        "suggestions": list(dict.fromkeys(suggestions)),
        "ids": list(dict.fromkeys(ids)),
    }
    CACHE[term_lemma] = result
    return result["hits"], result["suggestions"], result["ids"]
|
||||
|
||||
# ------------------------
|
||||
# Header + Spalten
|
||||
# ------------------------
|
||||
def find_header_and_cols(sheet):
    """Locate the "Objektbeschreibung" header and the existing result columns.

    Always returns a 4-tuple ``(header_row, objekt_col, dr, existing)`` so
    that callers can unpack it unconditionally:

    * ``(None, None, None, {})`` when the used area cannot be determined,
    * ``(None, None, dr, {})`` when no header is found in the first 5 rows,
    * otherwise the header position, the used-area address ``dr`` and a dict
      mapping each of "Norm_Treffer", "Norm_Vorschlag", "Norm_ID" that is
      present in the header row to its column index.
    """
    try:
        cursor = sheet.createCursor()
        cursor.gotoStartOfUsedArea(False)
        cursor.gotoEndOfUsedArea(True)
        dr = cursor.getRangeAddress()
    except Exception:
        return None, None, None, {}

    def cell_text(col, row):
        # Unreadable cells count as empty.
        try:
            return str(sheet.getCellByPosition(col, row).String).strip()
        except Exception:
            return ""

    header_row = objekt_col = None
    for r in range(min(5, dr.EndRow + 1)):
        for c in range(dr.EndColumn + 1):
            if cell_text(c, r).lower() == "objektbeschreibung":
                header_row, objekt_col = r, c
                break
        if objekt_col is not None:
            break
    if header_row is None:
        return None, None, dr, {}

    existing = {}
    for c in range(dr.EndColumn + 1):
        title = cell_text(c, header_row)
        if title in ("Norm_Treffer", "Norm_Vorschlag", "Norm_ID"):
            existing[title] = c
    return header_row, objekt_col, dr, existing
|
||||
|
||||
# ------------------------
|
||||
# Optimierter Live-Handler (nur Objektbeschreibung)
|
||||
# ------------------------
|
||||
def on_objektbeschreibung_change(oEvent=None):
    """Refresh the "Norm_Vorschlag" cell for an edited "Objektbeschreibung" cell.

    The edited cell is taken from ``oEvent.Range`` or ``oEvent.Source`` and,
    failing that, from the document's current selection. Nothing happens
    unless the cell lies in the "Objektbeschreibung" column below the header
    row. The suggestions for all terms of the cell text are written,
    de-duplicated and joined by " | ", into the "Norm_Vorschlag" column
    (created on demand), and CACHE is saved to CACHE_FILE. Errors are logged,
    never raised.
    """
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
    except Exception as e:
        log(f"Dokumentzugriff fehlgeschlagen: {e}", level="ERROR")
        return

    # Prefer the cell carried by the event, then fall back to the selection.
    cell = None
    try:
        if oEvent and hasattr(oEvent, "Range") and oEvent.Range is not None:
            cell = oEvent.Range
        elif oEvent and hasattr(oEvent, "Source") and oEvent.Source is not None:
            cell = oEvent.Source
    except Exception:
        cell = None
    if cell is None:
        try:
            sel = doc.CurrentSelection
            # A selection that offers getCellByPosition is reduced to its
            # top-left cell; calling that method on an object that lacks it
            # (as the previous version did) could only fail.
            if hasattr(sel, "getCellByPosition"):
                cell = sel.getCellByPosition(0, 0)
            else:
                cell = sel
        except Exception as e:
            log(f"Keine Selektion: {e}", level="ERROR")
            return

    try:
        row_index = cell.CellAddress.Row
        col_index = cell.CellAddress.Column
    except Exception:
        return

    try:
        found = find_header_and_cols(sheet)
        # Accept both a 3-tuple and a 4-tuple result.
        header_row, objekt_col, dr = found[:3]
        existing = found[3] if len(found) > 3 else {}
        if header_row is None or col_index != objekt_col:
            return  # only the Objektbeschreibung column is handled
        if row_index <= header_row:
            # Processing the header cell itself would overwrite the
            # "Norm_Vorschlag" header text with suggestions.
            return
        if "Norm_Vorschlag" not in existing:
            new_col = dr.EndColumn + 1
            existing["Norm_Vorschlag"] = new_col
            sheet.getCellByPosition(new_col, header_row).String = "Norm_Vorschlag"
        norm_sug_col = existing["Norm_Vorschlag"]
    except Exception as e:
        log(f"Fehler Spaltenbestimmung: {e}", level="ERROR")
        return

    try:
        txt = str(cell.String).strip()
        if not txt:
            sheet.getCellByPosition(norm_sug_col, row_index).String = ""
            return
        norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
        suggestions_acc = []
        for cl in (c.strip() for c in re.split(r",", txt)):
            for p in (p.strip() for p in re.split(r"\s+", cl)):
                # Skip empty fragments, stop words and pure numbers.
                if not p or p.lower() in STOPWORDS or re.fullmatch(r"\d+", p):
                    continue
                for sp in compound_split(p):
                    _, sugs, _ = map_term_with_indexes(sp, norm_dict, lemma_index)
                    suggestions_acc.extend(sugs)

        # dict.fromkeys drops duplicates while keeping first-seen order.
        ordered = list(dict.fromkeys(suggestions_acc))
        sheet.getCellByPosition(norm_sug_col, row_index).String = " | ".join(ordered)

        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(CACHE, f, ensure_ascii=False, indent=2)
    except Exception as e:
        log(f"Fehler im Live-Handler: {e}", level="ERROR")
|
||||
|
||||
# ------------------------
|
||||
# Batch-Durchlauf
|
||||
# ------------------------
|
||||
def run_mapper_macro():
    """Map every "Objektbeschreibung" cell of the active sheet against NV_MASTER.

    For each non-empty row below the header, the description is split into
    terms (comma/whitespace separated, stop words and pure numbers dropped,
    compounds split). Hits, suggestions and IDs of all terms are written,
    de-duplicated and joined by " | ", to the columns "Norm_Treffer",
    "Norm_Vorschlag" and "Norm_ID" (appended after the used area when
    missing). Cells are colour-coded and CACHE is saved to CACHE_FILE.
    Errors are logged, never raised.
    """
    log("=== mapper_macro 1.5 gestartet ===", level="INFO")
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
    except Exception as e:
        log(f"Dokumentzugriff fehlgeschlagen: {e}", level="ERROR")
        return

    found = find_header_and_cols(sheet)
    # Accept both a 3-tuple and a 4-tuple result instead of crashing on unpack.
    header_row, objekt_col, dr = found[:3]
    existing = found[3] if len(found) > 3 else {}
    if objekt_col is None:
        log("Spalte 'Objektbeschreibung' nicht gefunden.", level="ERROR")
        return

    # Append missing result columns contiguously after the used area, so no
    # gap appears when only some of them already exist.
    next_col = dr.EndColumn
    for title in ("Norm_Treffer", "Norm_Vorschlag", "Norm_ID"):
        if title not in existing:
            next_col += 1
            existing[title] = next_col
            sheet.getCellByPosition(next_col, header_row).String = title

    norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
    GREEN, YELLOW, RED, WHITE = 0xADFF2F, 0xFFA500, 0xCC0000, 0xFFFFFF

    def dedupe(seq):
        # Drop duplicates while keeping first-seen order.
        return list(dict.fromkeys(seq))

    for r in range(header_row + 1, dr.EndRow + 1):
        try:
            cell = sheet.getCellByPosition(objekt_col, r)
            txt = str(cell.String).strip()
            if not txt:
                continue

            terms = []
            for cl in (c.strip() for c in re.split(r",", txt)):
                for p in (p.strip() for p in re.split(r"\s+", cl)):
                    if not p or p.lower() in STOPWORDS or re.fullmatch(r"\d+", p):
                        continue
                    terms.extend(sp.strip() for sp in compound_split(p) if sp.strip())

            row_hits, row_sugs, row_ids = [], [], []
            any_unmapped = False
            for term in terms:
                hits, sugs, ids = map_term_with_indexes(term, norm_dict, lemma_index)
                row_hits.extend(hits)
                row_sugs.extend(sugs)
                row_ids.extend(ids)
                # A term with neither hit nor suggestion marks the row as unmapped.
                if not hits and not sugs:
                    any_unmapped = True

            row_hits, row_sugs, row_ids = dedupe(row_hits), dedupe(row_sugs), dedupe(row_ids)
            hit_cell = sheet.getCellByPosition(existing["Norm_Treffer"], r)
            sug_cell = sheet.getCellByPosition(existing["Norm_Vorschlag"], r)
            hit_cell.String = " | ".join(row_hits)
            sug_cell.String = " | ".join(row_sugs)
            sheet.getCellByPosition(existing["Norm_ID"], r).String = " | ".join(row_ids)

            cell.CellBackColor = RED if any_unmapped else WHITE
            hit_cell.CellBackColor = GREEN if row_hits and not any_unmapped else WHITE
            sug_cell.CellBackColor = YELLOW if row_sugs else WHITE
        except Exception as e:
            log(f"Fehler in Zeile {r}: {e}", level="ERROR")
            continue

    # A failed cache write must not abort the run after the sheet was updated.
    try:
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(CACHE, f, ensure_ascii=False, indent=2)
    except Exception as e:
        log(f"Cache speichern fehlgeschlagen: {e}", level="WARN")
    log("=== mapper_macro 1.5 fertig ===", level="INFO")
|
||||
|
||||
# ------------------------
|
||||
# Export
|
||||
# ------------------------
|
||||
# Entry points of this module: the batch run and the live cell-change handler.
g_exportedScripts = (run_mapper_macro, on_objektbeschreibung_change)
|
||||
508
mapper_macro_1.5.py
Normal file
508
mapper_macro_1.5.py
Normal file
@ -0,0 +1,508 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# mapper_macro 1.5 - korrigiert: Logging im Dokumentverzeichnis, stabile Button-Erstellung,
|
||||
# keine Listener, optimiertes Mapping (ohne Listener-Teil)
|
||||
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import datetime
|
||||
|
||||
# optionale Module (Pandas, Spacy, RapidFuzz)
|
||||
try:
|
||||
import pandas as pd
|
||||
PANDAS_AVAILABLE = True
|
||||
except Exception:
|
||||
PANDAS_AVAILABLE = False
|
||||
|
||||
try:
|
||||
import spacy
|
||||
nlp = spacy.load("de_core_news_sm")
|
||||
SPACY_AVAILABLE = True
|
||||
except Exception:
|
||||
SPACY_AVAILABLE = False
|
||||
nlp = None
|
||||
|
||||
try:
|
||||
from rapidfuzz import fuzz
|
||||
RAPIDFUZZ_AVAILABLE = True
|
||||
except Exception:
|
||||
RAPIDFUZZ_AVAILABLE = False
|
||||
|
||||
from difflib import SequenceMatcher
|
||||
|
||||
# UNO (für Button/Paths)
|
||||
try:
|
||||
import uno
|
||||
except Exception:
|
||||
uno = None
|
||||
|
||||
# ------------------------
|
||||
# Konfiguration (Fallback-BASE_DIR)
|
||||
# ------------------------
|
||||
# Fallback directory; it is used until set_paths_from_doc picks another one.
BASE_DIR = os.path.expanduser("~/.config/libreoffice/4/user/Scripts/python/Vokabular_Abgleich_Makro")
NV_MASTER_FILENAME, CACHE_FILENAME, LOG_FILENAME = (
    "NV_MASTER.ods",
    "mapper_cache.json",
    "mapper_macro.log",
)

# German function words that are skipped when a description is tokenised.
STOPWORDS = set(
    "mit ohne der die das ein eine und zu von im in auf an "
    "als bei für aus dem den des eines einer".split()
)
CONF_THRESHOLD = 0.82
# Default minimum fuzzy score for a vocabulary entry to be offered as suggestion.
FUZZY_CUTOFF = 0.88

# Working paths; they start out in BASE_DIR and are re-pointed by set_paths_from_doc.
DOC_DIR = BASE_DIR
NV_MASTER_PATH, CACHE_FILE, LOG_FILE = (
    os.path.join(DOC_DIR, file_name)
    for file_name in (NV_MASTER_FILENAME, CACHE_FILENAME, LOG_FILENAME)
)
|
||||
|
||||
# in-memory cache
|
||||
# Load the persisted term cache. CACHE maps a lemma to a dict with the keys
# "hits", "suggestions" and "ids" (the shape written by map_term).
CACHE = {}
try:
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            _loaded = json.load(f)
        # A syntactically valid file whose top level is not an object, or whose
        # entries are not objects, would break the later cache lookups, so only
        # well-formed entries are kept.
        if isinstance(_loaded, dict):
            CACHE = {k: v for k, v in _loaded.items() if isinstance(v, dict)}
except Exception:
    CACHE = {}
|
||||
|
||||
# ------------------------
|
||||
# Pfade im Dokument setzen
|
||||
# ------------------------
|
||||
def set_paths_from_doc(doc):
    """Point NV_MASTER_PATH, CACHE_FILE and LOG_FILE at the document's folder.

    The folder is derived from ``doc.URL`` via ``uno.fileUrlToSystemPath``,
    or by plain URL parsing when that call fails. If the document has no
    URL, the URL is not a file URL, or the folder does not exist, BASE_DIR
    is used instead.
    """
    global DOC_DIR, NV_MASTER_PATH, CACHE_FILE, LOG_FILE
    # Start from the fallback on every call so that a folder chosen for a
    # previously processed document is not silently reused.
    doc_dir = BASE_DIR
    try:
        url = getattr(doc, "URL", "")
        if url and url.strip():
            try:
                system_path = uno.fileUrlToSystemPath(url)
            except Exception:
                # Fallback without UNO: decode the path part of a file URL.
                from urllib.parse import unquote, urlparse
                parsed = urlparse(url)
                system_path = unquote(parsed.path) if parsed.scheme == "file" else ""
            if system_path:
                d = os.path.dirname(system_path)
                if os.path.isdir(d):
                    doc_dir = d
    except Exception:
        doc_dir = BASE_DIR
    DOC_DIR = doc_dir
    NV_MASTER_PATH = os.path.join(DOC_DIR, NV_MASTER_FILENAME)
    CACHE_FILE = os.path.join(DOC_DIR, CACHE_FILENAME)
    LOG_FILE = os.path.join(DOC_DIR, LOG_FILENAME)
|
||||
|
||||
# ------------------------
|
||||
# Logging (Dokumentdir, robust)
|
||||
# ------------------------
|
||||
def log(msg, level="INFO"):
    """Append one timestamped line to the log file.

    LOG_FILE is tried first; if creating its directory or writing fails, the
    same line is written to the log file inside BASE_DIR. If that fails as
    well, the message is dropped silently.
    """
    stamp = f"{datetime.datetime.now():%Y-%m-%d %H:%M:%S}"
    entry = "[{}] [{}] {}\n".format(stamp, level, msg)
    # The targets are resolved lazily, inside the try, one after the other.
    targets = (
        lambda: LOG_FILE,
        lambda: os.path.join(BASE_DIR, LOG_FILENAME),
    )
    for resolve in targets:
        try:
            path = resolve()
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, "a", encoding="utf-8") as handle:
                handle.write(entry)
        except Exception:
            continue
        return
|
||||
|
||||
# ------------------------
|
||||
# Textvorbereitung & Helpers
|
||||
# ------------------------
|
||||
# Memo of lemmas already computed by lemmatize_term, keyed by the normalised term.
lemma_cache = {}
|
||||
|
||||
def normalize_text(s):
    """Normalise *s* for lookups: trim, lower-case, drop punctuation.

    Falsy input gives "". The characters ( ) [ ] " ' \\ ; : ? ! , . are
    deleted; afterwards each whitespace run becomes one space.
    """
    if not s:
        return ""
    text = str(s).strip().lower()
    for pattern, replacement in ((r"[\(\)\[\]\"'\\;:\?!,\.]", ""), (r"\s+", " ")):
        text = re.sub(pattern, replacement, text)
    return text
|
||||
|
||||
def lemmatize_term(term):
    """Return the lemma form of *term*, memoised in ``lemma_cache``.

    The term is normalised first. With spaCy loaded, the lemmas of all tokens
    are joined by single spaces; without spaCy, or when spaCy raises, the
    normalised text itself serves as the lemma.
    """
    key = normalize_text(term)
    try:
        return lemma_cache[key]
    except KeyError:
        pass

    result = key
    if SPACY_AVAILABLE and nlp:
        try:
            result = " ".join(token.lemma_ for token in nlp(key))
        except Exception:
            result = key
    lemma_cache[key] = result
    return result
|
||||
|
||||
def compound_split(term):
    """Split *term* into its component words.

    A term containing capitalised runs such as "HolzKiste" is split at every
    upper-case letter (-> ["Holz", "Kiste"]). When no capitalised run is
    found, the term is split on hyphens and whitespace instead. A falsy term
    yields []; otherwise the result has at least one element.
    """
    if not term:
        return []
    # "ß" exists only in lower case and must count as part of the word body,
    # otherwise "Straße" would be truncated to "Stra".
    parts = re.findall(r"[A-ZÄÖÜ][a-zäöüß]+", term)
    if parts:
        return parts
    return [p for p in re.split(r"[-\s]+", term) if p] or [term]
|
||||
|
||||
# ------------------------
|
||||
# NV_MASTER indexieren
|
||||
# ------------------------
|
||||
def build_norm_index(nv_path):
    """Read the NV_MASTER workbook and build two lookup tables.

    Returns ``(norm_dict, lemma_index)``. Both map a key (normalised name
    resp. lemma) to a list of entry dicts with the keys "Name", "ID" and
    "Sheet". Both stay empty when pandas is missing or the file cannot be read.
    """
    by_norm, by_lemma = {}, {}
    if not PANDAS_AVAILABLE:
        log("Pandas nicht verfügbar, NV_MASTER kann nicht gelesen.", level="ERROR")
        return by_norm, by_lemma
    try:
        workbook = pd.read_excel(nv_path, sheet_name=None, engine="odf")
    except Exception as e:
        log(f"Fehler beim Einlesen von NV_MASTER: {e}", level="ERROR")
        return by_norm, by_lemma

    for sheet_name, frame in workbook.items():
        # The sheet called "master" is not indexed.
        if str(sheet_name).strip().lower() == "master":
            continue
        frame = frame.fillna("")

        # Pick the ID / word columns by header text; the last match wins.
        id_col = word_col = None
        for column in frame.columns:
            header = str(column).strip().lower()
            if "id" in header:
                id_col = column
            if "wort" in header or "vokabel" in header:
                word_col = column
        # Fallbacks: the last column holds the word, the first one the ID.
        if len(frame.columns) >= 1:
            if word_col is None:
                word_col = frame.columns[-1]
            if id_col is None:
                id_col = frame.columns[0]

        has_id = id_col in frame.columns
        has_word = word_col in frame.columns
        # Rows without an ID of their own inherit the most recent ID above them.
        parent_id = None
        for _, row in frame.iterrows():
            id_text = str(row[id_col]).strip() if has_id else ""
            word_text = str(row[word_col]).strip() if has_word else ""
            if id_text:
                parent_id = id_text
            if not word_text:
                continue
            entry = {"Name": word_text, "ID": parent_id or "", "Sheet": sheet_name}
            by_norm.setdefault(normalize_text(word_text), []).append(entry)
            by_lemma.setdefault(lemmatize_term(word_text), []).append(entry)

    total = sum(map(len, by_norm.values()))
    log(f"NV_MASTER geladen. Begriffe: {total}", level="INFO")
    return by_norm, by_lemma
|
||||
|
||||
# ------------------------
|
||||
# Fuzzy Matching
|
||||
# ------------------------
|
||||
def fuzzy_score(a, b):
    """Return a case-insensitive similarity score for two strings.

    Falsy inputs count as "". RapidFuzz ``token_sort_ratio`` (divided by
    100) is used when available, with 0.0 as result if it raises; otherwise
    the ratio of difflib's ``SequenceMatcher`` is returned.
    """
    left, right = ((text or "").lower() for text in (a, b))
    if not RAPIDFUZZ_AVAILABLE:
        return SequenceMatcher(None, left, right).ratio()
    try:
        return fuzz.token_sort_ratio(left, right) / 100.0
    except Exception:
        return 0.0
|
||||
|
||||
def get_suggestions(term_lemma, norm_dict, lemma_index, threshold=FUZZY_CUTOFF, max_sugs=6):
    """Return display labels of vocabulary entries that fuzzily match *term_lemma*.

    Keys of ``lemma_index`` (empty keys skipped) and of ``norm_dict`` are
    scored with ``fuzzy_score``; keys that start with the term get a bonus of
    0.08 (capped at 1.0). Entries of keys scoring at least *threshold* are
    returned best first, without duplicates, as "Name (ID)" or just "Name"
    when the entry has no ID. Collection stops once *max_sugs* labels exist.
    """
    needle = term_lemma or ""
    scored = []

    def scan(index, skip_empty_keys):
        # Collect (score, name, id) for every entry whose key scores high enough.
        for key, entries in index.items():
            if skip_empty_keys and not key:
                continue
            score = fuzzy_score(needle, key)
            if key.startswith(needle):
                score = min(score + 0.08, 1.0)
            if score >= threshold:
                scored.extend((score, e["Name"], e["ID"]) for e in entries)

    # Lemma keys first, then normalised names; the stable sort keeps that
    # order among equally scored candidates.
    scan(lemma_index, True)
    scan(norm_dict, False)
    scored.sort(key=lambda item: item[0], reverse=True)

    labels, emitted = [], set()
    for _, name, id_ in scored:
        pair = (name, id_)
        if pair in emitted:
            continue
        emitted.add(pair)
        labels.append(f"{name} ({id_})" if id_ else name)
        if len(labels) >= max_sugs:
            break
    return labels
|
||||
|
||||
# ------------------------
|
||||
# Mapping mit Cache
|
||||
# ------------------------
|
||||
def map_term(term, norm_dict, lemma_index):
    """Map one term onto the vocabulary.

    Returns a dict with the keys "hits", "suggestions" and "ids". Lookup
    order: cached result for the term's lemma, exact normalised name, exact
    lemma. Fuzzy suggestions are collected only when there is no hit. The
    result is stored in CACHE under the lemma.
    """
    term_norm = normalize_text(term)
    term_lemma = lemmatize_term(term)

    cached = CACHE.get(term_lemma)
    if isinstance(cached, dict):
        # Cache content may come from a file on disk; fill in missing keys so
        # callers can index the result without a KeyError.
        return {
            "hits": cached.get("hits", []),
            "suggestions": cached.get("suggestions", []),
            "ids": cached.get("ids", []),
        }

    hits = []
    ids = []

    def take(entries):
        # Record the names of the entries and every non-empty ID.
        for e in entries:
            hits.append(e["Name"])
            if e["ID"]:
                ids.append(e["ID"])

    # exact normalised name first, lemma second
    if term_norm in norm_dict:
        take(norm_dict[term_norm])
    if not hits and term_lemma in lemma_index:
        take(lemma_index[term_lemma])

    # Suggestions exist only when there is no hit, so the former filter
    # "drop suggestions that contain a hit" could never remove anything.
    suggestions = [] if hits else get_suggestions(term_lemma, norm_dict, lemma_index)

    result = {"hits": hits, "suggestions": suggestions, "ids": ids}
    CACHE[term_lemma] = result
    return result
|
||||
|
||||
# ------------------------
|
||||
# Button erstellen (sicher)
|
||||
# ------------------------
|
||||
def add_macro_button(sheet):
    """Add a "Start Mapping" button shape to *sheet* unless it already exists.

    The shape is named "MapperStartButton"; when a shape of that name is
    already on the sheet's draw page nothing is added. The button is not
    bound to a macro here. Errors are logged, never raised.
    """
    try:
        doc = XSCRIPTCONTEXT.getDocument()
    except Exception:
        log("add_macro_button: kein Dokument-Kontext", level="WARN")
        return
    try:
        draw_page = sheet.DrawPage
        # avoid creating a duplicate
        for shape in draw_page:
            try:
                if getattr(shape, "Name", "") == "MapperStartButton":
                    return
            except Exception:
                continue

        shape = doc.createInstance("com.sun.star.drawing.ControlShape")
        shape.Name = "MapperStartButton"
        # UNO structs are passed by value: reading shape.Position yields a
        # copy, so the fields are set on a local struct that is then assigned
        # as a whole. Setting shape.Position.X directly would have no effect.
        position = uno.createUnoStruct("com.sun.star.awt.Point")
        position.X = 1000
        position.Y = 200
        shape.Position = position
        size = uno.createUnoStruct("com.sun.star.awt.Size")
        size.Width = 3000
        size.Height = 1000
        shape.Size = size

        button_model = doc.createInstance("com.sun.star.form.component.CommandButton")
        button_model.Label = "Start Mapping"
        button_model.HelpText = "Startet das Mapping (run_mapper_macro)"
        # Only the control and its label are created; the macro itself has to
        # be linked to the button by the user in the UI.

        shape.Control = button_model
        draw_page.add(shape)
        log("Button 'MapperStartButton' erstellt.", level="INFO")
    except Exception as e:
        log(f"add_macro_button Fehler: {e}", level="ERROR")
|
||||
|
||||
# ------------------------
|
||||
# Hauptlauf (ohne Listener)
|
||||
# ------------------------
|
||||
def run_mapper_macro():
    """Map every "Objektbeschreibung" cell of the active sheet against NV_MASTER.

    Steps: set the working paths from the document, add the start button,
    find the "Objektbeschreibung" header within the first 10 rows, make sure
    the columns "Norm_Treffer", "Norm_Vorschlag" and "Norm_ID" exist, build
    the vocabulary index and process each non-empty row below the header.
    A row is first mapped as one phrase (stop words removed); only when that
    yields nothing are its individual terms mapped. Results are written,
    cells are colour-coded and CACHE is saved to CACHE_FILE. Errors are
    logged, never raised.
    """
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        set_paths_from_doc(doc)
        log("=== mapper_macro gestartet ===", level="INFO")
        sheet = doc.CurrentController.ActiveSheet
        add_macro_button(sheet)

        # used area
        cursor = sheet.createCursor()
        cursor.gotoStartOfUsedArea(False)
        cursor.gotoEndOfUsedArea(True)
        dr = cursor.getRangeAddress()

        # find header row and Objektbeschreibung column
        header_row = None
        objekt_col = None
        for r in range(0, min(10, dr.EndRow + 1)):
            for c in range(0, dr.EndColumn + 1):
                try:
                    val = str(sheet.getCellByPosition(c, r).String).strip().lower()
                except Exception:
                    val = ""
                # val is lower-cased, so the literal has to be lower-case too;
                # the capitalised literal used before could never match.
                if val == "objektbeschreibung":
                    header_row = r
                    objekt_col = c
                    break
            if objekt_col is not None:
                break

        if objekt_col is None:
            log("run_mapper_macro: 'Objektbeschreibung' Header nicht gefunden.", level="ERROR")
            return

        # ensure result columns: reuse existing ones, append missing ones
        result_titles = ("Norm_Treffer", "Norm_Vorschlag", "Norm_ID")
        existing = {}
        for c in range(0, dr.EndColumn + 1):
            try:
                h = str(sheet.getCellByPosition(c, header_row).String).strip()
            except Exception:
                h = ""
            if h in result_titles:
                existing[h] = c
        last_col = dr.EndColumn
        for title in result_titles:
            if title not in existing:
                last_col += 1
                existing[title] = last_col
                sheet.getCellByPosition(last_col, header_row).String = title

        norm_tr_col = existing["Norm_Treffer"]
        norm_sug_col = existing["Norm_Vorschlag"]
        norm_id_col = existing["Norm_ID"]

        # build index
        norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
        if not norm_dict and not lemma_index:
            log("run_mapper_macro: NV_MASTER leer oder nicht lesbar.", level="ERROR")
            return

        GREEN, YELLOW, RED, WHITE = 0xADFF2F, 0xFFFF66, 0xFF9999, 0xFFFFFF
        rows_processed = 0

        def dedupe(seq):
            # Drop duplicates while keeping first-seen order.
            return list(dict.fromkeys(seq))

        for r in range(header_row + 1, dr.EndRow + 1):
            try:
                cell = sheet.getCellByPosition(objekt_col, r)
                txt = str(cell.String).strip()
                if not txt:
                    continue

                # phrase first: the whole cleaned text without stop words
                tokens = [t.strip() for t in re.split(r'\s+', normalize_text(txt)) if t and t not in STOPWORDS]
                phrase = " ".join(tokens).strip()

                row_hits, row_sugs, row_ids = [], [], []
                any_unmapped = False
                if not phrase:
                    # Nothing but stop words / punctuation: nothing to map.
                    any_unmapped = True
                else:
                    mapped_phrase = map_term(phrase, norm_dict, lemma_index)
                    if mapped_phrase["hits"] or mapped_phrase["suggestions"]:
                        row_hits = mapped_phrase["hits"]
                        row_sugs = mapped_phrase["suggestions"]
                        row_ids = mapped_phrase["ids"]
                    else:
                        # fallback: map every token / compound part on its own
                        terms = []
                        for p in [p for p in re.split(r'[,\s]+', txt) if p.strip()]:
                            if p.lower() in STOPWORDS or re.fullmatch(r'\d+', p):
                                continue
                            terms.extend(sp.strip() for sp in compound_split(p) if sp and sp.strip())
                        for term in terms:
                            mapped = map_term(term, norm_dict, lemma_index)
                            hits, sugs, ids = mapped["hits"], mapped["suggestions"], mapped["ids"]
                            row_hits.extend(hits)
                            row_sugs.extend(sugs)
                            row_ids.extend(ids)
                            # A term with neither hit nor suggestion marks the row.
                            if not hits and not sugs:
                                any_unmapped = True

                row_hits = dedupe(row_hits)
                row_sugs = dedupe(row_sugs)
                row_ids = dedupe(row_ids)

                # write results
                hit_cell = sheet.getCellByPosition(norm_tr_col, r)
                sug_cell = sheet.getCellByPosition(norm_sug_col, r)
                hit_cell.String = " | ".join(row_hits)
                sug_cell.String = " | ".join(row_sugs)
                sheet.getCellByPosition(norm_id_col, r).String = " | ".join(row_ids)

                cell.CellBackColor = RED if any_unmapped else WHITE
                hit_cell.CellBackColor = GREEN if row_hits else WHITE
                sug_cell.CellBackColor = YELLOW if row_sugs else WHITE

                rows_processed += 1
            except Exception as e:
                log(f"Fehler in Zeile {r}: {e}", level="ERROR")
                continue

        # persist the cache next to the other working files
        try:
            with open(CACHE_FILE, "w", encoding="utf-8") as f:
                json.dump(CACHE, f, ensure_ascii=False, indent=2)
        except Exception as e:
            log(f"Cache speichern fehlgeschlagen: {e}", level="WARN")

        log(f"=== mapper_macro fertig. Zeilen verarbeitet: {rows_processed} ===", level="INFO")
    except Exception as e:
        # top-level safety net: the macro must not raise into LibreOffice
        try:
            log(f"run_mapper_macro: Unhandled exception: {e}", level="ERROR")
        except Exception:
            pass
|
||||
|
||||
# ------------------------
|
||||
# Export
|
||||
# ------------------------
|
||||
g_exportedScripts = (run_mapper_macro,)
|
||||
343
mapper_macro_2.0.py
Normal file
343
mapper_macro_2.0.py
Normal file
@ -0,0 +1,343 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
LibreOffice Calc Makro: NV_MASTER-Abgleich (verbessertes semantisches Matching)
|
||||
Speicherort: /home/jarnold/.config/libreoffice/4/user/Scripts/python/NV Abgleich Makro/mapper_macro.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import traceback
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# LIBRARIES & MODELS
|
||||
# ------------------------------------------------------------
|
||||
try:
|
||||
import pandas as pd
|
||||
PANDAS_AVAILABLE = True
|
||||
except Exception:
|
||||
PANDAS_AVAILABLE = False
|
||||
|
||||
try:
|
||||
import spacy
|
||||
# Verwende das mittlere Modell für semantische Ähnlichkeit
|
||||
nlp = spacy.load("de_core_news_md")
|
||||
SPACY_AVAILABLE = True
|
||||
except Exception:
|
||||
SPACY_AVAILABLE = False
|
||||
nlp = None
|
||||
|
||||
try:
|
||||
from rapidfuzz import fuzz
|
||||
RAPIDFUZZ_AVAILABLE = True
|
||||
except Exception:
|
||||
RAPIDFUZZ_AVAILABLE = False
|
||||
from difflib import SequenceMatcher
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# KONFIGURATION
|
||||
# ------------------------------------------------------------
|
||||
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
NV_MASTER_PATH = os.path.join(BASE_DIR, "NV_MASTER.ods")
|
||||
LOG_FILE = os.path.join(BASE_DIR, "mapper_macro.log")
|
||||
CACHE_FILE = os.path.join(BASE_DIR, "mapper_cache.json")
|
||||
|
||||
STOPWORDS = {"mit","ohne","der","die","das","ein","eine","und","zu","von","im","in","auf","an","als","bei","für","aus","dem","den","des","eines","einer"}
|
||||
CONF_THRESHOLD = 0.70 # etwas großzügiger für semantisches Matching
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# LOGGING
|
||||
# ------------------------------------------------------------
|
||||
def log(msg):
    """Append one stripped message line to the macro's log file.

    Logging is best effort: any error (unwritable file, message without a
    ``strip`` method, ...) is swallowed on purpose so that logging can never
    abort the macro.
    """
    try:
        with open(LOG_FILE, mode="a", encoding="utf-8") as handle:
            handle.write(f"{msg.strip()}\n")
    except Exception:
        # Deliberately ignored: a logging failure must not reach the caller.
        pass
|
||||
|
||||
log("Makro gestartet")
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# CACHE
|
||||
# ------------------------------------------------------------
|
||||
# Load the persisted term cache from a previous run.
# A missing file, an unreadable file, invalid JSON, or JSON that is not an
# object all result in an empty cache instead of an error.
CACHE = {}
try:
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            _loaded_cache = json.load(f)
        # Only a JSON object can serve as the term -> result mapping; anything
        # else (null, list, number) would break the lookups done later.
        if isinstance(_loaded_cache, dict):
            CACHE = _loaded_cache
except Exception:
    CACHE = {}
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# TEXTNORMALISIERUNG & LEMMATISIERUNG
|
||||
# ------------------------------------------------------------
|
||||
def normalize_text(s):
    """Return a canonical lower-case form of ``s`` for dictionary lookups.

    Falsy input yields ``""``. Otherwise the value is converted to ``str``,
    trimmed, lower-cased, stripped of brackets, quotes, backslashes and common
    punctuation, and every run of whitespace is collapsed to a single space.
    """
    if not s:
        return ""
    lowered = str(s).strip().lower()
    without_punct = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", lowered)
    return re.sub(r"\s+", " ", without_punct)
|
||||
|
||||
lemma_cache = {}
|
||||
def lemmatize_term(term):
    """Return the lemma form of ``term``, memoised in ``lemma_cache``.

    The term is normalised first. When spaCy is available the lemmas of all
    tokens are joined with single spaces; without spaCy, or when spaCy
    raises, the normalised term itself is used.
    """
    key = normalize_text(term)
    try:
        return lemma_cache[key]
    except KeyError:
        pass

    result = key
    if SPACY_AVAILABLE and nlp:
        try:
            result = " ".join([tok.lemma_ for tok in nlp(key)])
        except Exception:
            result = key
    lemma_cache[key] = result
    return result
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# NV_MASTER LADEN
|
||||
# ------------------------------------------------------------
|
||||
def build_norm_index(nv_path):
    """Read the NV_MASTER workbook and build two lookup indexes.

    Every sheet except the one named "master" is read. The ID column is the
    first column whose header contains "id" (fallback: first column); the
    word column is the first whose header contains "wort" or "vokabel"
    (fallback: last column). Rows without an ID inherit the last ID seen
    above them in the same sheet.

    Parameters:
        nv_path: path of the .ods workbook.

    Returns:
        ``(norm_dict, lemma_index)`` - dicts mapping the normalised name and
        the lemma of each word to a list of entries with the keys "Name",
        "ID" and "Sheet". Both are empty when pandas is unavailable or the
        workbook cannot be read.
    """
    norm_dict = {}
    lemma_index = {}

    if not PANDAS_AVAILABLE:
        log("Pandas nicht verfügbar – NV_MASTER kann nicht geladen werden.")
        return norm_dict, lemma_index

    try:
        sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf")
    except Exception as e:
        log(f"Fehler beim Laden von NV_MASTER: {e}")
        return norm_dict, lemma_index

    for sheet_name, df in sheets.items():
        if str(sheet_name).strip().lower() == "master":
            continue
        # A sheet without columns has nothing to index, and the positional
        # fallbacks below would raise IndexError on it.
        if len(df.columns) == 0:
            continue
        df = df.fillna("")
        cols = [str(c).strip().lower() for c in df.columns]
        id_col = next(
            (df.columns[i] for i, c in enumerate(cols) if "id" in c),
            df.columns[0],
        )
        word_col = next(
            (df.columns[i] for i, c in enumerate(cols) if "wort" in c or "vokabel" in c),
            df.columns[-1],
        )

        current_parent_id = None
        for _, row in df.iterrows():
            id_val = str(row[id_col]).strip()
            word_val = str(row[word_col]).strip()
            if id_val:
                # Remember the ID so that following rows without one inherit it.
                current_parent_id = id_val
            if not word_val:
                continue
            norm_name = normalize_text(word_val)
            lemma = lemmatize_term(word_val)
            entry = {"Name": word_val, "ID": current_parent_id or "", "Sheet": sheet_name}
            norm_dict.setdefault(norm_name, []).append(entry)
            lemma_index.setdefault(lemma, []).append(entry)

    log(f"NV_MASTER geladen: {sum(len(v) for v in norm_dict.values())} Begriffe.")
    return norm_dict, lemma_index
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# SCORING: FUZZY + SEMANTISCH
|
||||
# ------------------------------------------------------------
|
||||
def fuzzy_score(a, b):
    """Return a string similarity between 0.0 and 1.0.

    With RapidFuzz imported, ``fuzz.token_set_ratio`` is used (scaled from
    0-100) and any RapidFuzz error yields 0.0. Otherwise the ratio of
    difflib's ``SequenceMatcher`` on the lower-cased inputs is returned.
    """
    if not RAPIDFUZZ_AVAILABLE:
        return SequenceMatcher(None, a.lower(), b.lower()).ratio()
    try:
        return fuzz.token_set_ratio(a, b) / 100.0
    except Exception:
        return 0.0
|
||||
|
||||
def semantic_similarity(a, b):
    """Return the spaCy document similarity of two strings, or 0.0.

    0.0 is returned when spaCy is unavailable, when the vocabulary object has
    no ``vectors`` attribute, when either document has a zero vector norm, or
    when spaCy raises.
    """
    if not (SPACY_AVAILABLE and hasattr(nlp.vocab, "vectors")):
        return 0.0
    try:
        first, second = nlp(a), nlp(b)
        if not (first.vector_norm and second.vector_norm):
            return 0.0
        return float(first.similarity(second))
    except Exception:
        return 0.0
|
||||
|
||||
def combined_score(a, b):
    """Return the higher of the fuzzy and the semantic similarity of a and b."""
    return max(fuzzy_score(a, b), semantic_similarity(a, b))
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# MATCHING & VORSCHLÄGE
|
||||
# ------------------------------------------------------------
|
||||
def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, top_n=3, threshold=CONF_THRESHOLD):
    """Return display labels of the best vocabulary matches for a lemma.

    Every key of ``lemma_index`` and then of ``norm_dict`` is scored with
    ``combined_score``; keys that start with ``term_lemma`` get a bonus of
    0.05 (capped at 1.0). Entries of keys scoring at least ``threshold`` are
    sorted by score (descending, stable), de-duplicated case-insensitively on
    (name, id), and cut off once ``top_n`` labels were collected.

    Returns:
        list of strings, ``"Name (ID)"`` or just ``"Name"`` without an ID.
    """
    scored = []
    # Lemma keys first, normalised names second: the sort below is stable,
    # so this order decides ties.
    for index in (lemma_index, norm_dict):
        for key, entries in index.items():
            score = combined_score(term_lemma, key)
            if key.startswith(term_lemma):
                score = min(score + 0.05, 1.0)
            if score >= threshold:
                scored.extend((score, e["Name"], e["ID"]) for e in entries)

    scored.sort(key=lambda item: item[0], reverse=True)

    labels = []
    already = set()
    for _score, name, id_ in scored:
        ident = (name.lower(), id_.lower() if id_ else "")
        if ident in already:
            continue
        already.add(ident)
        labels.append(f'{name} ({id_})' if id_ else name)
        if len(labels) >= top_n:
            break
    return labels
|
||||
|
||||
def map_term_with_indexes(term, norm_dict, lemma_index):
    """Map one term to vocabulary hits, suggestions and IDs.

    Lookup order: the cache (keyed by lemma), then an exact match on the
    normalised term, then a match on the lemma. Suggestions are always
    computed and those whose name equals a hit are dropped. The result is
    stored in ``CACHE`` and logged.

    Parameters:
        term: raw term taken from the sheet.
        norm_dict: normalised name -> list of entries.
        lemma_index: lemma -> list of entries.

    Returns:
        ``(hits, suggestions, ids)`` - three lists of strings, each without
        duplicates and in first-seen order.
    """
    term_norm = normalize_text(term)
    term_lemma = lemmatize_term(term)

    cached = CACHE.get(term_lemma)
    if isinstance(cached, dict):
        # Tolerate entries that lack a key (e.g. written by another macro
        # version) instead of raising KeyError in the middle of a run.
        return (
            cached.get("hits", []),
            cached.get("suggestions", []),
            cached.get("ids", []),
        )

    # An exact normalised match wins; the lemma match is only the fallback.
    matched = norm_dict.get(term_norm) or lemma_index.get(term_lemma) or []
    hits, ids = [], []
    for e in matched:
        hits.append(e["Name"])
        if e["ID"]:
            ids.append(e["ID"])

    # Normalise the hits once rather than once per suggestion.
    hit_keys = {normalize_text(h) for h in hits}
    suggestions = [
        s
        for s in get_suggestions_for_term(
            term_lemma, norm_dict, lemma_index, top_n=3, threshold=CONF_THRESHOLD
        )
        # A suggestion label is "Name (ID)"; compare on the name part only.
        if normalize_text(s.split(" (")[0]) not in hit_keys
    ]

    # dict.fromkeys removes duplicates while keeping first-seen order.
    hits = list(dict.fromkeys(hits))
    suggestions = list(dict.fromkeys(suggestions))
    ids = list(dict.fromkeys(ids))
    CACHE[term_lemma] = {"hits": hits, "suggestions": suggestions, "ids": ids}

    log(f"TERM: {term} | HITS: {hits} | SUGGS: {suggestions}")
    return hits, suggestions, ids
|
||||
|
||||
# ------------------------------------------------------------
|
||||
# HAUPTMAKRO
|
||||
# ------------------------------------------------------------
|
||||
def run_mapper_macro():
    """Map every 'Objektbeschreibung' cell of the active sheet against NV_MASTER.

    Steps:
      1. Fetch the active sheet from the LibreOffice script context.
      2. Build the vocabulary indexes; abort when the vocabulary is empty.
      3. Find the 'Objektbeschreibung' column in the first row and find or
         append the result columns Norm_Treffer, Norm_Vorschlag and Norm_ID.
      4. For every non-empty row, split the text into terms, map each term,
         write hits, suggestions and IDs joined by " | " and colour the cells.
      5. Persist the term cache and log the number of processed rows.

    Problems in the setup steps and while saving the cache are logged and
    end the run gracefully. Returns ``None``.
    """
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
    except Exception as e:
        log(f"Fehler beim Zugriff auf Dokument: {e}")
        return

    norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
    if not norm_dict:
        log("Fehler: NV_MASTER leer oder nicht gefunden.")
        return

    try:
        cursor = sheet.createCursor()
        cursor.gotoStartOfUsedArea(False)
        cursor.gotoEndOfUsedArea(True)
        used = cursor.getRangeAddress()
    except Exception as e:
        log(f"Cursor-Fehler: {e}")
        return

    # The header is expected in the first row of the sheet.
    header_row = 0
    objekt_col = None
    for c in range(0, used.EndColumn + 1):
        val = str(sheet.getCellByPosition(c, header_row).String).strip().lower()
        if val == "objektbeschreibung":
            objekt_col = c
            break
    if objekt_col is None:
        log("Keine Spalte 'Objektbeschreibung' gefunden.")
        return

    # Reuse result columns that already exist, append the missing ones.
    result_names = ["Norm_Treffer", "Norm_Vorschlag", "Norm_ID"]
    existing = {}
    for c in range(0, used.EndColumn + 1):
        h = str(sheet.getCellByPosition(c, header_row).String).strip()
        if h in result_names:
            existing[h] = c

    last_col = used.EndColumn
    for name in result_names:
        if name not in existing:
            last_col += 1
            existing[name] = last_col
            sheet.getCellByPosition(last_col, header_row).String = name

    GREEN, YELLOW, RED = 0xADFF2F, 0xFFD700, 0xCC0000
    norm_tr_col = existing["Norm_Treffer"]
    norm_sug_col = existing["Norm_Vorschlag"]
    norm_id_col = existing["Norm_ID"]

    def uniq(seq):
        # Remove duplicates while keeping first-seen order.
        return list(dict.fromkeys(seq))

    rows = 0
    for r in range(header_row + 1, used.EndRow + 1):
        obj_cell = sheet.getCellByPosition(objekt_col, r)
        txt = str(obj_cell.String).strip()
        if not txt:
            continue
        terms = [
            t.strip()
            for t in re.split(r",|\s+", txt)
            if t.strip() and t.lower() not in STOPWORDS
        ]
        row_hits, row_sugs, row_ids, any_unmapped = [], [], [], False
        for term in terms:
            hits, sugs, ids = map_term_with_indexes(term, norm_dict, lemma_index)
            if hits:
                row_hits.extend(hits)
            if sugs:
                row_sugs.extend(sugs)
            if ids:
                row_ids.extend(ids)
            if not hits and not sugs:
                any_unmapped = True

        row_hits, row_sugs, row_ids = uniq(row_hits), uniq(row_sugs), uniq(row_ids)

        tr_cell = sheet.getCellByPosition(norm_tr_col, r)
        sug_cell = sheet.getCellByPosition(norm_sug_col, r)
        tr_cell.String = " | ".join(row_hits)
        sug_cell.String = " | ".join(row_sugs)
        sheet.getCellByPosition(norm_id_col, r).String = " | ".join(row_ids)

        # NOTE(review): colours are only ever set, never cleared, so a colour
        # from an earlier run stays on a cell whose state has since changed.
        if any_unmapped:
            obj_cell.CellBackColor = RED
        elif row_hits:
            tr_cell.CellBackColor = GREEN
        if row_sugs:
            sug_cell.CellBackColor = YELLOW

        rows += 1

    # Saving the cache is best effort: the sheet is already updated, so a
    # write error is logged instead of aborting the macro with an exception.
    try:
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(CACHE, f, ensure_ascii=False, indent=2)
    except Exception as e:
        log(f"Cache speichern fehlgeschlagen: {e}")
    log(f"Makro abgeschlossen, {rows} Zeilen verarbeitet.")
|
||||
|
||||
g_exportedScripts = (run_mapper_macro,)
|
||||
365
mapper_macro_2.1.py
Normal file
365
mapper_macro_2.1.py
Normal file
@ -0,0 +1,365 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# LibreOffice Calc macro: NV_MASTER-Abgleich, Pandas+odf, Cache, Farben
|
||||
# Speicherort: /home/jarnold/.config/libreoffice/4/user/Scripts/python/Vokabular_Abgleich_Makro/mapper_macro_2.1.py
|
||||
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import traceback
|
||||
|
||||
# UNO-Context wird zur Laufzeit zur Verfügung gestellt (XSCRIPTCONTEXT)
|
||||
try:
|
||||
import pandas as pd
|
||||
PANDAS_AVAILABLE = True
|
||||
except Exception:
|
||||
PANDAS_AVAILABLE = False
|
||||
|
||||
try:
|
||||
import spacy
|
||||
nlp = spacy.load("de_core_news_sm")
|
||||
SPACY_AVAILABLE = True
|
||||
except Exception:
|
||||
SPACY_AVAILABLE = False
|
||||
nlp = None
|
||||
|
||||
try:
|
||||
from rapidfuzz import fuzz
|
||||
RAPIDFUZZ_AVAILABLE = True
|
||||
except Exception:
|
||||
RAPIDFUZZ_AVAILABLE = False
|
||||
from difflib import SequenceMatcher
|
||||
|
||||
# ------------------------
|
||||
# Konfiguration
|
||||
# ------------------------
|
||||
BASE_DIR = "/home/jarnold/.config/libreoffice/4/user/Scripts/python/Vokabular_Abgleich_Makro"
|
||||
NV_MASTER_PATH = os.path.join(BASE_DIR, "NV_MASTER.ods")
|
||||
LOG_FILE = os.path.join(BASE_DIR, "mapper_macro_2.1.log")
|
||||
CACHE_FILE = os.path.join(BASE_DIR, "mapper_cache_2.1.json")
|
||||
|
||||
STOPWORDS = {"mit","ohne","der","die","das","ein","eine","und","zu","von","im","in","auf","an","als","bei","für","aus","dem","den","des","eines","einer"}
|
||||
CONF_THRESHOLD = 0.75 # Basis-Schwelle für Vorschläge
|
||||
|
||||
# ------------------------
|
||||
# Utilities: Logging & safe I/O
|
||||
# ------------------------
|
||||
def log(msg):
    """Append ``msg`` plus a newline to the macro's log file (best effort).

    Any error while opening or writing is swallowed on purpose so that
    logging can never abort the macro.
    """
    try:
        with open(LOG_FILE, mode="a", encoding="utf-8") as handle:
            handle.write(msg + "\n")
    except Exception:
        # Deliberately ignored: a logging failure must not reach the caller.
        pass
|
||||
|
||||
# ------------------------
|
||||
# Cache laden
|
||||
# ------------------------
|
||||
# Load the persisted term cache from a previous run.
# A missing file, an unreadable file, invalid JSON, or JSON that is not an
# object all result in an empty cache instead of an error.
CACHE = {}
try:
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            _loaded_cache = json.load(f)
        # Only a JSON object can serve as the term -> result mapping; anything
        # else (null, list, number) would break the lookups done later.
        if isinstance(_loaded_cache, dict):
            CACHE = _loaded_cache
except Exception:
    CACHE = {}
|
||||
|
||||
# ------------------------
|
||||
# Text-Normalisierung & Lemma
|
||||
# ------------------------
|
||||
def normalize_text(s):
    """Return a canonical lower-case form of ``s`` for dictionary lookups.

    Falsy input yields ``""``. Otherwise the value is converted to ``str``,
    trimmed, lower-cased, stripped of brackets, quotes, backslashes and common
    punctuation, and every run of whitespace is collapsed to a single space.
    """
    if not s:
        return ""
    text = str(s).strip().lower()
    for pattern, replacement in (
        (r"[\(\)\[\]\"'\\;:\?!,\.]", ""),
        (r"\s+", " "),
    ):
        text = re.sub(pattern, replacement, text)
    return text
|
||||
|
||||
lemma_cache = {}
|
||||
def lemmatize_term(term):
    """Return the lemma form of ``term``, memoised in ``lemma_cache``.

    The term is normalised first. When spaCy is available the lemmas of all
    tokens are joined with single spaces; without spaCy, or when spaCy
    raises, the normalised term itself is used.
    """
    key = normalize_text(term)
    try:
        return lemma_cache[key]
    except KeyError:
        pass

    result = key
    if SPACY_AVAILABLE and nlp:
        try:
            result = " ".join([tok.lemma_ for tok in nlp(key)])
        except Exception:
            result = key
    lemma_cache[key] = result
    return result
|
||||
|
||||
# ------------------------
|
||||
# NV_MASTER robust laden (pandas + odf)
|
||||
# ------------------------
|
||||
def build_norm_index(nv_path):
    """Read the NV_MASTER workbook and build two lookup indexes.

    Every sheet except the one named "master" is read. The ID column is the
    last column whose header contains "id" (fallback: first column); the
    word column is the last whose header contains "wort" or "vokabel"
    (fallback: last column). Rows without an ID inherit the last ID seen
    above them in the same sheet.

    Parameters:
        nv_path: path of the .ods workbook.

    Returns:
        ``(norm_dict, lemma_index)`` - dicts mapping the normalised name and
        the lemma of each word to a list of entries with the keys "Name",
        "ID" and "Sheet". Both are empty when pandas is unavailable or the
        workbook cannot be read.
    """
    norm_dict = {}    # normalized_name -> list of entries (Name, ID, Sheet)
    lemma_index = {}  # lemma -> list of entries
    if not PANDAS_AVAILABLE:
        log("Pandas nicht verfügbar. NV_MASTER kann nicht zuverlässig gelesen werden.")
        return norm_dict, lemma_index

    try:
        sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf")
    except Exception as e:
        log(f"Fehler beim Einlesen von NV_MASTER mit pandas: {e}")
        return norm_dict, lemma_index

    for sheet_name, df in sheets.items():
        if str(sheet_name).strip().lower() == "master":
            continue
        # Without columns there is nothing to index; skipping here also lets
        # the row loop rely on both columns existing.
        if len(df.columns) == 0:
            continue
        df = df.fillna("")

        id_col = None
        word_col = None
        for col in df.columns:
            header = str(col).strip().lower()
            if "id" in header:
                id_col = col
            if "wort" in header or "vokabel" in header:
                word_col = col
        if word_col is None:
            word_col = df.columns[-1]
        if id_col is None:
            id_col = df.columns[0]

        current_parent_id = None
        for _, row in df.iterrows():
            id_val = str(row[id_col]).strip()
            word_val = str(row[word_col]).strip()
            if id_val:
                # Remember the ID so that following rows without one inherit it.
                current_parent_id = id_val
            if not word_val:
                continue
            norm_name = normalize_text(word_val)
            lemma = lemmatize_term(word_val)
            entry = {"Name": word_val.strip(), "ID": current_parent_id or "", "Sheet": sheet_name}
            norm_dict.setdefault(norm_name, []).append(entry)
            lemma_index.setdefault(lemma, []).append(entry)

    # Report the path that was actually read, not the module-level default.
    log(f"NV_MASTER geladen ({nv_path}). Begriffe: {sum(len(v) for v in norm_dict.values())}")
    return norm_dict, lemma_index
|
||||
|
||||
# ------------------------
|
||||
# Matching: exakter Treffer, Lemma-Treffer, Fuzzy-Vorschläge
|
||||
# ------------------------
|
||||
def fuzzy_score(a, b):
    """Return a string similarity between 0.0 and 1.0.

    With RapidFuzz imported, ``fuzz.token_set_ratio`` is used (scaled from
    0-100); otherwise the ratio of difflib's ``SequenceMatcher`` on the
    lower-cased inputs. Any error in either path yields 0.0.
    """
    try:
        if RAPIDFUZZ_AVAILABLE:
            return fuzz.token_set_ratio(a, b) / 100.0
        return SequenceMatcher(None, a.lower(), b.lower()).ratio()
    except Exception:
        return 0.0
|
||||
|
||||
def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, threshold=CONF_THRESHOLD):
    """Return display labels of all vocabulary matches for a lemma.

    Every key of ``lemma_index`` and then of ``norm_dict`` is scored with
    ``fuzzy_score``; keys that start with ``term_lemma`` get a bonus of 0.1
    (capped at 1.0). Entries of keys scoring at least ``threshold`` are
    sorted by score (descending, stable) and de-duplicated on (name, id).

    Returns:
        list of strings, ``"Name (ID)"`` or just ``"Name"`` without an ID.
    """
    scored = []
    # Lemma keys first, normalised names second: the sort below is stable,
    # so this order decides ties.
    for index in (lemma_index, norm_dict):
        for key, entries in index.items():
            score = fuzzy_score(term_lemma, key)
            if key.startswith(term_lemma):
                score = min(score + 0.1, 1.0)
            if score >= threshold:
                scored.extend((score, e["Name"], e["ID"]) for e in entries)

    scored.sort(key=lambda item: item[0], reverse=True)

    labels = []
    already = set()
    for _score, name, id_ in scored:
        if (name, id_) in already:
            continue
        already.add((name, id_))
        labels.append(f'{name} ({id_})' if id_ else name)
    return labels
|
||||
|
||||
def map_term_with_indexes(term, norm_dict, lemma_index):
    """Map one term to vocabulary hits and, failing that, suggestions.

    Lookup order: the cache (keyed by lemma), then an exact match on the
    normalised term, then a match on the lemma. Suggestions are computed only
    when there is no hit. The result is stored in ``CACHE``.

    Returns:
        ``(hits, suggestions)`` - two lists of labels (``"Name (ID)"`` or
        ``"Name"``), each without duplicates and in first-seen order.
    """
    term_norm = normalize_text(term)
    term_lemma = lemmatize_term(term)
    if term_lemma in CACHE:
        entry = CACHE[term_lemma]
        return entry["hits"], entry["suggestions"]

    def label(e):
        # Display form of a vocabulary entry.
        return f'{e["Name"]} ({e["ID"]})' if e["ID"] else e["Name"]

    hits = []
    if term_norm in norm_dict:
        hits = [label(e) for e in norm_dict[term_norm]]
    if not hits and term_lemma in lemma_index:
        hits = [label(e) for e in lemma_index[term_lemma]]

    suggestions = []
    if not hits:
        suggestions = get_suggestions_for_term(term_lemma, norm_dict, lemma_index)

    # dict.fromkeys removes duplicates while keeping first-seen order.
    hits = list(dict.fromkeys(hits))
    suggestions = list(dict.fromkeys(suggestions))

    CACHE[term_lemma] = {"hits": hits, "suggestions": suggestions}
    return hits, suggestions
|
||||
|
||||
# ------------------------
|
||||
# Haupt-Makro
|
||||
# ------------------------
|
||||
def run_mapper_macro():
    """Map every 'Objektbeschreibung' cell of the active sheet against NV_MASTER.

    Steps:
      1. Fetch the active sheet and its used range from the script context.
      2. Search the first five rows for the 'Objektbeschreibung' header.
      3. Find or append the result columns Norm_Treffer and Norm_Vorschlag.
      4. Build the vocabulary indexes; abort when both are empty.
      5. For every non-empty row, split the text into terms, map each term,
         write hits and suggestions joined by " | " and colour the cells.
      6. Persist the term cache and log the number of processed rows.

    All problems are logged; a failing row does not stop the remaining rows.
    Returns ``None``.
    """
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
        cursor = sheet.createCursor()
        cursor.gotoStartOfUsedArea(False)
        cursor.gotoEndOfUsedArea(True)
        data_range = cursor.getRangeAddress()
    except Exception as e:
        log("Fehler: konnte Dokument/Sheet nicht öffnen: " + str(e))
        return

    # The header row is looked for within the first five rows only.
    header_row = None
    objekt_col = None
    max_col = data_range.EndColumn
    for r in range(0, min(5, data_range.EndRow + 1)):
        for c in range(0, max_col + 1):
            try:
                val = str(sheet.getCellByPosition(c, r).String).strip().lower()
            except Exception:
                val = ""
            if val == "objektbeschreibung":
                header_row = r
                objekt_col = c
                break
        if objekt_col is not None:
            break

    if objekt_col is None:
        log("Spalte 'Objektbeschreibung' nicht gefunden. Abbruch.")
        return

    # Reuse result columns that already exist, append the missing ones.
    result_names = ("Norm_Treffer", "Norm_Vorschlag")
    existing = {}
    for c in range(0, data_range.EndColumn + 1):
        try:
            h = str(sheet.getCellByPosition(c, header_row).String).strip()
        except Exception:
            h = ""
        if h in result_names:
            existing[h] = c

    last_col = data_range.EndColumn
    for name in result_names:
        if name not in existing:
            last_col += 1
            existing[name] = last_col
            sheet.getCellByPosition(last_col, header_row).String = name

    norm_tr_col = existing["Norm_Treffer"]
    norm_sug_col = existing["Norm_Vorschlag"]

    norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
    if not norm_dict and not lemma_index:
        log("NV_MASTER leer oder nicht lesbar. Abbruch.")
        return

    GREEN = 0xADFF2F
    YELLOW = 0xFFA500
    RED = 0xCC0000
    WHITE = 0xFFFFFF

    def uniq(seq):
        # Remove duplicates while keeping first-seen order.
        return list(dict.fromkeys(seq))

    rows_processed = 0
    for r in range(header_row + 1, data_range.EndRow + 1):
        try:
            cell = sheet.getCellByPosition(objekt_col, r)
            txt = str(cell.String).strip()
            if not txt:
                continue

            # Split on commas first, then on whitespace; drop stopwords and
            # purely numeric tokens.
            terms = []
            for cl in (c.strip() for c in re.split(r",", txt) if c.strip()):
                for p in (p.strip() for p in re.split(r"\s+", cl) if p.strip()):
                    if p.lower() in STOPWORDS:
                        continue
                    if re.fullmatch(r"\d+", p):
                        continue
                    terms.append(p)

            row_hits = []
            row_sugs = []
            unmapped_terms = []
            for term in terms:
                hits, sugs = map_term_with_indexes(term, norm_dict, lemma_index)
                if hits:
                    row_hits.extend(hits)
                else:
                    unmapped_terms.append(term)
                if sugs:
                    row_sugs.extend(sugs)

            row_hits = uniq(row_hits)
            row_sugs = uniq(row_sugs)

            # Colour of the description cell: green = every term has a hit,
            # yellow = some terms have a hit, red = no hit at all.
            if terms and not unmapped_terms and row_hits:
                cell.CellBackColor = GREEN
                row_sugs = []
            elif row_hits:
                cell.CellBackColor = YELLOW
            else:
                cell.CellBackColor = RED

            # Result cells
            tr_cell = sheet.getCellByPosition(norm_tr_col, r)
            tr_cell.String = " | ".join(row_hits)
            tr_cell.CellBackColor = GREEN if row_hits else WHITE

            sug_cell = sheet.getCellByPosition(norm_sug_col, r)
            sug_cell.String = " | ".join(row_sugs)
            sug_cell.CellBackColor = YELLOW if row_sugs else WHITE

            rows_processed += 1

        except Exception as e:
            log(f"Fehler in Zeile {r}: {e}\n{traceback.format_exc()}")

    # Saving the cache stays best effort, but a failure is now logged rather
    # than silently dropped.
    try:
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(CACHE, f, ensure_ascii=False, indent=2)
    except Exception as e:
        log(f"Cache speichern fehlgeschlagen: {e}")

    log(f"run_mapper_macro fertig. Zeilen verarbeitet: {rows_processed}")
|
||||
|
||||
# Export für LibreOffice
|
||||
g_exportedScripts = (run_mapper_macro,)
|
||||
353
mapper_macro_2.2.py
Normal file
353
mapper_macro_2.2.py
Normal file
@ -0,0 +1,353 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# LibreOffice / Excel macro: NV_MASTER-Abgleich, Pandas+odf, Cache, Farben
|
||||
# Version: 2.2
|
||||
# Speicherort (Linux/Windows automatisch erkannt)
|
||||
|
||||
"""
|
||||
Mapper Macro 2.2
|
||||
================
|
||||
Dieses Makro liest die Spalte 'Objektbeschreibung' im aktiven Sheet und versucht,
|
||||
jedes Wort einem Eintrag im Normvokabular <NV_MASTER.ods> zuzuordnen.
|
||||
|
||||
Features:
|
||||
- Direkte Treffer werden unter "Norm_Treffer" gelistet (mit ID in Klammern)
|
||||
- Vorschläge (Fuzzy Matching) werden unter "Norm_Vorschlag" gelistet
|
||||
- Farbregeln:
|
||||
* Grün: Alle Begriffe in der Zeile haben direkte Treffer
|
||||
* Gelb: Mindestens ein Begriff hat Treffer, aber nicht alle
|
||||
* Rot: Kein Treffer für alle Begriffe
|
||||
- Logging aller Schritte in mapper_macro_2.2.log (selbes Verzeichnis wie Makro)
|
||||
- Cache für bereits gematchte Begriffe
|
||||
- OS-Erkennung (Linux/Windows) und automatische Pfadwahl
|
||||
- Unterstützt LibreOffice und Excel (pandas für .ods/.xlsx)
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import json
|
||||
import traceback
|
||||
import platform
|
||||
|
||||
# ------------------------
|
||||
# OS-basierte Pfade
|
||||
# ------------------------
|
||||
if platform.system().lower().startswith("win"):
|
||||
BASE_DIR = os.path.join(os.environ["APPDATA"], "LibreOffice", "4", "user", "Scripts", "python", "Vokabular_Abgleich_Makro")
|
||||
else:
|
||||
BASE_DIR = os.path.expanduser("~/.config/libreoffice/4/user/Scripts/python/Vokabular_Abgleich_Makro")
|
||||
|
||||
NV_MASTER_PATH = os.path.join(BASE_DIR, "NV_MASTER.ods")
|
||||
LOG_FILE = os.path.join(BASE_DIR, "mapper_macro_2.2.log")
|
||||
CACHE_FILE = os.path.join(BASE_DIR, "mapper_cache_2.2.json")
|
||||
|
||||
# Verzeichnis ggf. anlegen
|
||||
os.makedirs(BASE_DIR, exist_ok=True)
|
||||
|
||||
# ------------------------
|
||||
# Abhängigkeiten
|
||||
# ------------------------
|
||||
try:
|
||||
import pandas as pd
|
||||
PANDAS_AVAILABLE = True
|
||||
except Exception:
|
||||
PANDAS_AVAILABLE = False
|
||||
|
||||
try:
|
||||
import spacy
|
||||
nlp = spacy.load("de_core_news_sm")
|
||||
SPACY_AVAILABLE = True
|
||||
except Exception:
|
||||
SPACY_AVAILABLE = False
|
||||
nlp = None
|
||||
|
||||
try:
|
||||
from rapidfuzz import fuzz
|
||||
RAPIDFUZZ_AVAILABLE = True
|
||||
except Exception:
|
||||
RAPIDFUZZ_AVAILABLE = False
|
||||
from difflib import SequenceMatcher
|
||||
|
||||
# ------------------------
|
||||
# Konfiguration
|
||||
# ------------------------
|
||||
STOPWORDS = {"mit","ohne","der","die","das","ein","eine","und","zu","von","im","in","auf","an","als","bei","für","aus","dem","den","des","eines","einer"}
|
||||
CONF_THRESHOLD = 0.75 # Basis-Schwelle für Vorschläge
|
||||
|
||||
# ------------------------
|
||||
# Logging
|
||||
# ------------------------
|
||||
def log(msg):
    """Append ``msg`` plus a newline to the macro's log file (best effort).

    Any error while opening or writing is swallowed on purpose so that
    logging can never abort the macro.
    """
    try:
        with open(LOG_FILE, mode="a", encoding="utf-8") as handle:
            handle.write(msg + "\n")
    except Exception:
        # Deliberately ignored: a logging failure must not reach the caller.
        pass
|
||||
|
||||
# ------------------------
|
||||
# Cache laden
|
||||
# ------------------------
|
||||
# Load the persisted term cache from a previous run.
# A missing file, an unreadable file, invalid JSON, or JSON that is not an
# object all result in an empty cache instead of an error.
CACHE = {}
try:
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            _loaded_cache = json.load(f)
        # Only a JSON object can serve as the term -> result mapping; anything
        # else (null, list, number) would break the lookups done later.
        if isinstance(_loaded_cache, dict):
            CACHE = _loaded_cache
except Exception:
    CACHE = {}
|
||||
|
||||
# ------------------------
|
||||
# Text-Normalisierung & Lemma
|
||||
# ------------------------
|
||||
def normalize_text(s):
    """Return a canonical lower-case form of ``s`` for dictionary lookups.

    Falsy input yields ``""``. Otherwise the value is converted to ``str``,
    trimmed, lower-cased, stripped of brackets, quotes, backslashes and common
    punctuation, and every run of whitespace is collapsed to a single space.
    """
    if not s:
        return ""
    cleaned = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", str(s).strip().lower())
    return re.sub(r"\s+", " ", cleaned)
|
||||
|
||||
lemma_cache = {}
|
||||
def lemmatize_term(term):
    """Return the lemma form of ``term``, memoised in ``lemma_cache``.

    The term is normalised first. When spaCy is available the lemmas of all
    tokens are joined with single spaces; without spaCy, or when spaCy
    raises, the normalised term itself is used.
    """
    key = normalize_text(term)
    try:
        return lemma_cache[key]
    except KeyError:
        pass

    result = key
    if SPACY_AVAILABLE and nlp:
        try:
            result = " ".join([tok.lemma_ for tok in nlp(key)])
        except Exception:
            result = key
    lemma_cache[key] = result
    return result
|
||||
|
||||
# ------------------------
|
||||
# NV_MASTER laden
|
||||
# ------------------------
|
||||
def build_norm_index(nv_path):
    """Read the NV_MASTER workbook and build two lookup indexes.

    Every sheet except the one named "master" is read. The ID column is the
    last column whose header contains "id" (fallback: first column); the
    word column is the last whose header contains "wort" or "vokabel"
    (fallback: last column). Rows without an ID inherit the last ID seen
    above them in the same sheet.

    Parameters:
        nv_path: path of the .ods workbook.

    Returns:
        ``(norm_dict, lemma_index)`` - dicts mapping the normalised name and
        the lemma of each word to a list of entries with the keys "Name",
        "ID" and "Sheet". Both are empty when pandas is unavailable or the
        workbook cannot be read.
    """
    norm_dict = {}
    lemma_index = {}
    if not PANDAS_AVAILABLE:
        log("Pandas nicht verfügbar. NV_MASTER kann nicht gelesen werden.")
        return norm_dict, lemma_index
    try:
        sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf")
    except Exception as e:
        log(f"Fehler beim Einlesen von NV_MASTER: {e}")
        return norm_dict, lemma_index

    for sheet_name, df in sheets.items():
        if str(sheet_name).strip().lower() == "master":
            continue
        # Without columns there is nothing to index; skipping here also lets
        # the row loop rely on both columns existing.
        if len(df.columns) == 0:
            continue
        df = df.fillna("")

        id_col = None
        word_col = None
        for col in df.columns:
            header = str(col).strip().lower()
            if "id" in header:
                id_col = col
            if "wort" in header or "vokabel" in header:
                word_col = col
        if word_col is None:
            word_col = df.columns[-1]
        if id_col is None:
            id_col = df.columns[0]

        current_parent_id = None
        for _, row in df.iterrows():
            id_val = str(row[id_col]).strip()
            word_val = str(row[word_col]).strip()
            if id_val:
                # Remember the ID so that following rows without one inherit it.
                current_parent_id = id_val
            if not word_val:
                continue
            norm_name = normalize_text(word_val)
            lemma = lemmatize_term(word_val)
            entry = {"Name": word_val.strip(), "ID": current_parent_id or "", "Sheet": sheet_name}
            norm_dict.setdefault(norm_name, []).append(entry)
            lemma_index.setdefault(lemma, []).append(entry)

    # Report the path that was actually read, not the module-level default.
    log(f"NV_MASTER geladen ({nv_path}). Begriffe: {sum(len(v) for v in norm_dict.values())}")
    return norm_dict, lemma_index
|
||||
|
||||
# ------------------------
|
||||
# Matching
|
||||
# ------------------------
|
||||
def fuzzy_score(a, b):
    """Return a similarity score for two strings.

    With RapidFuzz available the score is ``fuzz.token_set_ratio`` divided
    by 100; otherwise it is the ``SequenceMatcher`` ratio of the lowercased
    strings. Any exception raised while scoring yields 0.0.
    """
    if not RAPIDFUZZ_AVAILABLE:
        # Fallback path: difflib on lowercased input.
        try:
            matcher = SequenceMatcher(None, a.lower(), b.lower())
            return matcher.ratio()
        except Exception:
            return 0.0

    try:
        percent = fuzz.token_set_ratio(a, b)
        return percent / 100.0
    except Exception:
        return 0.0
|
||||
|
||||
def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, threshold=CONF_THRESHOLD):
    """Collect fuzzy-match suggestions for a term as display strings.

    Every key of ``lemma_index`` and then every key of ``norm_dict`` is
    scored against ``term_lemma`` with ``fuzzy_score``. A key that starts
    with ``term_lemma`` gets a bonus of 0.1, capped at 1.0. All entries
    stored under a key scoring at least ``threshold`` become candidates.

    Returns:
        A list of strings ordered by descending score, formatted as
        "Name (ID)" or just "Name" when the ID is empty. Each (name, ID)
        pair appears only once, at its best-scoring position.
    """
    scored = []
    # lemma_index is scanned before norm_dict; the stable sort below keeps
    # that order among candidates with equal scores.
    for index in (lemma_index, norm_dict):
        for key, entries in index.items():
            similarity = fuzzy_score(term_lemma, key)
            if key.startswith(term_lemma):
                similarity = min(similarity + 0.1, 1.0)
            if similarity >= threshold:
                scored.extend(
                    (similarity, entry["Name"], entry["ID"]) for entry in entries
                )

    scored.sort(key=lambda item: item[0], reverse=True)

    emitted = set()
    labels = []
    for _, name, ident in scored:
        pair = (name, ident)
        if pair in emitted:
            continue
        emitted.add(pair)
        labels.append(f'{name} ({ident})' if ident else name)
    return labels
|
||||
|
||||
def map_term_with_indexes(term, norm_dict, lemma_index):
    """Resolve one term to exact hits or, failing that, to suggestions.

    Lookup order: the cache entry stored under the term's lemma, then
    ``norm_dict`` by normalised form, then ``lemma_index`` by lemma.
    Suggestions from ``get_suggestions_for_term`` are computed only when
    neither index produced a hit. The result is stored in ``CACHE`` under
    the lemma before it is returned.

    Returns:
        A tuple ``(hits, suggestions)`` of de-duplicated display strings
        ("Name (ID)", or "Name" when the ID is empty).
    """
    def label(entry):
        # Display form shared by both index lookups.
        return f'{entry["Name"]} ({entry["ID"]})' if entry["ID"] else entry["Name"]

    normalized = normalize_text(term)
    lemma = lemmatize_term(term)

    # NOTE(review): the cache is keyed by lemma although the first lookup
    # uses the normalised form - confirm that sharing is intended.
    if lemma in CACHE:
        cached = CACHE[lemma]
        return cached.get("hits", []), cached.get("suggestions", [])

    hits = [label(e) for e in norm_dict[normalized]] if normalized in norm_dict else []
    if not hits and lemma in lemma_index:
        hits = [label(e) for e in lemma_index[lemma]]

    if hits:
        suggestions = []
    else:
        suggestions = get_suggestions_for_term(lemma, norm_dict, lemma_index)

    # Drop duplicates while keeping first-seen order.
    hits = list(dict.fromkeys(hits))
    suggestions = list(dict.fromkeys(suggestions))

    CACHE[lemma] = {"hits": hits, "suggestions": suggestions}
    return hits, suggestions
|
||||
|
||||
# ------------------------
|
||||
# Haupt-Makro
|
||||
# ------------------------
|
||||
def _split_description_terms(text):
    """Split a description into terms, dropping stopwords and pure numbers."""
    terms = []
    for clause in re.split(r",", text):
        clause = clause.strip()
        if not clause:
            continue
        for word in re.split(r"\s+", clause):
            word = word.strip()
            if not word:
                continue
            if word.lower() in STOPWORDS:
                continue
            if re.fullmatch(r"\d+", word):
                continue
            terms.append(word)
    return terms


def run_mapper_macro():
    """Map the 'Objektbeschreibung' column of the active sheet to NV_MASTER.

    Steps:
      1. Search the first five rows of the used area for a cell whose
         text equals "objektbeschreibung" (case-insensitive); its row is
         the header row and its column the input column.
      2. Reuse the header-row columns "Norm_Treffer" and "Norm_Vorschlag"
         or append them to the right of the used area.
      3. Build the indexes with ``build_norm_index(NV_MASTER_PATH)``.
      4. For every non-empty input cell below the header, split the text
         into terms, map each term, write the joined hits and suggestions
         into the two output columns and set the cell background colours.
      5. Write ``CACHE`` to ``CACHE_FILE`` as JSON.

    Nothing is returned. Failures are logged rather than raised; an error
    in one row does not stop the remaining rows from being processed.
    """
    # Imported here so the per-row error handler cannot fail with NameError
    # if the module header lacks this import (header not visible here).
    import traceback

    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
        cursor = sheet.createCursor()
        cursor.gotoStartOfUsedArea(False)
        cursor.gotoEndOfUsedArea(True)
        data_range = cursor.getRangeAddress()
    except Exception as e:
        log("Fehler: konnte Dokument/Sheet nicht öffnen: " + str(e))
        return

    # Step 1: locate the header cell of the input column.
    header_row = None
    objekt_col = None
    max_col = data_range.EndColumn
    for r in range(0, min(5, data_range.EndRow + 1)):
        for c in range(0, max_col + 1):
            try:
                val = str(sheet.getCellByPosition(c, r).String).strip().lower()
            except Exception:
                val = ""
            if val == "objektbeschreibung":
                header_row = r
                objekt_col = c
                break
        if objekt_col is not None:
            break
    if objekt_col is None:
        log("Spalte 'Objektbeschreibung' nicht gefunden. Abbruch.")
        return

    # Step 2: find the output columns, appending the missing ones.
    output_headers = ("Norm_Treffer", "Norm_Vorschlag")
    existing = {}
    for c in range(0, data_range.EndColumn + 1):
        try:
            h = str(sheet.getCellByPosition(c, header_row).String).strip()
        except Exception:
            h = ""
        if h in output_headers:
            existing[h] = c

    last_col = data_range.EndColumn
    for header in output_headers:
        if header not in existing:
            last_col += 1
            existing[header] = last_col
            sheet.getCellByPosition(last_col, header_row).String = header

    norm_tr_col = existing["Norm_Treffer"]
    norm_sug_col = existing["Norm_Vorschlag"]

    # Step 3: load the vocabulary indexes.
    norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
    if not norm_dict and not lemma_index:
        log("NV_MASTER leer oder nicht lesbar. Abbruch.")
        return

    GREEN = 0xADFF2F
    YELLOW = 0xFFA500
    RED = 0xCC0000
    WHITE = 0xFFFFFF

    # Step 4: process every data row below the header.
    rows_processed = 0
    for r in range(header_row + 1, data_range.EndRow + 1):
        try:
            cell = sheet.getCellByPosition(objekt_col, r)
            txt = str(cell.String).strip()
            if not txt:
                continue

            terms = _split_description_terms(txt)

            row_hits = []
            row_sugs = []
            unmapped_terms = []
            for term in terms:
                hits, sugs = map_term_with_indexes(term, norm_dict, lemma_index)
                if hits:
                    row_hits.extend(hits)
                else:
                    unmapped_terms.append(term)
                    if sugs:
                        row_sugs.extend(sugs)

            # Drop duplicates while keeping first-seen order.
            row_hits = list(dict.fromkeys(row_hits))
            row_sugs = list(dict.fromkeys(row_sugs))

            # Input cell: GREEN when every term was mapped, YELLOW when
            # only some were, RED when there is no hit at all.
            if terms and not unmapped_terms and row_hits:
                cell.CellBackColor = GREEN
                row_sugs = []
            elif row_hits:
                cell.CellBackColor = YELLOW
            else:
                cell.CellBackColor = RED

            tr_cell = sheet.getCellByPosition(norm_tr_col, r)
            tr_cell.String = " | ".join(row_hits)
            tr_cell.CellBackColor = GREEN if row_hits else WHITE

            sug_cell = sheet.getCellByPosition(norm_sug_col, r)
            sug_cell.String = " | ".join(row_sugs)
            sug_cell.CellBackColor = YELLOW if row_sugs else WHITE

            rows_processed += 1
        except Exception as e:
            log(f"Fehler in Zeile {r}: {e}\n{traceback.format_exc()}")

    # Step 5: persist the cache; a failed write is logged, not fatal.
    try:
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(CACHE, f, ensure_ascii=False, indent=2)
    except Exception as e:
        log(f"Cache konnte nicht gespeichert werden: {e}")

    log(f"run_mapper_macro 2.2 fertig. Zeilen verarbeitet: {rows_processed}")
|
||||
|
||||
# Export für LibreOffice
|
||||
# Tuple of the functions this script exports to LibreOffice as macros.
g_exportedScripts = (run_mapper_macro,)
|
||||
Loading…
x
Reference in New Issue
Block a user