import abc
import os.path
import time
from contextlib import contextmanager
from email.utils import parsedate_to_datetime
from typing import List, ContextManager, Optional, Type, Any, Dict
import requests
from hbutils.string import truncate
from hbutils.system.filesystem.tempfile import TemporaryDirectory
from hbutils.system.network import urlsplit
from ..utils import download_file, srequest, get_requests_session, hash_anything
[docs]class ResourceNotChange(Exception):
pass
[docs]class SyncItem(metaclass=abc.ABCMeta):
__type__: str = None
[docs] def __init__(self, value, metadata: dict, segments: List[str]):
self._value = value
self.metadata = metadata
self.segments = segments
def load_file(self) -> ContextManager[str]:
raise NotImplementedError # pragma: no cover
def refresh_mark(self, mark: Optional[Dict[str, Any]]):
return mark or {}
[docs] def __hash__(self):
return hash_anything((type(self), self._value, self.metadata, self.segments))
[docs] def __eq__(self, other):
if self is other:
return True
elif isinstance(self, type(other)) and isinstance(other, type(self)):
return (self._value, self.metadata, self.segments) == \
(other._value, other.metadata, other.segments)
else:
return False
[docs]class RemoteSyncItem(SyncItem):
__type__ = 'remote'
__headers__ = {}
__request_kwargs__ = {}
[docs] def __init__(self, url, metadata, segments: List[str]):
SyncItem.__init__(self, url, metadata, segments)
self.url = url
self._session = None
def get_new_session(self):
return get_requests_session(headers=self.__headers__)
def _get_session(self) -> requests.Session:
if self._session is None:
self._session = self.get_new_session()
return self._session
def _file_process(self, filename):
pass
@contextmanager
def load_file(self) -> ContextManager[str]:
with TemporaryDirectory() as td:
filename = os.path.join(td, urlsplit(self.url).filename or 'unnamed_file')
download_file(self.url, filename, session=self._get_session(), **self.__request_kwargs__)
self._file_process(filename)
yield filename
def refresh_mark(self, mark: Optional[Dict[str, Any]]):
mark = dict(mark or {})
url = mark.get('url')
if url == self.url: # url not changed
expires = mark.get('expires')
if expires is not None and time.time() < expires:
raise ResourceNotChange
etag = mark.get('etag')
headers = {'If-None-Match': etag} if etag else {}
else:
headers = {}
kwargs = self.__request_kwargs__.copy()
if 'headers' in kwargs:
headers.update(kwargs['headers'])
kwargs.pop('headers')
resp = srequest(
self._get_session(), 'HEAD', self.url,
allow_redirects=True, headers=headers,
**kwargs
)
if resp.status_code == 304:
raise ResourceNotChange
headers = resp.headers
etag = headers.get('ETag')
expires = headers.get('Expires')
expires = parsedate_to_datetime(expires).timestamp() if expires else None
content_length = headers.get('Content-Length')
content_length = int(content_length) if content_length is not None else None
content_type = headers.get('Content-Type')
return {
'url': self.url,
'etag': etag,
'expires': expires,
'content_length': content_length,
'content_type': content_type,
}
def __repr__(self):
return f'<{self.__class__.__name__} url: {self.url!r}>'
[docs]class CustomSyncItem(SyncItem):
__type__ = 'custom'
[docs] def __init__(self, gene, metadata, segments):
SyncItem.__init__(self, gene, metadata, segments)
self.gene = gene
@contextmanager
def load_file(self) -> ContextManager[str]:
with self.gene() as f:
yield f
def __repr__(self):
return f'<{self.__class__.__name__} gene: {self.gene!r}>'
[docs]class TextOutputSyncItem(SyncItem):
__type__ = 'text'
[docs] def __init__(self, content, metadata, segments):
SyncItem.__init__(self, content, metadata, segments)
self.content = content
@contextmanager
def load_file(self) -> ContextManager[str]:
with TemporaryDirectory() as td:
filename = os.path.join(td, 'file')
with open(filename, 'w') as f:
f.write(self.content)
yield filename
def __repr__(self):
return f'<{self.__class__.__name__} content: {truncate(self.content, tail_length=15, show_length=True)!r}>'
_PRESERVED_NAMES = {'metadata'}
_REGISTERED_SYNC_TYPES: Dict[str, Type[SyncItem]] = {}
def register_sync_type(clazz: Type[SyncItem]):
type_ = clazz.__type__
if type_ is None:
raise TypeError(f'Sync item class {clazz!r} should have __type__ instead of None.')
elif type_ in _PRESERVED_NAMES:
raise KeyError(f'Type {type_!r} is preserved, please use another one.')
elif type_ in _REGISTERED_SYNC_TYPES:
raise KeyError(f'Sync item type {type_!r} already exist.')
else:
_REGISTERED_SYNC_TYPES[type_] = clazz
register_sync_type(RemoteSyncItem)
register_sync_type(CustomSyncItem)
register_sync_type(TextOutputSyncItem)
def create_sync_item(type_: str, value: Any, metadata: dict, segments: List[str]) -> SyncItem:
if type_ in _REGISTERED_SYNC_TYPES:
clazz = _REGISTERED_SYNC_TYPES[type_]
return clazz(value, metadata, segments)
else:
raise KeyError(f'Unknown sync item type - {type_!r}.')