response.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. # Copyright (c) 2012-2013 LiuYC https://github.com/liuyichen/
  2. # Copyright 2012-2014 ksyun.com, Inc. or its affiliates. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"). You
  5. # may not use this file except in compliance with the License. A copy of
  6. # the License is located at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # or in the "license" file accompanying this file. This file is
  11. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  12. # ANY KIND, either express or implied. See the License for the specific
  13. # language governing permissions and limitations under the License.
  14. import sys
  15. import xml.etree.cElementTree
  16. import logging
  17. from kscore import ScalarTypes
  18. from kscore.hooks import first_non_none_response
  19. from kscore.compat import json, set_socket_timeout, XMLParseError
  20. from kscore.exceptions import IncompleteReadError
  21. from kscore import parsers
  22. logger = logging.getLogger(__name__)
  23. class StreamingBody(object):
  24. """Wrapper class for an http response body.
  25. This provides a few additional conveniences that do not exist
  26. in the urllib3 model:
  27. * Set the timeout on the socket (i.e read() timeouts)
  28. * Auto validation of content length, if the amount of bytes
  29. we read does not match the content length, an exception
  30. is raised.
  31. """
  32. def __init__(self, raw_stream, content_length):
  33. self._raw_stream = raw_stream
  34. self._content_length = content_length
  35. self._amount_read = 0
  36. def set_socket_timeout(self, timeout):
  37. """Set the timeout seconds on the socket."""
  38. # The problem we're trying to solve is to prevent .read() calls from
  39. # hanging. This can happen in rare cases. What we'd like to ideally
  40. # do is set a timeout on the .read() call so that callers can retry
  41. # the request.
  42. # Unfortunately, this isn't currently possible in requests.
  43. # See: https://github.com/kennethreitz/requests/issues/1803
  44. # So what we're going to do is reach into the guts of the stream and
  45. # grab the socket object, which we can set the timeout on. We're
  46. # putting in a check here so in case this interface goes away, we'll
  47. # know.
  48. try:
  49. # To further complicate things, the way to grab the
  50. # underlying socket object from an HTTPResponse is different
  51. # in py2 and py3. So this code has been pushed to kscore.compat.
  52. set_socket_timeout(self._raw_stream, timeout)
  53. except AttributeError:
  54. logger.error("Cannot access the socket object of "
  55. "a streaming response. It's possible "
  56. "the interface has changed.", exc_info=True)
  57. raise
  58. def read(self, amt=None):
  59. """Read at most amt bytes from the stream.
  60. If the amt argument is omitted, read all data.
  61. """
  62. chunk = self._raw_stream.read(amt)
  63. self._amount_read += len(chunk)
  64. if not chunk or amt is None:
  65. # If the server sends empty contents or
  66. # we ask to read all of the contents, then we know
  67. # we need to verify the content length.
  68. self._verify_content_length()
  69. return chunk
  70. def _verify_content_length(self):
  71. # See: https://github.com/kennethreitz/requests/issues/1855
  72. # Basically, our http library doesn't do this for us, so we have
  73. # to do this ourself.
  74. if self._content_length is not None and \
  75. self._amount_read != int(self._content_length):
  76. raise IncompleteReadError(
  77. actual_bytes=self._amount_read,
  78. expected_bytes=int(self._content_length))
  79. def close(self):
  80. """Close the underlying http response stream."""
  81. self._raw_stream.close()
  82. def get_response(operation_model, http_response):
  83. protocol = operation_model.metadata['protocol']
  84. response_dict = {
  85. 'headers': http_response.headers,
  86. 'status_code': http_response.status_code,
  87. }
  88. # TODO: Unfortunately, we have to have error logic here.
  89. # If it looks like an error, in the streaming response case we
  90. # need to actually grab the contents.
  91. if response_dict['status_code'] >= 300:
  92. response_dict['body'] = http_response.content
  93. elif operation_model.has_streaming_output:
  94. response_dict['body'] = StreamingBody(
  95. http_response.raw, response_dict['headers'].get('content-length'))
  96. else:
  97. response_dict['body'] = http_response.content
  98. parser = parsers.create_parser(protocol)
  99. return http_response, parser.parse(response_dict,
  100. operation_model.output_shape)