from datetime import datetime, timezone
import http
import os
import re
import urllib.request
import urllib.parse
import urllib.error
from pathlib import Path
from typing import Iterable, Optional, Tuple, Union, Callable
import shutil
import mimetypes
import warnings
from cloudpathlib.client import Client, register_client_class
from cloudpathlib.enums import FileCacheMode
from .httppath import HttpPath
@register_client_class("http")
class HttpClient(Client):
def __init__(
    self,
    file_cache_mode: Optional[Union[str, FileCacheMode]] = None,
    local_cache_dir: Optional[Union[str, os.PathLike]] = None,
    content_type_method: Optional[Callable] = mimetypes.guess_type,
    auth: Optional[urllib.request.BaseHandler] = None,
    custom_list_page_parser: Optional[Callable[[str], Iterable[str]]] = None,
    custom_dir_matcher: Optional[Callable[[str], bool]] = None,
    write_file_http_method: Optional[str] = "PUT",
):
    """Create an HTTP client that can be used to interact with HTTP servers
    using the cloudpathlib library.

    Args:
        file_cache_mode (Optional[Union[str, FileCacheMode]]): How often to clear the file cache;
            see [the caching docs](https://cloudpathlib.drivendata.org/stable/caching/) for more
            information about the options in cloudpathlib.enums.FileCacheMode.
        local_cache_dir (Optional[Union[str, os.PathLike]]): Path to directory to use as cache
            for downloaded files. If None, will use a temporary directory. Default can be set
            with the `CLOUDPATHLIB_LOCAL_CACHE_DIR` environment variable.
        content_type_method (Optional[Callable]): Function called to guess the media type
            (mimetype) when uploading files. Defaults to `mimetypes.guess_type`.
        auth (Optional[urllib.request.BaseHandler]): Authentication handler installed on the
            urllib opener. Defaults to None, which uses the default opener.
        custom_list_page_parser (Optional[Callable[[str], Iterable[str]]]): Function called to
            parse pages that list directories. Defaults to looking for `<a>` tags with `href`.
        custom_dir_matcher (Optional[Callable[[str], bool]]): Predicate that decides whether a
            url is a directory. Defaults to checking whether the path ends with a `/`.
        write_file_http_method (Optional[str]): HTTP method used when writing files. Defaults
            to "PUT", but some servers may want "POST".
    """
    super().__init__(file_cache_mode, local_cache_dir, content_type_method)
    self.auth = auth
    # Install the auth handler on the opener only when one was provided.
    extra_handlers = [] if auth is None else [auth]
    self.opener = urllib.request.build_opener(*extra_handlers)
    self.custom_list_page_parser = custom_list_page_parser
    # Fall back to "url ends with a slash" when no custom matcher is given.
    if custom_dir_matcher is None:
        self.dir_matcher: Callable[[str], bool] = lambda url: url.endswith("/")
    else:
        self.dir_matcher = custom_dir_matcher
    self.write_file_http_method = write_file_http_method
def _get_metadata(self, cloud_path: HttpPath) -> dict:
    """Return size, last-modified time, and content type reported by the server
    for `cloud_path`."""
    with self.opener.open(cloud_path.as_url()) as response:
        headers = response.headers
        raw_modified = headers.get("Last-Modified", None)
        parsed_modified = None
        if raw_modified is not None:
            # Format per https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified
            parsed_modified = datetime.strptime(raw_modified, "%a, %d %b %Y %H:%M:%S %Z")
            # Header should always be UTC:
            # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified#gmt
            parsed_modified = parsed_modified.replace(tzinfo=timezone.utc)
        return {
            "size": int(headers.get("Content-Length", 0)),
            "last_modified": parsed_modified,
            "content_type": headers.get("Content-Type", None),
        }
def _is_file_or_dir(self, cloud_path: HttpPath) -> Optional[str]:
    """Classify the path: "dir" if the configured dir matcher accepts the url,
    otherwise "file"."""
    return "dir" if self.dir_matcher(cloud_path.as_url()) else "file"
def _download_file(self, cloud_path: HttpPath, local_path: Union[str, os.PathLike]) -> Path:
    """Stream the remote file at `cloud_path` into `local_path` and return it as a Path."""
    destination = Path(local_path)
    with self.opener.open(cloud_path.as_url()) as response:
        # Ensure the parent directory exists before opening the local file.
        destination.parent.mkdir(parents=True, exist_ok=True)
        with destination.open("wb") as out_stream:
            shutil.copyfileobj(response, out_stream)
    return destination
def _exists(self, cloud_path: HttpPath) -> bool:
    """Return True if a HEAD request for the url answers with status 200.

    Returns False for a 404 response or a connection-level failure
    (`URLError` with no HTTP status); any other HTTP error is re-raised.
    """
    request = urllib.request.Request(cloud_path.as_url(), method="HEAD")
    try:
        with self.opener.open(request) as response:
            return response.status == 200
    # BUG FIX: HTTPError is a subclass of URLError, so the previous combined
    # handler (`isinstance(e, URLError) or e.code == 404`) was always true and
    # the re-raise for non-404 statuses was unreachable. Handle the more
    # specific HTTPError first so 403/500/etc. propagate as intended.
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return False
        raise
    except urllib.error.URLError:
        # Could not reach the server at all: treat as not existing.
        return False
def _move_file(self, src: HttpPath, dst: HttpPath, remove_src: bool = True) -> HttpPath:
    """Copy `src` to `dst` on the server, deleting the original when `remove_src`.

    Accessing `src.fspath` downloads the file so the local copy can be
    re-uploaded to the destination url.
    """
    self._upload_file(src.fspath, dst)
    if not remove_src:
        return dst
    try:
        self._remove(src)
    except Exception as e:
        # Warn with context, then let the deletion failure propagate.
        warnings.warn(
            f"File was successfully uploaded to {dst} but failed to remove original {src}: {e}",
            UserWarning,
        )
        raise
    return dst
def _remove(self, cloud_path: HttpPath, missing_ok: bool = True) -> None:
    """Issue a DELETE request for the url.

    Args:
        cloud_path (HttpPath): Path to delete.
        missing_ok (bool): When True (default), a 404 response is ignored;
            otherwise a 404 raises FileNotFoundError.

    Raises:
        FileNotFoundError: The server answered 404 and `missing_ok` is False.
        urllib.error.HTTPError: The server answered with a non-404 error status.
        Exception: The request succeeded but with an unexpected status code.
    """
    request = urllib.request.Request(cloud_path.as_url(), method="DELETE")
    try:
        with self.opener.open(request) as response:
            # Servers commonly answer DELETE with 200 (OK), 202 (Accepted), or
            # 204 (No Content); previously only 204 was accepted.
            if response.status not in (200, 202, 204):
                raise Exception(f"Failed to delete {cloud_path}.")
    except urllib.error.HTTPError as e:
        if e.code == 404:
            if not missing_ok:
                raise FileNotFoundError(f"Failed to delete {cloud_path}.") from e
        else:
            # BUG FIX: non-404 server errors (403, 500, ...) were previously
            # masked as FileNotFoundError; propagate the real HTTP error.
            raise
def _list_dir(self, cloud_path: HttpPath, recursive: bool) -> Iterable[Tuple[HttpPath, bool]]:
    """Yield `(path, is_dir)` pairs for the entries listed under `cloud_path`.

    When `recursive` is True, the contents of child directories are yielded
    as well. Any failure while fetching or parsing the listing is reported
    as NotImplementedError so callers know a custom parser may be required.
    """
    try:
        with self.opener.open(cloud_path.as_url()) as response:
            page = response.read().decode()
            for child, child_is_dir in self._parse_list_dir_response(page, base_url=str(cloud_path)):
                yield child, child_is_dir
                # Descend into subdirectories when a recursive listing was requested.
                if child_is_dir and recursive:
                    yield from self._list_dir(child, recursive=True)
    except Exception as e:  # noqa E722
        raise NotImplementedError(
            f"Unable to parse response as a listing of files; please provide a custom parser as `custom_list_page_parser`. Error raised: {e}"
        )
def _upload_file(self, local_path: Union[str, os.PathLike], cloud_path: HttpPath) -> HttpPath:
    """Upload the file at `local_path` to `cloud_path` using the configured
    write method (PUT by default).

    Returns:
        HttpPath: The destination path.

    Raises:
        Exception: The server did not answer with a success (2xx) status.
    """
    local_path = Path(local_path)
    # BUG FIX: `headers` was only bound when a content_type_method was set,
    # raising NameError for clients built with content_type_method=None.
    headers = {}
    if self.content_type_method is not None:
        content_type, _ = self.content_type_method(local_path)
        headers["Content-Type"] = content_type or "application/octet-stream"
    with local_path.open("rb") as file_data:
        request = urllib.request.Request(
            cloud_path.as_url(),
            data=file_data.read(),
            method=self.write_file_http_method,
            headers=headers,
        )
    with self.opener.open(request) as response:
        # Accept any 2xx status; previously only 200/201 were accepted even
        # though e.g. 204 (No Content) is a normal PUT success answer.
        if not (200 <= response.status < 300):
            raise Exception(f"Failed to upload {local_path} to {cloud_path}.")
    return cloud_path
def _get_public_url(self, cloud_path: HttpPath) -> str:
    """Return the url for the path; for HTTP there is no separate public url."""
    return cloud_path.as_url()
def _generate_presigned_url(self, cloud_path: HttpPath, expire_seconds: int = 60 * 60) -> str:
    """Always raises; presigned urls cannot be generated with urllib."""
    raise NotImplementedError("Presigned URLs are not supported using urllib.")
def _parse_list_dir_response(
self, response: str, base_url: str
) -> Iterable[Tuple[HttpPath, bool]]:
# Ensure base_url ends with a trailing slash so joining works
if not base_url.endswith("/"):
base_url += "/"
def _simple_links(html: str) -> Iterable[str]:
return re.findall(r' Tuple[http.client.HTTPResponse, bytes]:
request = urllib.request.Request(url.as_url(), method=method, **kwargs)
with self.opener.open(request) as response:
# eager read of response content, which is not available after
# the connection is closed when we exit the context manager.
return response, response.read()
# Expose the path class registered for this client under an http-specific alias.
HttpClient.HttpPath = HttpClient.CloudPath # type: ignore
@register_client_class("https")
class HttpsClient(HttpClient):
    """Client for https:// urls; identical to HttpClient apart from the scheme."""
# Expose the path class registered for this client under an https-specific alias.
HttpsClient.HttpsPath = HttpsClient.CloudPath # type: ignore