Исходный код yadisk_async.utils

# -*- coding: utf-8 -*-

from collections import defaultdict
import asyncio

import aiohttp

from .objects import ErrorObject
from .exceptions import *
from . import settings

from typing import Optional, Union, TypeVar, Protocol

from .compat import Callable, Awaitable, TimeoutError

__all__ = ["get_exception", "auto_retry"]

EXCEPTION_MAP = {400: defaultdict(lambda: BadRequestError,
                                  {"FieldValidationError": FieldValidationError}),
                 401: defaultdict(lambda: UnauthorizedError),
                 403: defaultdict(lambda: ForbiddenError),
                 404: defaultdict(lambda: NotFoundError,
                                  {"DiskNotFoundError": PathNotFoundError,
                                   "DiskOperationNotFoundError": OperationNotFoundError}),
                 406: defaultdict(lambda: NotAcceptableError),
                 409: defaultdict(lambda: ConflictError,
                                  {"DiskPathDoesntExistsError": ParentNotFoundError,
                                   "DiskPathPointsToExistentDirectoryError": DirectoryExistsError,
                                   "DiskResourceAlreadyExistsError": PathExistsError,
                                   "MD5DifferError": MD5DifferError}),
                 415: defaultdict(lambda: UnsupportedMediaError),
                 423: defaultdict(lambda: LockedError,
                                  {"DiskResourceLockedError": ResourceIsLockedError,
                                   "DiskUploadTrafficLimitExceeded": UploadTrafficLimitExceededError}),
                 429: defaultdict(lambda: TooManyRequestsError),
                 500: defaultdict(lambda: InternalServerError),
                 502: defaultdict(lambda: BadGatewayError),
                 503: defaultdict(lambda: UnavailableError),
                 504: defaultdict(lambda: GatewayTimeoutError),
                 507: defaultdict(lambda: InsufficientStorageError)}

[документация] async def get_exception(response: aiohttp.client.ClientResponse) -> YaDiskError: """ Get an exception instance based on response, assuming the request has failed. :param response: an instance of :any:`aiohttp.ClientResponse` :returns: an exception instance, subclass of :any:`YaDiskError` """ exc_group = EXCEPTION_MAP.get(response.status, None) if exc_group is None: return UnknownYaDiskError("Unknown Yandex.Disk error") try: js = await response.json() except (ValueError, RuntimeError): js = None error = ErrorObject(js) msg = error.message or "<empty>" desc = error.description or "<empty>" exc = exc_group[error.error] return exc(error.error, "%s (%s / %s)" % (msg, desc, error.error), response)
T = TypeVar("T")
[документация] async def auto_retry(func: Callable[[], Union[T, Awaitable[T]]], n_retries: Optional[int] = None, retry_interval: Optional[Union[int, float]] = None) -> T: """ Attempt to perform a request with automatic retries. A retry is triggered by :any:`aiohttp.ClientError` or :any:`RetriableYaDiskError`. :param func: function to run, must not require any arguments :param n_retries: `int`, maximum number of retries :param retry_interval: `int` or `float`, delay between retries (in seconds) :returns: return value of func() """ if n_retries is None: n_retries = settings.DEFAULT_N_RETRIES if retry_interval is None: retry_interval = settings.DEFAULT_RETRY_INTERVAL for i in range(n_retries + 1): try: if asyncio.iscoroutinefunction(func): return await func() else: return func() except (aiohttp.ClientError, TimeoutError, RetriableYaDiskError) as e: if i == n_retries: raise e if retry_interval: await asyncio.sleep(retry_interval) # This should never be reachable assert False