2
0

endpoint.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. # Copyright (c) 2012-2013 LiuYC https://github.com/liuyichen/
  2. # Copyright 2012-2014 ksyun.com, Inc. or its affiliates. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"). You
  5. # may not use this file except in compliance with the License. A copy of
  6. # the License is located at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # or in the "license" file accompanying this file. This file is
  11. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  12. # ANY KIND, either express or implied. See the License for the specific
  13. # language governing permissions and limitations under the License.
  14. import os
  15. import logging
  16. import time
  17. import threading
  18. from kscore.vendored.requests.sessions import Session
  19. from kscore.vendored.requests.utils import get_environ_proxies
  20. from kscore.vendored.requests.exceptions import ConnectionError
  21. from kscore.vendored import six
  22. from kscore.ksrequest import create_request_object
  23. from kscore.exceptions import UnknownEndpointError
  24. from kscore.exceptions import EndpointConnectionError
  25. from kscore.exceptions import ConnectionClosedError
  26. from kscore.compat import filter_ssl_warnings
  27. from kscore.utils import is_valid_endpoint_url
  28. from kscore.hooks import first_non_none_response
  29. from kscore.response import StreamingBody
  30. from kscore import parsers
  31. logger = logging.getLogger(__name__)
  32. DEFAULT_TIMEOUT = 60
  33. filter_ssl_warnings()
  34. try:
  35. from kscore.vendored.requests.packages.urllib3.contrib import pyopenssl
  36. pyopenssl.extract_from_urllib3()
  37. except ImportError:
  38. pass
  39. def convert_to_response_dict(http_response, operation_model):
  40. """Convert an HTTP response object to a request dict.
  41. This converts the requests library's HTTP response object to
  42. a dictionary.
  43. :type http_response: kscore.vendored.requests.model.Response
  44. :param http_response: The HTTP response from an KSYUN service request.
  45. :rtype: dict
  46. :return: A response dictionary which will contain the following keys:
  47. * headers (dict)
  48. * status_code (int)
  49. * body (string or file-like object)
  50. """
  51. response_dict = {
  52. 'headers': http_response.headers,
  53. 'status_code': http_response.status_code,
  54. }
  55. if response_dict['status_code'] >= 300:
  56. response_dict['body'] = http_response.content
  57. elif operation_model.has_streaming_output:
  58. response_dict['body'] = StreamingBody(
  59. http_response.raw, response_dict['headers'].get('content-length'))
  60. else:
  61. response_dict['body'] = http_response.content
  62. return response_dict
  63. class PreserveAuthSession(Session):
  64. def rebuild_auth(self, prepared_request, response):
  65. pass
  66. class Endpoint(object):
  67. """
  68. Represents an endpoint for a particular service in a specific
  69. region. Only an endpoint can make requests.
  70. :ivar service: The Service object that describes this endpoints
  71. service.
  72. :ivar host: The fully qualified endpoint hostname.
  73. :ivar session: The session object.
  74. """
  75. def __init__(self, host, endpoint_prefix,
  76. event_emitter, proxies=None, verify=True,
  77. timeout=DEFAULT_TIMEOUT, response_parser_factory=None):
  78. self._endpoint_prefix = endpoint_prefix
  79. self._event_emitter = event_emitter
  80. self.host = host
  81. self.verify = verify
  82. if proxies is None:
  83. proxies = {}
  84. self.proxies = proxies
  85. self.http_session = PreserveAuthSession()
  86. self.timeout = timeout
  87. logger.debug('Setting %s timeout as %s', endpoint_prefix, self.timeout)
  88. self._lock = threading.Lock()
  89. if response_parser_factory is None:
  90. response_parser_factory = parsers.ResponseParserFactory()
  91. self._response_parser_factory = response_parser_factory
  92. def __repr__(self):
  93. return '%s(%s)' % (self._endpoint_prefix, self.host)
  94. def make_request(self, operation_model, request_dict):
  95. logger.debug("Making request for %s (verify_ssl=%s) with params: %s",
  96. operation_model, self.verify, request_dict)
  97. return self._send_request(request_dict, operation_model)
  98. def create_request(self, params, operation_model=None):
  99. request = create_request_object(params)
  100. if operation_model:
  101. event_name = 'request-created.{endpoint_prefix}.{op_name}'.format(
  102. endpoint_prefix=self._endpoint_prefix,
  103. op_name=operation_model.name)
  104. self._event_emitter.emit(event_name, request=request,
  105. operation_name=operation_model.name)
  106. prepared_request = self.prepare_request(request)
  107. return prepared_request
  108. def _encode_headers(self, headers):
  109. # In place encoding of headers to utf-8 if they are unicode.
  110. for key, value in headers.items():
  111. if isinstance(value, six.text_type):
  112. headers[key] = value.encode('utf-8')
  113. def prepare_request(self, request):
  114. self._encode_headers(request.headers)
  115. return request.prepare()
  116. def _send_request(self, request_dict, operation_model):
  117. attempts = 1
  118. request = self.create_request(request_dict, operation_model)
  119. success_response, exception = self._get_response(
  120. request, operation_model, attempts)
  121. while self._needs_retry(attempts, operation_model,
  122. success_response, exception):
  123. attempts += 1
  124. # If there is a stream associated with the request, we need
  125. # to reset it before attempting to send the request again.
  126. # This will ensure that we resend the entire contents of the
  127. # body.
  128. request.reset_stream()
  129. # Create a new request when retried (including a new signature).
  130. request = self.create_request(
  131. request_dict, operation_model=operation_model)
  132. success_response, exception = self._get_response(
  133. request, operation_model, attempts)
  134. if exception is not None:
  135. raise exception
  136. else:
  137. return success_response
  138. def _get_response(self, request, operation_model, attempts):
  139. # This will return a tuple of (success_response, exception)
  140. # and success_response is itself a tuple of
  141. # (http_response, parsed_dict).
  142. # If an exception occurs then the success_response is None.
  143. # If no exception occurs then exception is None.
  144. try:
  145. logger.debug("Sending http request: %s", request)
  146. http_response = self.http_session.send(
  147. request, verify=self.verify,
  148. stream=operation_model.has_streaming_output,
  149. proxies=self.proxies, timeout=self.timeout)
  150. except ConnectionError as e:
  151. # For a connection error, if it looks like it's a DNS
  152. # lookup issue, 99% of the time this is due to a misconfigured
  153. # region/endpoint so we'll raise a more specific error message
  154. # to help users.
  155. logger.debug("ConnectionError received when sending HTTP request.",
  156. exc_info=True)
  157. if self._looks_like_dns_error(e):
  158. endpoint_url = e.request.url
  159. better_exception = EndpointConnectionError(
  160. endpoint_url=endpoint_url, error=e)
  161. return (None, better_exception)
  162. elif self._looks_like_bad_status_line(e):
  163. better_exception = ConnectionClosedError(
  164. endpoint_url=e.request.url, request=e.request)
  165. return (None, better_exception)
  166. else:
  167. return (None, e)
  168. except Exception as e:
  169. logger.debug("Exception received when sending HTTP request.",
  170. exc_info=True)
  171. return (None, e)
  172. # This returns the http_response and the parsed_data.
  173. response_dict = convert_to_response_dict(http_response, operation_model)
  174. parser = self._response_parser_factory.create_parser(
  175. operation_model.protocol)
  176. return ((http_response,
  177. parser.parse(response_dict, operation_model.output_shape)),
  178. None)
  179. def _looks_like_dns_error(self, e):
  180. return 'gaierror' in str(e) and e.request is not None
  181. def _looks_like_bad_status_line(self, e):
  182. return 'BadStatusLine' in str(e) and e.request is not None
  183. def _needs_retry(self, attempts, operation_model, response=None,
  184. caught_exception=None):
  185. event_name = 'needs-retry.%s.%s' % (self._endpoint_prefix,
  186. operation_model.name)
  187. responses = self._event_emitter.emit(
  188. event_name, response=response, endpoint=self,
  189. operation=operation_model, attempts=attempts,
  190. caught_exception=caught_exception)
  191. handler_response = first_non_none_response(responses)
  192. if handler_response is None:
  193. return False
  194. else:
  195. # Request needs to be retried, and we need to sleep
  196. # for the specified number of times.
  197. logger.debug("Response received to retry, sleeping for "
  198. "%s seconds", handler_response)
  199. time.sleep(handler_response)
  200. return True
  201. class EndpointCreator(object):
  202. def __init__(self, event_emitter):
  203. self._event_emitter = event_emitter
  204. def create_endpoint(self, service_model, region_name, endpoint_url,
  205. verify=None, response_parser_factory=None,
  206. timeout=DEFAULT_TIMEOUT):
  207. if not is_valid_endpoint_url(endpoint_url):
  208. raise ValueError("Invalid endpoint: %s" % endpoint_url)
  209. return Endpoint(
  210. endpoint_url,
  211. endpoint_prefix=service_model.endpoint_prefix,
  212. event_emitter=self._event_emitter,
  213. proxies=self._get_proxies(endpoint_url),
  214. verify=self._get_verify_value(verify),
  215. timeout=timeout,
  216. response_parser_factory=response_parser_factory)
  217. def _get_proxies(self, url):
  218. # We could also support getting proxies from a config file,
  219. # but for now proxy support is taken from the environment.
  220. return get_environ_proxies(url)
  221. def _get_verify_value(self, verify):
  222. # This is to account for:
  223. # https://github.com/kennethreitz/requests/issues/1436
  224. # where we need to honor REQUESTS_CA_BUNDLE because we're creating our
  225. # own request objects.
  226. # First, if verify is not None, then the user explicitly specified
  227. # a value so this automatically wins.
  228. if verify is not None:
  229. return verify
  230. # Otherwise use the value from REQUESTS_CA_BUNDLE, or default to
  231. # True if the env var does not exist.
  232. return os.environ.get('REQUESTS_CA_BUNDLE', True)