2025-10-10 09:46:41 +02:00

158 lines
4.2 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
import json
import os
import shutil
import sys
import xml
from typing import List, Optional
import rdflib
from . import __version__
from .Wrapper import SPARQLWrapper, _allowedAuth, _allowedFormats, _allowedRequests
class SPARQLWrapperFormatter(
argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter
):
pass
def check_file(v: str) -> str:
if os.path.isfile(v):
return v
elif v == "-":
return "-" # stdin
else:
raise argparse.ArgumentTypeError("file '%s' is not found" % v)
def choicesDescriptions() -> str:
d = "\n - ".join(["allowed FORMAT:"] + _allowedFormats)
d += "\n - ".join(["\n\nallowed METHOD:"] + _allowedRequests)
d += "\n - ".join(["\n\nallowed AUTH:"] + _allowedAuth)
return d
def parse_args(test: Optional[List[str]] = None) -> argparse.Namespace:
"""Parse arguments."""
parser = argparse.ArgumentParser(
prog="rqw",
formatter_class=(
lambda prog: SPARQLWrapperFormatter(
prog,
**{
"width": shutil.get_terminal_size(fallback=(120, 50)).columns,
"max_help_position": 30,
},
)
),
description="sparqlwrapper CLI",
epilog=choicesDescriptions(),
)
input_group = parser.add_mutually_exclusive_group(required=True)
input_group.add_argument(
"-f",
"--file",
metavar="FILE",
type=check_file,
help="query with sparql file (stdin: -)",
)
input_group.add_argument("-Q", "--query", metavar="QUERY", help="query with string")
parser.add_argument(
"-F",
"--format",
default="json",
metavar="FORMAT",
choices=_allowedFormats,
help="response format",
)
parser.add_argument(
"-e",
"--endpoint",
metavar="URI",
help="sparql endpoint",
default="http://dbpedia.org/sparql",
)
parser.add_argument(
"-m",
"--method",
metavar="METHOD",
choices=_allowedRequests,
help="request method",
)
parser.add_argument(
"-a", "--auth", metavar="AUTH", choices=_allowedAuth, help="HTTP auth"
)
parser.add_argument(
"-u", "--username", metavar="ID", default="guest", help="username for auth"
)
parser.add_argument(
"-p", "--password", metavar="PW", default="", help="password for auth"
)
parser.add_argument("-q", "--quiet", action="store_true", help="supress warnings")
parser.add_argument(
"-V", "--version", action="version", version="%(prog)s {}".format(__version__)
)
if test is None:
return parser.parse_args()
else:
return parser.parse_args(test)
def main(test: Optional[List[str]] = None) -> None:
args = parse_args(test)
if args.quiet:
import warnings
warnings.filterwarnings("ignore")
q = ""
if args.query is not None:
q = args.query
elif args.file == "-":
q = sys.stdin.read()
else:
q = open(args.file, "r").read()
sparql = SPARQLWrapper(
args.endpoint,
agent=(
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/96.0.4664.110 Safari/537.36"
),
)
if args.auth is not None:
sparql.setHTTPAuth(args.auth)
sparql.setCredentials(args.username, args.password)
if args.method is not None:
sparql.setMethod(args.method)
sparql.setQuery(q)
sparql.setReturnFormat(args.format)
results = sparql.query().convert()
if isinstance(results, dict):
# "json"
print(json.dumps(results, indent=4))
elif isinstance(results, xml.dom.minidom.Document):
# "xml"
print(results.toxml())
elif isinstance(results, bytes):
# "csv", "tsv", "turtle", "n3"
print(results.decode("utf-8"))
elif isinstance(results, rdflib.graph.ConjunctiveGraph):
# "rdf"
print(results.serialize())
else:
# unknown type
raise TypeError(f"Unsupported result of type {type(results)}: {results!r}")
if __name__ == "__main__":
main()