Delete mapper_macro_1.4.py
This commit is contained in:
parent
fb30fcd877
commit
41ad23e8df
@ -1,469 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
# mapper_macro 1.5 - LibreOffice Calc
|
|
||||||
# Features: Kompositum-Split, Cache, Live-Vorschläge nur auf 'Objektbeschreibung', Logging
|
|
||||||
|
|
||||||
import os
|
|
||||||
import re
|
|
||||||
import json
|
|
||||||
import datetime
|
|
||||||
|
|
||||||
# Optional third-party dependencies (pandas, spaCy, RapidFuzz).
# Each one is probed on its own; any failure merely switches the matching
# feature flag off instead of aborting the macro.
try:
    import pandas as pd
except Exception:
    PANDAS_AVAILABLE = False
else:
    PANDAS_AVAILABLE = True

try:
    import spacy
    nlp = spacy.load("de_core_news_sm")
except Exception:
    # Covers both a missing package and a missing language model.
    nlp = None
    SPACY_AVAILABLE = False
else:
    SPACY_AVAILABLE = True

try:
    from rapidfuzz import fuzz
except Exception:
    # Standard-library fallback used by fuzzy_score.
    from difflib import SequenceMatcher
    RAPIDFUZZ_AVAILABLE = False
else:
    RAPIDFUZZ_AVAILABLE = True
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# Konfiguration
|
|
||||||
# ------------------------
|
|
||||||
# All files used by the macro live next to it in the LibreOffice user
# script directory.
BASE_DIR = "/home/jarnold/.config/libreoffice/4/user/Scripts/python/NV Abgleich Makro"
NV_MASTER_PATH, CACHE_FILE, LOG_FILE = (
    os.path.join(BASE_DIR, file_name)
    for file_name in ("NV_MASTER.ods", "mapper_cache.json", "mapper_macro.log")
)

# German function words that are skipped before terms are mapped.
STOPWORDS = {
    "mit", "ohne",
    "der", "die", "das", "dem", "den", "des",
    "ein", "eine", "eines", "einer",
    "und", "zu", "von", "im", "in", "auf", "an",
    "als", "bei", "für", "aus",
}

# Minimum similarity score (0..1) a candidate needs to become a suggestion.
CONF_THRESHOLD = 0.75
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# Logging
|
|
||||||
# ------------------------
|
|
||||||
def log(msg, level="INFO"):
    """Append one timestamped line to the macro log file.

    The line has the form ``[YYYY-MM-DD HH:MM:SS] [LEVEL] message``.  The
    directory of ``LOG_FILE`` is created when missing.  Logging is strictly
    best effort: any error while creating the directory or writing the file
    is swallowed so that logging can never break the macro.

    Args:
        msg: Text (or any object usable in an f-string) to record.
        level: Severity label written between brackets.
    """
    ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] [{level}] {msg}\n"
    try:
        log_dir = os.path.dirname(LOG_FILE)
        os.makedirs(log_dir, exist_ok=True)
        with open(LOG_FILE, "a", encoding="utf-8") as log_file:
            log_file.write(line)
    except Exception:
        # Deliberately ignored: there is no better place to report this.
        return
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# Cache laden
|
|
||||||
# ------------------------
|
|
||||||
# Load the persisted term cache.  Every problem (unreadable file, invalid
# JSON, unexpected top-level type) falls back to an empty dict so the macro
# stays usable; the cache is only an optimisation.
CACHE = {}
try:
    if os.path.exists(CACHE_FILE):
        with open(CACHE_FILE, "r", encoding="utf-8") as f:
            _loaded_cache = json.load(f)
        if isinstance(_loaded_cache, dict):
            CACHE = _loaded_cache
        else:
            # A JSON list/string/number would break the later
            # ``CACHE[term_lemma] = ...`` assignments, so it is rejected here.
            log(
                "Cache-Datei hat unerwartetes Format "
                f"({type(_loaded_cache).__name__}), wird ignoriert.",
                level="ERROR",
            )
except Exception as e:
    CACHE = {}
    log(f"Fehler beim Laden des Caches: {e}", level="ERROR")
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# Textnormalisierung & Lemma
|
|
||||||
# ------------------------
|
|
||||||
# In-memory memo for lemmatize_term: normalized term -> lemma string.
lemma_cache = {}
|
|
||||||
|
|
||||||
def normalize_text(s):
    """Return a lower-cased, punctuation-free, whitespace-normalized string.

    Brackets, quotes, backslashes and the characters ``; : ? ! , .`` are
    removed, runs of whitespace collapse to a single space, and surrounding
    whitespace is stripped.  Stripping happens *after* punctuation removal so
    that inputs such as ``"Tisch ."`` do not keep a dangling space, which
    would prevent exact lookups in the normalized index.

    Args:
        s: Any value; falsy values (``None``, ``""``, ``0``) yield ``""``.

    Returns:
        The normalized text.
    """
    if not s:
        return ""
    text = re.sub(r"[\(\)\[\]\"'\\;:\?!,\.]", "", str(s).lower())
    return re.sub(r"\s+", " ", text).strip()
|
|
||||||
|
|
||||||
def lemmatize_term(term):
    """Return the lemma of *term*, memoized in ``lemma_cache``.

    The term is normalized first.  When spaCy and its model are available
    the lemmas of all tokens are joined with single spaces; otherwise, or
    when spaCy raises, the normalized term itself serves as the lemma.

    Args:
        term: Raw term text.

    Returns:
        The lemma string.
    """
    key = normalize_text(term)
    try:
        return lemma_cache[key]
    except KeyError:
        pass

    result = key
    if SPACY_AVAILABLE and nlp:
        try:
            result = " ".join([token.lemma_ for token in nlp(key)])
        except Exception:
            result = key

    lemma_cache[key] = result
    return result
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# Kompositum-Splitting
|
|
||||||
# ------------------------
|
|
||||||
def compound_split(term):
    """Split *term* into parts at capitalized segments or separators.

    First every run of "upper-case letter followed by lower-case letters"
    is collected (German umlauts and "ß" included).  If no such run exists,
    the term is split on hyphens and whitespace instead.

    The lower-case class contains "ß" so that words like "Fußball" are kept
    whole instead of being truncated to "Fu".

    NOTE(review): text outside a capitalized run (a lower-case prefix,
    digits) is still dropped by the first strategy, e.g. "holzTisch" yields
    ["Tisch"]; confirm whether that is intended.

    Args:
        term: Text to split; falsy input yields an empty list.

    Returns:
        A list of parts; ``[term]`` when nothing could be split off.
    """
    if not term:
        return []
    parts = re.findall(r'[A-ZÄÖÜ][a-zäöüß]+', term)
    if parts:
        return parts
    parts = [p for p in re.split(r'[-\s]+', term) if p]
    return parts or [term]
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# NV_MASTER indexieren
|
|
||||||
# ------------------------
|
|
||||||
# Parsed indexes per workbook path: {nv_path: (mtime, norm_dict, lemma_index)}.
_NORM_INDEX_MEMO = {}


def build_norm_index(nv_path):
    """Read the NV_MASTER workbook and build the two lookup indexes.

    Every sheet except one named "master" (case-insensitive) is read via
    ``pandas.read_excel(..., engine="odf")``.  Per sheet, the ID column is
    the last column whose lower-cased header contains "id" (fallback: first
    column) and the word column is the last one whose header contains "wort"
    or "vokabel" (fallback: last column).  Rows with an empty ID inherit the
    most recent non-empty ID seen earlier in the same sheet.

    Successfully parsed results are memoized per path and file modification
    time, so repeated calls (the live handler calls this on every change)
    do not re-parse an unchanged workbook.  Callers receive the memoized
    dict objects themselves and must treat them as read-only.
    NOTE(review): the memo only helps while the module stays loaded between
    macro invocations - the file already relies on that for ``CACHE``.

    Args:
        nv_path: Path to the ``.ods`` workbook.

    Returns:
        ``(norm_dict, lemma_index)``: dicts mapping the normalized name and
        the lemma, respectively, to lists of ``{"Name", "ID", "Sheet"}``
        entries.  Both are empty when pandas is unavailable or the file
        cannot be read (the reason is logged).
    """
    try:
        mtime = os.path.getmtime(nv_path)
    except OSError:
        mtime = None  # let the reader below log the actual problem
    memo = _NORM_INDEX_MEMO.get(nv_path)
    if mtime is not None and memo is not None and memo[0] == mtime:
        return memo[1], memo[2]

    norm_dict = {}
    lemma_index = {}
    if not PANDAS_AVAILABLE:
        log("Pandas nicht verfügbar, NV_MASTER kann nicht gelesen.", level="ERROR")
        return norm_dict, lemma_index
    try:
        sheets = pd.read_excel(nv_path, sheet_name=None, engine="odf")
    except Exception as e:
        log(f"Fehler beim Einlesen von NV_MASTER: {e}", level="ERROR")
        return norm_dict, lemma_index

    for sheet_name, df in sheets.items():
        if str(sheet_name).strip().lower() == "master":
            continue
        df = df.fillna("")
        cols = [str(c).strip().lower() for c in df.columns]
        id_col = None
        word_col = None
        for i, c in enumerate(cols):
            if "id" in c:
                id_col = df.columns[i]
            if "wort" in c or "vokabel" in c:
                word_col = df.columns[i]
        if word_col is None and len(df.columns) >= 1:
            word_col = df.columns[-1]
        if id_col is None and len(df.columns) >= 1:
            id_col = df.columns[0]

        # Both checks are invariant per sheet, so evaluate them once.
        has_id_col = id_col in df.columns
        has_word_col = word_col in df.columns

        current_parent_id = None
        for _, row in df.iterrows():
            id_val = str(row[id_col]).strip() if has_id_col else ""
            word_val = str(row[word_col]).strip() if has_word_col else ""
            if id_val:
                current_parent_id = id_val
            if not word_val:
                continue
            norm_name = normalize_text(word_val)
            lemma = lemmatize_term(word_val)
            entry = {"Name": word_val.strip(), "ID": current_parent_id or "", "Sheet": sheet_name}
            norm_dict.setdefault(norm_name, []).append(entry)
            lemma_index.setdefault(lemma, []).append(entry)

    log(f"NV_MASTER geladen. Begriffe: {sum(len(v) for v in norm_dict.values())}")
    if mtime is not None:
        _NORM_INDEX_MEMO[nv_path] = (mtime, norm_dict, lemma_index)
    return norm_dict, lemma_index
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# Fuzzy / Vorschläge
|
|
||||||
# ------------------------
|
|
||||||
def fuzzy_score(a, b):
    """Return a similarity score between 0.0 and 1.0 for two strings.

    With RapidFuzz available the score is ``token_set_ratio / 100``; a
    RapidFuzz error yields 0.0.  Without RapidFuzz the ratio of
    ``difflib.SequenceMatcher`` over the lower-cased inputs is returned.
    """
    if not RAPIDFUZZ_AVAILABLE:
        matcher = SequenceMatcher(None, a.lower(), b.lower())
        return matcher.ratio()
    try:
        return fuzz.token_set_ratio(a, b) / 100.0
    except Exception:
        return 0.0
|
|
||||||
|
|
||||||
def get_suggestions_for_term(term_lemma, norm_dict, lemma_index, threshold=CONF_THRESHOLD):
    """Collect fuzzy-match suggestions for *term_lemma* from both indexes.

    Every key of ``lemma_index`` and then of ``norm_dict`` is scored with
    ``fuzzy_score``; keys starting with *term_lemma* get a bonus of 0.1
    (capped at 1.0).  Entries of keys scoring at least *threshold* become
    candidates, ordered by descending score with duplicates (same name and
    ID) removed.

    Returns:
        A list of strings, ``"Name (ID)"`` or just ``"Name"`` when the ID is
        empty.
    """
    scored = []
    # The lemma index is scanned first; this fixes the order of equal scores.
    for index in (lemma_index, norm_dict):
        for key, entries in index.items():
            score = fuzzy_score(term_lemma, key)
            if key.startswith(term_lemma):
                score = min(score + 0.1, 1.0)
            if score >= threshold:
                scored.extend((score, e["Name"], e["ID"]) for e in entries)

    # Stable sort: candidates with equal score keep their discovery order.
    scored.sort(key=lambda item: item[0], reverse=True)

    # dict.fromkeys keeps the first occurrence of each (name, id) pair.
    unique_pairs = dict.fromkeys((name, id_) for _, name, id_ in scored)
    return [f'{name} ({id_})' if id_ else name for name, id_ in unique_pairs]
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# Mapping eines Terms (mit Cache)
|
|
||||||
# ------------------------
|
|
||||||
def map_term_with_indexes(term, norm_dict, lemma_index):
    """Map one term to vocabulary hits, suggestions and IDs, using ``CACHE``.

    Lookup order: cached result for the term's lemma, exact match on the
    normalized term, exact match on the lemma.  Suggestions are always
    computed for the lemma.  If there is still no hit, the term is split
    with ``compound_split``; each part contributes either its exact lemma
    matches or, failing that, its own suggestions.  The de-duplicated
    result is stored in ``CACHE`` under the lemma.

    Returns:
        ``(hits, suggestions, ids)`` - three lists of strings.
    """
    term_norm = normalize_text(term)
    term_lemma = lemmatize_term(term)

    if term_lemma in CACHE:
        cached = CACHE[term_lemma]
        return cached.get("hits", []), cached.get("suggestions", []), cached.get("ids", [])

    hits, ids = [], []

    def collect(entries):
        # Record the names of all entries and their non-empty IDs.
        for entry in entries:
            hits.append(entry["Name"])
            if entry["ID"]:
                ids.append(entry["ID"])

    collect(norm_dict.get(term_norm, ()))
    if not hits and term_lemma in lemma_index:
        collect(lemma_index[term_lemma])

    suggestions = get_suggestions_for_term(term_lemma, norm_dict, lemma_index)

    if not hits:
        for part in compound_split(term):
            part_lemma = lemmatize_term(part)
            if part_lemma in lemma_index:
                collect(lemma_index[part_lemma])
            else:
                suggestions.extend(get_suggestions_for_term(part_lemma, norm_dict, lemma_index))

    # Order-preserving de-duplication.
    hits = list(dict.fromkeys(hits))
    suggestions = list(dict.fromkeys(suggestions))
    ids = list(dict.fromkeys(ids))

    CACHE[term_lemma] = {"hits": hits, "suggestions": suggestions, "ids": ids}
    return hits, suggestions, ids
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# Header + Spalten
|
|
||||||
# ------------------------
|
|
||||||
def find_header_and_cols(sheet):
    """Locate the 'Objektbeschreibung' header and existing result columns.

    The first five rows of the sheet's used area are scanned for a cell
    whose stripped, lower-cased text equals "objektbeschreibung".  In that
    header row the columns titled exactly "Norm_Treffer", "Norm_Vorschlag"
    and "Norm_ID" are recorded.

    The function always returns four values, so callers can unpack the
    result unconditionally (the failure paths used to return only three).

    Args:
        sheet: Spreadsheet object providing ``createCursor`` and
            ``getCellByPosition(column, row)``.

    Returns:
        ``(header_row, objekt_col, dr, existing)`` where ``dr`` is the range
        address of the used area and ``existing`` maps result column titles
        to column indexes.  ``(None, None, None, {})`` if the used area
        cannot be determined, ``(None, None, dr, {})`` if no header is found.
    """
    try:
        cursor = sheet.createCursor()
        cursor.gotoStartOfUsedArea(False)
        cursor.gotoEndOfUsedArea(True)
        dr = cursor.getRangeAddress()
    except Exception:
        return None, None, None, {}

    def cell_text(col, row):
        # Unreadable cells count as empty.
        try:
            return str(sheet.getCellByPosition(col, row).String).strip()
        except Exception:
            return ""

    header_row = None
    objekt_col = None
    for r in range(min(5, dr.EndRow + 1)):
        for c in range(dr.EndColumn + 1):
            if cell_text(c, r).lower() == "objektbeschreibung":
                header_row = r
                objekt_col = c
                break
        if objekt_col is not None:
            break

    if header_row is None:
        return None, None, dr, {}

    existing = {}
    for c in range(dr.EndColumn + 1):
        title = cell_text(c, header_row)
        if title in ("Norm_Treffer", "Norm_Vorschlag", "Norm_ID"):
            existing[title] = c  # on duplicate titles the right-most wins
    return header_row, objekt_col, dr, existing
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# Optimierter Live-Handler (nur Objektbeschreibung)
|
|
||||||
# ------------------------
|
|
||||||
def on_objektbeschreibung_change(oEvent=None):
    """Live handler: refresh the suggestions for one edited description cell.

    The edited cell is taken from ``oEvent.Range`` or ``oEvent.Source`` and
    otherwise from the current selection.  Only data cells of the
    'Objektbeschreibung' column are processed.  The cell text is split into
    comma-separated clauses and whitespace-separated words; stop words and
    pure numbers are skipped, the rest is mapped part by part, and all
    suggestions are written, joined with " | ", into the 'Norm_Vorschlag'
    column of the same row (the column is created when missing).  Finally
    the term cache is persisted.  All errors are logged, never raised.

    Args:
        oEvent: Optional event object passed by LibreOffice.
    """
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
    except Exception as e:
        log(f"Dokumentzugriff fehlgeschlagen: {e}", level="ERROR")
        return

    cell = None
    try:
        if oEvent and hasattr(oEvent, "Range") and oEvent.Range is not None:
            cell = oEvent.Range
        elif oEvent and hasattr(oEvent, "Source") and oEvent.Source is not None:
            cell = oEvent.Source
    except Exception:
        cell = None
    if cell is None:
        try:
            sel = doc.CurrentSelection
            # Use the top-left cell when the selection can hand out cells by
            # position; otherwise keep the selection object as it is.  (The
            # branches used to be swapped, calling getCellByPosition exactly
            # on objects that do not provide it.)
            if hasattr(sel, "getCellByPosition"):
                cell = sel.getCellByPosition(0, 0)
            else:
                cell = sel
        except Exception as e:
            log(f"Keine Selektion: {e}", level="ERROR")
            return

    try:
        row_index = cell.CellAddress.Row
        col_index = cell.CellAddress.Column
    except Exception:
        return

    try:
        # Tolerate a result without the trailing 'existing' mapping.
        found = find_header_and_cols(sheet)
        header_row, objekt_col, dr = found[:3]
        existing = found[3] if len(found) > 3 else {}
        if header_row is None or col_index != objekt_col:
            return  # only the 'Objektbeschreibung' column is handled
        if row_index <= header_row:
            # Editing the header cell (or a row above it) must not overwrite
            # the 'Norm_Vorschlag' header label with suggestions.
            return
        last_col = dr.EndColumn
        if "Norm_Vorschlag" not in existing:
            last_col += 1
            existing["Norm_Vorschlag"] = last_col
            sheet.getCellByPosition(last_col, header_row).String = "Norm_Vorschlag"
        norm_sug_col = existing["Norm_Vorschlag"]
    except Exception as e:
        log(f"Fehler Spaltenbestimmung: {e}", level="ERROR")
        return

    try:
        txt = str(cell.String).strip()
        if not txt:
            sheet.getCellByPosition(norm_sug_col, row_index).String = ""
            return
        norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
        suggestions_acc = []
        clauses = [c.strip() for c in re.split(r",", txt) if c.strip()]
        for cl in clauses:
            parts = [p.strip() for p in re.split(r"\s+", cl) if p.strip()]
            for p in parts:
                if p.lower() in STOPWORDS or re.fullmatch(r"\d+", p):
                    continue
                for sp in compound_split(p):
                    _, sugs, _ = map_term_with_indexes(sp, norm_dict, lemma_index)
                    suggestions_acc.extend(sugs)

        # Order-preserving de-duplication.
        ordered = list(dict.fromkeys(suggestions_acc))
        sheet.getCellByPosition(norm_sug_col, row_index).String = " | ".join(ordered)

        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(CACHE, f, ensure_ascii=False, indent=2)

    except Exception as e:
        log(f"Fehler im Live-Handler: {e}", level="ERROR")
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# Batch-Durchlauf
|
|
||||||
# ------------------------
|
|
||||||
def run_mapper_macro():
    """Batch run: map every 'Objektbeschreibung' cell of the active sheet.

    For each non-empty row below the header, the text is split into
    comma-separated clauses and whitespace-separated words; stop words and
    pure numbers are skipped and the remaining words are split with
    ``compound_split``.  Each term is mapped with ``map_term_with_indexes``
    and the row's de-duplicated hits, suggestions and IDs are written,
    joined with " | ", into the columns 'Norm_Treffer', 'Norm_Vorschlag' and
    'Norm_ID'.  Missing result columns are appended contiguously to the
    right of the used area.

    Cell colours: the description cell turns red when any term has neither
    hit nor suggestion; the hit cell turns green when there are hits and
    nothing is unmapped; the suggestion cell turns orange when suggestions
    exist; otherwise white.  Per-row errors are logged and the row skipped.
    The term cache is persisted at the end.
    """
    log("=== mapper_macro 1.5 gestartet ===", level="INFO")
    try:
        doc = XSCRIPTCONTEXT.getDocument()
        sheet = doc.CurrentController.ActiveSheet
    except Exception as e:
        log(f"Dokumentzugriff fehlgeschlagen: {e}", level="ERROR")
        return

    # Tolerate a result without the trailing 'existing' mapping instead of
    # failing with an unhandled unpacking error.
    found = find_header_and_cols(sheet)
    header_row, objekt_col, dr = found[:3]
    existing = found[3] if len(found) > 3 else {}
    if objekt_col is None:
        log("Spalte 'Objektbeschreibung' nicht gefunden.", level="ERROR")
        return

    # Append missing result columns one after another so that no empty gap
    # column appears when some of them already exist.
    next_col = dr.EndColumn + 1
    for title in ("Norm_Treffer", "Norm_Vorschlag", "Norm_ID"):
        if title not in existing:
            existing[title] = next_col
            sheet.getCellByPosition(next_col, header_row).String = title
            next_col += 1

    norm_dict, lemma_index = build_norm_index(NV_MASTER_PATH)
    GREEN, YELLOW, RED, WHITE = 0xADFF2F, 0xFFA500, 0xCC0000, 0xFFFFFF

    def uniq(seq):
        # Order-preserving de-duplication.
        return list(dict.fromkeys(seq))

    for r in range(header_row + 1, dr.EndRow + 1):
        try:
            cell = sheet.getCellByPosition(objekt_col, r)
            txt = str(cell.String).strip()
            if not txt:
                continue
            clauses = [c.strip() for c in re.split(r",", txt) if c.strip()]
            terms = []
            for cl in clauses:
                for p in [p.strip() for p in re.split(r"\s+", cl) if p.strip()]:
                    if p.lower() in STOPWORDS or re.fullmatch(r"\d+", p):
                        continue
                    terms.extend([sp.strip() for sp in compound_split(p) if sp.strip()])

            row_hits, row_sugs, row_ids = [], [], []
            any_unmapped = False
            for term in terms:
                hits, sugs, ids = map_term_with_indexes(term, norm_dict, lemma_index)
                row_hits.extend(hits)
                row_sugs.extend(sugs)
                row_ids.extend(ids)
                if not hits and not sugs:
                    any_unmapped = True

            row_hits, row_sugs, row_ids = map(uniq, [row_hits, row_sugs, row_ids])

            hit_cell = sheet.getCellByPosition(existing["Norm_Treffer"], r)
            sug_cell = sheet.getCellByPosition(existing["Norm_Vorschlag"], r)
            id_cell = sheet.getCellByPosition(existing["Norm_ID"], r)
            hit_cell.String = " | ".join(row_hits)
            sug_cell.String = " | ".join(row_sugs)
            id_cell.String = " | ".join(row_ids)

            cell.CellBackColor = RED if any_unmapped else WHITE
            hit_cell.CellBackColor = GREEN if row_hits and not any_unmapped else WHITE
            sug_cell.CellBackColor = YELLOW if row_sugs else WHITE

        except Exception as e:
            log(f"Fehler in Zeile {r}: {e}", level="ERROR")
            continue

    # A failed cache write must not discard the finished run.
    try:
        with open(CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(CACHE, f, ensure_ascii=False, indent=2)
    except Exception as e:
        log(f"Fehler beim Schreiben des Caches: {e}", level="ERROR")
    log("=== mapper_macro 1.5 fertig ===", level="INFO")
|
|
||||||
|
|
||||||
# ------------------------
|
|
||||||
# Export
|
|
||||||
# ------------------------
|
|
||||||
# Functions listed as exported scripts of this module.
g_exportedScripts = (run_mapper_macro, on_objektbeschreibung_change)
|
|
||||||
Loading…
x
Reference in New Issue
Block a user