ksrequest.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. # Copyright (c) 2012-2013 LiuYC https://github.com/liuyichen/
  2. # Copyright 2012-2014 ksyun.com, Inc. or its affiliates. All Rights Reserved.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"). You
  5. # may not use this file except in compliance with the License. A copy of
  6. # the License is located at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # or in the "license" file accompanying this file. This file is
  11. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  12. # ANY KIND, either express or implied. See the License for the specific
  13. # language governing permissions and limitations under the License.
  14. import sys
  15. import logging
  16. import select
  17. import functools
  18. import socket
  19. import inspect
  20. from kscore.compat import six
  21. from kscore.compat import HTTPHeaders, HTTPResponse, urlunsplit, urlsplit
  22. from kscore.exceptions import UnseekableStreamError
  23. from kscore.utils import percent_encode_sequence
  24. from kscore.vendored.requests import models
  25. from kscore.vendored.requests.sessions import REDIRECT_STATI
  26. from kscore.vendored.requests.packages.urllib3.connection import \
  27. VerifiedHTTPSConnection
  28. from kscore.vendored.requests.packages.urllib3.connection import \
  29. HTTPConnection
  30. from kscore.vendored.requests.packages.urllib3.connectionpool import \
  31. HTTPConnectionPool
  32. from kscore.vendored.requests.packages.urllib3.connectionpool import \
  33. HTTPSConnectionPool
# Module-level logger, named after this module so that applications can
# configure its verbosity through the standard ``logging`` hierarchy.
logger = logging.getLogger(__name__)
  35. class KSHTTPResponse(HTTPResponse):
  36. # The *args, **kwargs is used because the args are slightly
  37. # different in py2.6 than in py2.7/py3.
  38. def __init__(self, *args, **kwargs):
  39. self._status_tuple = kwargs.pop('status_tuple')
  40. HTTPResponse.__init__(self, *args, **kwargs)
  41. def _read_status(self):
  42. if self._status_tuple is not None:
  43. status_tuple = self._status_tuple
  44. self._status_tuple = None
  45. return status_tuple
  46. else:
  47. return HTTPResponse._read_status(self)
  48. class KSHTTPConnection(HTTPConnection):
  49. """HTTPConnection that supports Expect 100-continue.
  50. This is conceptually a subclass of httplib.HTTPConnection (though
  51. technically we subclass from urllib3, which subclasses
  52. httplib.HTTPConnection) and we only override this class to support Expect
  53. 100-continue, which we need for S3. As far as I can tell, this is
  54. general purpose enough to not be specific to S3, but I'm being
  55. tentative and keeping it in kscore because I've only tested
  56. this against KSYUN services.
  57. """
  58. def __init__(self, *args, **kwargs):
  59. HTTPConnection.__init__(self, *args, **kwargs)
  60. self._original_response_cls = self.response_class
  61. # We'd ideally hook into httplib's states, but they're all
  62. # __mangled_vars so we use our own state var. This variable is set
  63. # when we receive an early response from the server. If this value is
  64. # set to True, any calls to send() are noops. This value is reset to
  65. # false every time _send_request is called. This is to workaround the
  66. # fact that py2.6 (and only py2.6) has a separate send() call for the
  67. # body in _send_request, as opposed to endheaders(), which is where the
  68. # body is sent in all versions > 2.6.
  69. self._response_received = False
  70. self._expect_header_set = False
  71. def close(self):
  72. HTTPConnection.close(self)
  73. # Reset all of our instance state we were tracking.
  74. self._response_received = False
  75. self._expect_header_set = False
  76. self.response_class = self._original_response_cls
  77. def _tunnel(self):
  78. # Works around a bug in py26 which is fixed in later versions of
  79. # python. Bug involves hitting an infinite loop if readline() returns
  80. # nothing as opposed to just ``\r\n``.
  81. # As much as I don't like having if py2: <foo> code blocks, this seems
  82. # the cleanest way to handle this workaround. Fortunately, the
  83. # difference from py26 to py3 is very minimal. We're essentially
  84. # just overriding the while loop.
  85. if sys.version_info[:2] != (2, 6):
  86. return HTTPConnection._tunnel(self)
  87. # Otherwise we workaround the issue.
  88. self._set_hostport(self._tunnel_host, self._tunnel_port)
  89. self.send("CONNECT %s:%d HTTP/1.0\r\n" % (self.host, self.port))
  90. for header, value in self._tunnel_headers.iteritems():
  91. self.send("%s: %s\r\n" % (header, value))
  92. self.send("\r\n")
  93. response = self.response_class(self.sock, strict=self.strict,
  94. method=self._method)
  95. (version, code, message) = response._read_status()
  96. if code != 200:
  97. self.close()
  98. raise socket.error("Tunnel connection failed: %d %s" %
  99. (code, message.strip()))
  100. while True:
  101. line = response.fp.readline()
  102. if not line:
  103. break
  104. if line in (b'\r\n', b'\n', b''):
  105. break
  106. def _send_request(self, method, url, body, headers, *py36_up_extra):
  107. self._response_received = False
  108. if headers.get('Expect', b'') == b'100-continue':
  109. self._expect_header_set = True
  110. else:
  111. self._expect_header_set = False
  112. self.response_class = self._original_response_cls
  113. rval = HTTPConnection._send_request(
  114. self, method, url, body, headers, *py36_up_extra)
  115. self._expect_header_set = False
  116. return rval
  117. def _convert_to_bytes(self, mixed_buffer):
  118. # Take a list of mixed str/bytes and convert it
  119. # all into a single bytestring.
  120. # Any six.text_types will be encoded as utf-8.
  121. bytes_buffer = []
  122. for chunk in mixed_buffer:
  123. if isinstance(chunk, six.text_type):
  124. bytes_buffer.append(chunk.encode('utf-8'))
  125. else:
  126. bytes_buffer.append(chunk)
  127. msg = b"\r\n".join(bytes_buffer)
  128. return msg
  129. def _send_output(self, message_body=None, **py36_up_extra):
  130. self._buffer.extend((b"", b""))
  131. msg = self._convert_to_bytes(self._buffer)
  132. del self._buffer[:]
  133. # If msg and message_body are sent in a single send() call,
  134. # it will avoid performance problems caused by the interaction
  135. # between delayed ack and the Nagle algorithm.
  136. if isinstance(message_body, bytes):
  137. msg += message_body
  138. message_body = None
  139. self.send(msg)
  140. if self._expect_header_set:
  141. # This is our custom behavior. If the Expect header was
  142. # set, it will trigger this custom behavior.
  143. logger.debug("Waiting for 100 Continue response.")
  144. # Wait for 1 second for the server to send a response.
  145. read, write, exc = select.select([self.sock], [], [self.sock], 1)
  146. if read:
  147. self._handle_expect_response(message_body)
  148. return
  149. else:
  150. # From the RFC:
  151. # Because of the presence of older implementations, the
  152. # protocol allows ambiguous situations in which a client may
  153. # send "Expect: 100-continue" without receiving either a 417
  154. # (Expectation Failed) status or a 100 (Continue) status.
  155. # Therefore, when a client sends this header field to an origin
  156. # server (possibly via a proxy) from which it has never seen a
  157. # 100 (Continue) status, the client SHOULD NOT wait for an
  158. # indefinite period before sending the request body.
  159. logger.debug("No response seen from server, continuing to "
  160. "send the response body.")
  161. if message_body is not None:
  162. # message_body was not a string (i.e. it is a file), and
  163. # we must run the risk of Nagle.
  164. self.send(message_body)
  165. def _consume_headers(self, fp):
  166. # Most servers (including S3) will just return
  167. # the CLRF after the 100 continue response. However,
  168. # some servers (I've specifically seen this for squid when
  169. # used as a straight HTTP proxy) will also inject a
  170. # Connection: keep-alive header. To account for this
  171. # we'll read until we read '\r\n', and ignore any headers
  172. # that come immediately after the 100 continue response.
  173. current = None
  174. while current != b'\r\n':
  175. current = fp.readline()
  176. def _handle_expect_response(self, message_body):
  177. # This is called when we sent the request headers containing
  178. # an Expect: 100-continue header and received a response.
  179. # We now need to figure out what to do.
  180. fp = self.sock.makefile('rb', 0)
  181. try:
  182. maybe_status_line = fp.readline()
  183. parts = maybe_status_line.split(None, 2)
  184. if self._is_100_continue_status(maybe_status_line):
  185. self._consume_headers(fp)
  186. logger.debug("100 Continue response seen, "
  187. "now sending request body.")
  188. self._send_message_body(message_body)
  189. elif len(parts) == 3 and parts[0].startswith(b'HTTP/'):
  190. # From the RFC:
  191. # Requirements for HTTP/1.1 origin servers:
  192. #
  193. # - Upon receiving a request which includes an Expect
  194. # request-header field with the "100-continue"
  195. # expectation, an origin server MUST either respond with
  196. # 100 (Continue) status and continue to read from the
  197. # input stream, or respond with a final status code.
  198. #
  199. # So if we don't get a 100 Continue response, then
  200. # whatever the server has sent back is the final response
  201. # and don't send the message_body.
  202. logger.debug("Received a non 100 Continue response "
  203. "from the server, NOT sending request body.")
  204. status_tuple = (parts[0].decode('ascii'),
  205. int(parts[1]), parts[2].decode('ascii'))
  206. response_class = functools.partial(
  207. KSHTTPResponse, status_tuple=status_tuple)
  208. self.response_class = response_class
  209. self._response_received = True
  210. finally:
  211. fp.close()
  212. def _send_message_body(self, message_body):
  213. if message_body is not None:
  214. self.send(message_body)
  215. def send(self, str):
  216. if self._response_received:
  217. logger.debug("send() called, but reseponse already received. "
  218. "Not sending data.")
  219. return
  220. return HTTPConnection.send(self, str)
  221. def _is_100_continue_status(self, maybe_status_line):
  222. parts = maybe_status_line.split(None, 2)
  223. # Check for HTTP/<version> 100 Continue\r\n
  224. return (
  225. len(parts) >= 3 and parts[0].startswith(b'HTTP/') and
  226. parts[1] == b'100')
  227. class KSHTTPSConnection(VerifiedHTTPSConnection):
  228. pass
  229. # Now we need to set the methods we overrode from KSHTTPConnection
  230. # onto KSHTTPSConnection. This is just a shortcut to avoid
  231. # copy/pasting the same code into KSHTTPSConnection.
  232. for name, function in KSHTTPConnection.__dict__.items():
  233. if inspect.isfunction(function):
  234. setattr(KSHTTPSConnection, name, function)
  235. def prepare_request_dict(request_dict, endpoint_url, user_agent=None):
  236. """
  237. This method prepares a request dict to be created into an
  238. KSRequestObject. This prepares the request dict by adding the
  239. url and the user agent to the request dict.
  240. :type request_dict: dict
  241. :param request_dict: The request dict (created from the
  242. ``serialize`` module).
  243. :type user_agent: string
  244. :param user_agent: The user agent to use for this request.
  245. :type endpoint_url: string
  246. :param endpoint_url: The full endpoint url, which contains at least
  247. the scheme, the hostname, and optionally any path components.
  248. """
  249. r = request_dict
  250. if user_agent is not None:
  251. headers = r['headers']
  252. headers['User-Agent'] = user_agent
  253. url = _urljoin(endpoint_url, r['url_path'])
  254. if r['query_string']:
  255. encoded_query_string = percent_encode_sequence(r['query_string'])
  256. if '?' not in url:
  257. url += '?%s' % encoded_query_string
  258. else:
  259. url += '&%s' % encoded_query_string
  260. r['url'] = url
  261. def create_request_object(request_dict):
  262. """
  263. This method takes a request dict and creates an KSRequest object
  264. from it.
  265. :type request_dict: dict
  266. :param request_dict: The request dict (created from the
  267. ``prepare_request_dict`` method).
  268. :rtype: ``kscore.ksrequest.KSRequest``
  269. :return: An KSRequest object based on the request_dict.
  270. """
  271. r = request_dict
  272. return KSRequest(method=r['method'], url=r['url'],
  273. data=r['body'],
  274. headers=r['headers'])
  275. def _urljoin(endpoint_url, url_path):
  276. p = urlsplit(endpoint_url)
  277. # <part> - <index>
  278. # scheme - p[0]
  279. # netloc - p[1]
  280. # path - p[2]
  281. # query - p[3]
  282. # fragment - p[4]
  283. if not url_path or url_path == '/':
  284. # If there's no path component, ensure the URL ends with
  285. # a '/' for backwards compatibility.
  286. if not p[2]:
  287. return endpoint_url + '/'
  288. return endpoint_url
  289. if p[2].endswith('/') and url_path.startswith('/'):
  290. new_path = p[2][:-1] + url_path
  291. else:
  292. new_path = p[2] + url_path
  293. reconstructed = urlunsplit((p[0], p[1], new_path, p[3], p[4]))
  294. return reconstructed
  295. class KSRequest(models.RequestEncodingMixin, models.Request):
  296. def __init__(self, *args, **kwargs):
  297. self.auth_path = None
  298. if 'auth_path' in kwargs:
  299. self.auth_path = kwargs['auth_path']
  300. del kwargs['auth_path']
  301. models.Request.__init__(self, *args, **kwargs)
  302. headers = HTTPHeaders()
  303. if self.headers is not None:
  304. for key, value in self.headers.items():
  305. headers[key] = value
  306. self.headers = headers
  307. # This is a dictionary to hold information that is used when
  308. # processing the request. What is inside of ``context`` is open-ended.
  309. # For example, it may have a timestamp key that is used for holding
  310. # what the timestamp is when signing the request. Note that none
  311. # of the information that is inside of ``context`` is directly
  312. # sent over the wire; the information is only used to assist in
  313. # creating what is sent over the wire.
  314. self.context = {}
  315. def prepare(self):
  316. """Constructs a :class:`KSPreparedRequest <KSPreparedRequest>`."""
  317. # Eventually I think it would be nice to add hooks into this process.
  318. p = KSPreparedRequest(self)
  319. p.prepare_method(self.method)
  320. p.prepare_url(self.url, self.params)
  321. p.prepare_headers(self.headers)
  322. p.prepare_cookies(self.cookies)
  323. p.prepare_body(self.data, self.files)
  324. p.prepare_auth(self.auth)
  325. return p
  326. @property
  327. def body(self):
  328. p = models.PreparedRequest()
  329. p.prepare_headers({})
  330. p.prepare_body(self.data, self.files)
  331. if isinstance(p.body, six.text_type):
  332. p.body = p.body.encode('utf-8')
  333. return p.body
  334. class KSPreparedRequest(models.PreparedRequest):
  335. """Represents a prepared request.
  336. :ivar method: HTTP Method
  337. :ivar url: The full url
  338. :ivar headers: The HTTP headers to send.
  339. :ivar body: The HTTP body.
  340. :ivar hooks: The set of callback hooks.
  341. In addition to the above attributes, the following attributes are
  342. available:
  343. :ivar query_params: The original query parameters.
  344. :ivar post_param: The original POST params (dict).
  345. """
  346. def __init__(self, original_request):
  347. self.original = original_request
  348. super(KSPreparedRequest, self).__init__()
  349. self.hooks.setdefault('response', []).append(
  350. self.reset_stream_on_redirect)
  351. def reset_stream_on_redirect(self, response, **kwargs):
  352. if response.status_code in REDIRECT_STATI and \
  353. self._looks_like_file(self.body):
  354. logger.debug("Redirect received, rewinding stream: %s", self.body)
  355. self.reset_stream()
  356. def _looks_like_file(self, body):
  357. return hasattr(body, 'read') and hasattr(body, 'seek')
  358. def reset_stream(self):
  359. # Trying to reset a stream when there is a no stream will
  360. # just immediately return. It's not an error, it will produce
  361. # the same result as if we had actually reset the stream (we'll send
  362. # the entire body contents again if we need to).
  363. # Same case if the body is a string/bytes type.
  364. if self.body is None or isinstance(self.body, six.text_type) or \
  365. isinstance(self.body, six.binary_type):
  366. return
  367. try:
  368. logger.debug("Rewinding stream: %s", self.body)
  369. self.body.seek(0)
  370. except Exception as e:
  371. logger.debug("Unable to rewind stream: %s", e)
  372. raise UnseekableStreamError(stream_object=self.body)
  373. def prepare_body(self, data, files, json=None):
  374. """Prepares the given HTTP body data."""
  375. super(KSPreparedRequest, self).prepare_body(data, files, json)
  376. # Calculate the Content-Length by trying to seek the file as
  377. # requests cannot determine content length for some seekable file-like
  378. # objects.
  379. if 'Content-Length' not in self.headers:
  380. if hasattr(data, 'seek') and hasattr(data, 'tell'):
  381. orig_pos = data.tell()
  382. data.seek(0, 2)
  383. end_file_pos = data.tell()
  384. self.headers['Content-Length'] = str(end_file_pos - orig_pos)
  385. data.seek(orig_pos)
  386. # If the Content-Length was added this way, a
  387. # Transfer-Encoding was added by requests because it did
  388. # not add a Content-Length header. However, the
  389. # Transfer-Encoding header is not supported for
  390. # KSYUN Services so remove it if it is added.
  391. if 'Transfer-Encoding' in self.headers:
  392. self.headers.pop('Transfer-Encoding')
  393. HTTPSConnectionPool.ConnectionCls = KSHTTPSConnection
  394. HTTPConnectionPool.ConnectionCls = KSHTTPConnection