123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355 |
- # Copyright (c) 2012-2013 LiuYC https://github.com/liuyichen/
- # Copyright 2012-2014 ksyun.com, Inc. or its affiliates. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"). You
- # may not use this file except in compliance with the License. A copy of
- # the License is located at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # or in the "license" file accompanying this file. This file is
- # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
- # ANY KIND, either express or implied. See the License for the specific
- # language governing permissions and limitations under the License.
- import random
- import functools
- import logging
- from binascii import crc32
- from kscore.vendored.requests import ConnectionError, Timeout
- from kscore.vendored.requests.packages.urllib3.exceptions import ClosedPoolError
- from kscore.exceptions import ChecksumError, EndpointConnectionError
logger = logging.getLogger(__name__)

# Maps the symbolic socket-error names used in retry policies to the
# exception classes they stand for.  GENERAL_CONNECTION_ERROR is the only
# name supported for now; more specific names can be added here if more
# specific exceptions become available from requests.
EXCEPTION_MAP = {
    'GENERAL_CONNECTION_ERROR': [
        ConnectionError,
        ClosedPoolError,
        Timeout,
        EndpointConnectionError,
    ],
}
def delay_exponential(base, growth_factor, attempts):
    """Return the exponential back-off delay for the given attempt.

    The delay is computed as::

        base * growth_factor ^ (attempts - 1)

    :param base: The multiplier of the exponential term.  The special
        value ``'rand'`` selects a fresh ``random.random()`` value
        (a float in the range [0, 1)) on every call.
    :param growth_factor: The number raised to ``attempts - 1``.
    :param attempts: The attempt number the delay is computed for.
    :return: The computed time to sleep.
    :raises ValueError: If ``base`` is not ``'rand'`` and is not greater
        than 0.
    """
    if base == 'rand':
        multiplier = random.random()
    else:
        if base <= 0:
            raise ValueError(
                "The 'base' param must be greater than 0, got: %s" % base)
        multiplier = base
    exponent = attempts - 1
    return multiplier * (growth_factor ** exponent)
def create_exponential_delay_function(base, growth_factor):
    """Bind ``base`` and ``growth_factor`` into a delay function.

    The returned callable is ``delay_exponential`` with those two
    arguments pre-filled as keywords, so callers only need to supply
    ``attempts`` to obtain the delay.
    """
    bound_arguments = {'base': base, 'growth_factor': growth_factor}
    return functools.partial(delay_exponential, **bound_arguments)
def create_retry_handler(config, operation_name=None):
    """Build a ``RetryHandler`` from a retry configuration.

    The checker (deciding *whether* to retry) and the action (deciding
    *what* to do on a retry) are both derived from ``config``, optionally
    scoped to ``operation_name``.
    """
    return RetryHandler(
        checker=create_checker_from_retry_config(
            config, operation_name=operation_name),
        action=create_retry_action_from_config(
            config, operation_name=operation_name),
    )
def create_retry_action_from_config(config, operation_name=None):
    """Create the retry action (delay function) described by ``config``.

    Only the ``__default__`` section is consulted, so the same delay
    function applies to every policy in the retry config;
    ``operation_name`` is accepted but not used.  The spec leaves room
    for per-policy actions, which are not implemented here.

    :return: An exponential delay function when the configured delay
        type is ``'exponential'``; ``None`` for any other type.
    """
    delay_settings = config['__default__']['delay']
    if delay_settings['type'] != 'exponential':
        return None
    return create_exponential_delay_function(
        base=delay_settings['base'],
        growth_factor=delay_settings['growth_factor'])
def create_checker_from_retry_config(config, operation_name=None):
    """Build the retry checker described by ``config``.

    Policies from the ``__default__`` section are registered first,
    followed by the policies of ``operation_name`` when that operation
    has an entry in ``config``.  ``max_attempts`` is read from the
    ``__default__`` section only and stays ``None`` if that section is
    absent.

    :return: A ``MaxAttemptsDecorator``.  When exactly one checker was
        collected it wraps that checker directly (without any retryable
        exceptions); otherwise it wraps a ``MultiChecker`` together with
        the retryable exceptions gathered from all policies.
    """
    checkers = []
    retryable_exceptions = []
    max_attempts = None

    def _register(policy_set):
        # Turn every policy into a checker and remember the exception
        # types that policy marks as retryable.
        for policy_name in policy_set:
            policy = policy_set[policy_name]
            checkers.append(_create_single_checker(policy))
            found = _extract_retryable_exception(policy)
            if found is not None:
                retryable_exceptions.extend(found)

    if '__default__' in config:
        default_section = config['__default__']
        default_policies = default_section.get('policies', [])
        max_attempts = default_section['max_attempts']
        _register(default_policies)

    if operation_name is not None and config.get(operation_name) is not None:
        _register(config[operation_name]['policies'])

    if len(checkers) != 1:
        return MaxAttemptsDecorator(
            MultiChecker(checkers), max_attempts=max_attempts,
            retryable_exceptions=tuple(retryable_exceptions))
    # A lone checker does not need to be wrapped in a MultiChecker.
    return MaxAttemptsDecorator(checkers[0], max_attempts=max_attempts)
- def _create_single_checker(config):
- if 'response' in config['applies_when']:
- return _create_single_response_checker(
- config['applies_when']['response'])
- elif 'socket_errors' in config['applies_when']:
- return ExceptionRaiser()
- def _create_single_response_checker(response):
- if 'service_error_code' in response:
- checker = ServiceErrorCodeChecker(
- status_code=response['http_status_code'],
- error_code=response['service_error_code'])
- elif 'http_status_code' in response:
- checker = HTTPStatusCodeChecker(
- status_code=response['http_status_code'])
- elif 'crc32body' in response:
- checker = CRC32Checker(header=response['crc32body'])
- else:
- # TODO: send a signal.
- raise ValueError("Unknown retry policy: %s" % config)
- return checker
- def _extract_retryable_exception(config):
- applies_when = config['applies_when']
- if 'crc32body' in applies_when.get('response', {}):
- return [ChecksumError]
- elif 'socket_errors' in applies_when:
- exceptions = []
- for name in applies_when['socket_errors']:
- exceptions.extend(EXCEPTION_MAP[name])
- return exceptions
class RetryHandler(object):
    """Combine a retry ``checker`` with a retry ``action``.

    Both collaborators are callables.  The ``checker`` receives the
    attempt number, the response and any caught exception, and decides
    whether the criteria for a retry have been met.  When they have, the
    ``action`` is invoked with the attempt number to determine what
    should happen for the retry.
    """

    def __init__(self, checker, action):
        self._checker = checker
        self._action = action

    def __call__(self, attempts, response, caught_exception, **kwargs):
        """Handle a retry event.

        Meant to be registered as an event handler, which is why extra
        keyword arguments are accepted and ignored.

        :return: The result of the action when a retry is needed,
            otherwise ``None``.
        """
        if not self._checker(attempts, response, caught_exception):
            logger.debug("No retry needed.")
            return None
        outcome = self._action(attempts=attempts)
        logger.debug("Retry needed, action of: %s", outcome)
        return outcome
class BaseChecker(object):
    """Base class for retry checkers.

    Each subclass checks one criterion that determines whether or not a
    retry should happen.
    """

    def __call__(self, attempt_number, response, caught_exception):
        """Determine if the retry criteria match.

        Exactly one of ``response`` and ``caught_exception`` is expected
        to be non-None.  Dispatching here means subclasses do not have to
        test for ``None`` themselves.

        :type attempt_number: int
        :param attempt_number: The total number of times we've attempted
            to send the request.

        :param response: The HTTP response (if one was received).

        :type caught_exception: Exception
        :param caught_exception: Any exception that was caught while
            trying to send the HTTP request.

        :return: The result of ``_check_response`` when a response is
            given, otherwise the result of ``_check_caught_exception``.
            Subclasses return True when the criteria match and a retry
            should occur.
        :raises ValueError: If both ``response`` and ``caught_exception``
            are None.
        """
        if response is not None:
            return self._check_response(attempt_number, response)
        if caught_exception is None:
            raise ValueError("Both response and caught_exception are None.")
        return self._check_caught_exception(attempt_number, caught_exception)

    def _check_response(self, attempt_number, response):
        """Hook for response-based checks; does nothing by default."""
        return None

    def _check_caught_exception(self, attempt_number, caught_exception):
        """Hook for exception-based checks; does nothing by default."""
        return None
class MaxAttemptsDecorator(BaseChecker):
    """Cap the number of retries allowed by a wrapped checker.

    Calls are forwarded to the decorated checker.  While the attempt
    number is below ``max_attempts``, any of the ``retryable_exceptions``
    raised by that checker is caught and treated as "retry".  Once
    ``max_attempts`` is reached the decorator answers False, and such
    exceptions are no longer caught, so they propagate to the caller.
    """

    def __init__(self, checker, max_attempts, retryable_exceptions=None):
        self._checker = checker
        self._max_attempts = max_attempts
        self._retryable_exceptions = retryable_exceptions

    def __call__(self, attempt_number, response, caught_exception):
        should_retry = self._should_retry(attempt_number, response,
                                          caught_exception)
        if not should_retry:
            return False
        if attempt_number >= self._max_attempts:
            logger.debug("Reached the maximum number of retry "
                         "attempts: %s", attempt_number)
            return False
        return should_retry

    def _should_retry(self, attempt_number, response, caught_exception):
        """Ask the wrapped checker, absorbing retryable exceptions.

        Exceptions are only absorbed when retryable exceptions were
        configured and the attempt budget has not been used up.
        """
        may_absorb = (self._retryable_exceptions and
                      attempt_number < self._max_attempts)
        if not may_absorb:
            # Either nothing is retryable or max attempts was reached:
            # let any exception from the checker propagate.
            return self._checker(attempt_number, response, caught_exception)
        try:
            return self._checker(attempt_number, response, caught_exception)
        except self._retryable_exceptions as e:
            logger.debug("retry needed, retryable exception caught: %s",
                         e, exc_info=True)
            return True
class HTTPStatusCodeChecker(BaseChecker):
    """Request a retry when the response has a specific HTTP status code."""

    def __init__(self, status_code):
        self._status_code = status_code

    def _check_response(self, attempt_number, response):
        # ``response[0]`` is the HTTP response object exposing
        # ``status_code``.
        if not response[0].status_code == self._status_code:
            return False
        logger.debug(
            "retry needed: retryable HTTP status code received: %s",
            self._status_code)
        return True
class ServiceErrorCodeChecker(BaseChecker):
    """Request a retry on a matching HTTP status *and* service error code.

    The error code is read from ``response[1]['Error']['Code']``; a
    missing ``Error`` or ``Code`` entry simply does not match.
    """

    def __init__(self, status_code, error_code):
        self._status_code = status_code
        self._error_code = error_code

    def _check_response(self, attempt_number, response):
        if not response[0].status_code == self._status_code:
            return False
        # The parsed body is only inspected once the status code matched.
        error_section = response[1].get('Error', {})
        if not error_section.get('Code') == self._error_code:
            return False
        logger.debug(
            "retry needed: matching HTTP status and error code seen: "
            "%s, %s", self._status_code, self._error_code)
        return True
class MultiChecker(BaseChecker):
    """Consult several checkers in order; the first truthy answer wins."""

    def __init__(self, checkers):
        self._checkers = checkers

    def __call__(self, attempt_number, response, caught_exception):
        """Return the first truthy checker result, or False if none is.

        Checkers after the first truthy result are not called.
        """
        for single_checker in self._checkers:
            verdict = single_checker(attempt_number, response,
                                     caught_exception)
            if not verdict:
                continue
            return verdict
        return False
class CRC32Checker(BaseChecker):
    """Validate the response body against a crc32 value from a header."""

    def __init__(self, header):
        # The header where the expected crc32 is located.
        self._header_name = header

    def _check_response(self, attempt_number, response):
        """Raise ``ChecksumError`` when the body's crc32 is not as expected.

        The check is skipped when the header is absent.  Nothing is
        returned explicitly on success; a mismatch is signalled by the
        exception only.
        """
        http_response = response[0]
        expected_crc = http_response.headers.get(self._header_name)
        if expected_crc is None:
            logger.debug("crc32 check skipped, the %s header is not "
                         "in the http response.", self._header_name)
            return None
        # Mask to an unsigned 32-bit value before comparing.
        actual_crc32 = crc32(http_response.content) & 0xffffffff
        expected_value = int(expected_crc)
        if actual_crc32 == expected_value:
            return None
        logger.debug(
            "retry needed: crc32 check failed, expected != actual: "
            "%s != %s", expected_value, actual_crc32)
        raise ChecksumError(checksum_type='crc32',
                            expected_checksum=expected_value,
                            actual_checksum=actual_crc32)
class ExceptionRaiser(BaseChecker):
    """Re-raise whatever exception was caught while sending the request.

    Any non-None ``caught_exception`` handed to this checker is raised.
    """

    def _check_caught_exception(self, attempt_number, caught_exception):
        # This checker cooperates with MaxAttemptsDecorator.  The
        # decorator knows which exceptions it should catch and retry, but
        # it can only catch something that is actually raised, and this
        # class does the raising.  An exception the decorator is not
        # interested in simply propagates out past the retry code.
        raise caught_exception
|