123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
- # Copyright (c) 2012-2013 LiuYC https://github.com/liuyichen/
- # Copyright 2012-2014 ksyun.com, Inc. or its affiliates. All Rights Reserved.
- #
- # Licensed under the Apache License, Version 2.0 (the "License"). You
- # may not use this file except in compliance with the License. A copy of
- # the License is located at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # or in the "license" file accompanying this file. This file is
- # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
- # ANY KIND, either express or implied. See the License for the specific
- # language governing permissions and limitations under the License.
- import os
- import logging
- import time
- import threading
- from kscore.vendored.requests.sessions import Session
- from kscore.vendored.requests.utils import get_environ_proxies
- from kscore.vendored.requests.exceptions import ConnectionError
- from kscore.vendored import six
- from kscore.ksrequest import create_request_object
- from kscore.exceptions import UnknownEndpointError
- from kscore.exceptions import EndpointConnectionError
- from kscore.exceptions import ConnectionClosedError
- from kscore.compat import filter_ssl_warnings
- from kscore.utils import is_valid_endpoint_url
- from kscore.hooks import first_non_none_response
- from kscore.response import StreamingBody
- from kscore import parsers
- logger = logging.getLogger(__name__)
- DEFAULT_TIMEOUT = 60
- filter_ssl_warnings()
- try:
- from kscore.vendored.requests.packages.urllib3.contrib import pyopenssl
- pyopenssl.extract_from_urllib3()
- except ImportError:
- pass
- def convert_to_response_dict(http_response, operation_model):
- """Convert an HTTP response object to a request dict.
- This converts the requests library's HTTP response object to
- a dictionary.
- :type http_response: kscore.vendored.requests.model.Response
- :param http_response: The HTTP response from an KSYUN service request.
- :rtype: dict
- :return: A response dictionary which will contain the following keys:
- * headers (dict)
- * status_code (int)
- * body (string or file-like object)
- """
- response_dict = {
- 'headers': http_response.headers,
- 'status_code': http_response.status_code,
- }
- if response_dict['status_code'] >= 300:
- response_dict['body'] = http_response.content
- elif operation_model.has_streaming_output:
- response_dict['body'] = StreamingBody(
- http_response.raw, response_dict['headers'].get('content-length'))
- else:
- response_dict['body'] = http_response.content
- return response_dict
- class PreserveAuthSession(Session):
- def rebuild_auth(self, prepared_request, response):
- pass
- class Endpoint(object):
- """
- Represents an endpoint for a particular service in a specific
- region. Only an endpoint can make requests.
- :ivar service: The Service object that describes this endpoints
- service.
- :ivar host: The fully qualified endpoint hostname.
- :ivar session: The session object.
- """
- def __init__(self, host, endpoint_prefix,
- event_emitter, proxies=None, verify=True,
- timeout=DEFAULT_TIMEOUT, response_parser_factory=None):
- self._endpoint_prefix = endpoint_prefix
- self._event_emitter = event_emitter
- self.host = host
- self.verify = verify
- if proxies is None:
- proxies = {}
- self.proxies = proxies
- self.http_session = PreserveAuthSession()
- self.timeout = timeout
- logger.debug('Setting %s timeout as %s', endpoint_prefix, self.timeout)
- self._lock = threading.Lock()
- if response_parser_factory is None:
- response_parser_factory = parsers.ResponseParserFactory()
- self._response_parser_factory = response_parser_factory
- def __repr__(self):
- return '%s(%s)' % (self._endpoint_prefix, self.host)
- def make_request(self, operation_model, request_dict):
- logger.debug("Making request for %s (verify_ssl=%s) with params: %s",
- operation_model, self.verify, request_dict)
- return self._send_request(request_dict, operation_model)
- def create_request(self, params, operation_model=None):
- request = create_request_object(params)
- if operation_model:
- event_name = 'request-created.{endpoint_prefix}.{op_name}'.format(
- endpoint_prefix=self._endpoint_prefix,
- op_name=operation_model.name)
- self._event_emitter.emit(event_name, request=request,
- operation_name=operation_model.name)
- prepared_request = self.prepare_request(request)
- return prepared_request
- def _encode_headers(self, headers):
- # In place encoding of headers to utf-8 if they are unicode.
- for key, value in headers.items():
- if isinstance(value, six.text_type):
- headers[key] = value.encode('utf-8')
- def prepare_request(self, request):
- self._encode_headers(request.headers)
- return request.prepare()
- def _send_request(self, request_dict, operation_model):
- attempts = 1
- request = self.create_request(request_dict, operation_model)
- success_response, exception = self._get_response(
- request, operation_model, attempts)
- while self._needs_retry(attempts, operation_model,
- success_response, exception):
- attempts += 1
- # If there is a stream associated with the request, we need
- # to reset it before attempting to send the request again.
- # This will ensure that we resend the entire contents of the
- # body.
- request.reset_stream()
- # Create a new request when retried (including a new signature).
- request = self.create_request(
- request_dict, operation_model=operation_model)
- success_response, exception = self._get_response(
- request, operation_model, attempts)
- if exception is not None:
- raise exception
- else:
- return success_response
- def _get_response(self, request, operation_model, attempts):
- # This will return a tuple of (success_response, exception)
- # and success_response is itself a tuple of
- # (http_response, parsed_dict).
- # If an exception occurs then the success_response is None.
- # If no exception occurs then exception is None.
- try:
- logger.debug("Sending http request: %s", request)
- http_response = self.http_session.send(
- request, verify=self.verify,
- stream=operation_model.has_streaming_output,
- proxies=self.proxies, timeout=self.timeout)
- except ConnectionError as e:
- # For a connection error, if it looks like it's a DNS
- # lookup issue, 99% of the time this is due to a misconfigured
- # region/endpoint so we'll raise a more specific error message
- # to help users.
- logger.debug("ConnectionError received when sending HTTP request.",
- exc_info=True)
- if self._looks_like_dns_error(e):
- endpoint_url = e.request.url
- better_exception = EndpointConnectionError(
- endpoint_url=endpoint_url, error=e)
- return (None, better_exception)
- elif self._looks_like_bad_status_line(e):
- better_exception = ConnectionClosedError(
- endpoint_url=e.request.url, request=e.request)
- return (None, better_exception)
- else:
- return (None, e)
- except Exception as e:
- logger.debug("Exception received when sending HTTP request.",
- exc_info=True)
- return (None, e)
- # This returns the http_response and the parsed_data.
- response_dict = convert_to_response_dict(http_response, operation_model)
- parser = self._response_parser_factory.create_parser(
- operation_model.protocol)
- return ((http_response,
- parser.parse(response_dict, operation_model.output_shape)),
- None)
- def _looks_like_dns_error(self, e):
- return 'gaierror' in str(e) and e.request is not None
- def _looks_like_bad_status_line(self, e):
- return 'BadStatusLine' in str(e) and e.request is not None
- def _needs_retry(self, attempts, operation_model, response=None,
- caught_exception=None):
- event_name = 'needs-retry.%s.%s' % (self._endpoint_prefix,
- operation_model.name)
- responses = self._event_emitter.emit(
- event_name, response=response, endpoint=self,
- operation=operation_model, attempts=attempts,
- caught_exception=caught_exception)
- handler_response = first_non_none_response(responses)
- if handler_response is None:
- return False
- else:
- # Request needs to be retried, and we need to sleep
- # for the specified number of times.
- logger.debug("Response received to retry, sleeping for "
- "%s seconds", handler_response)
- time.sleep(handler_response)
- return True
- class EndpointCreator(object):
- def __init__(self, event_emitter):
- self._event_emitter = event_emitter
- def create_endpoint(self, service_model, region_name, endpoint_url,
- verify=None, response_parser_factory=None,
- timeout=DEFAULT_TIMEOUT):
- if not is_valid_endpoint_url(endpoint_url):
- raise ValueError("Invalid endpoint: %s" % endpoint_url)
- return Endpoint(
- endpoint_url,
- endpoint_prefix=service_model.endpoint_prefix,
- event_emitter=self._event_emitter,
- proxies=self._get_proxies(endpoint_url),
- verify=self._get_verify_value(verify),
- timeout=timeout,
- response_parser_factory=response_parser_factory)
- def _get_proxies(self, url):
- # We could also support getting proxies from a config file,
- # but for now proxy support is taken from the environment.
- return get_environ_proxies(url)
- def _get_verify_value(self, verify):
- # This is to account for:
- # https://github.com/kennethreitz/requests/issues/1436
- # where we need to honor REQUESTS_CA_BUNDLE because we're creating our
- # own request objects.
- # First, if verify is not None, then the user explicitly specified
- # a value so this automatically wins.
- if verify is not None:
- return verify
- # Otherwise use the value from REQUESTS_CA_BUNDLE, or default to
- # True if the env var does not exist.
- return os.environ.get('REQUESTS_CA_BUNDLE', True)
|