paginate.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507
  1. # Copyright 2012-2014 ksyun.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. from itertools import tee
  14. from six import string_types
  15. import jmespath
  16. import json
  17. import base64
  18. import logging
  19. from kscore.exceptions import PaginationError
  20. from kscore.compat import zip
  21. from kscore.utils import set_value_from_jmespath, merge_dicts
  22. log = logging.getLogger(__name__)
  23. class PaginatorModel(object):
  24. def __init__(self, paginator_config):
  25. self._paginator_config = paginator_config['pagination']
  26. def get_paginator(self, operation_name):
  27. try:
  28. single_paginator_config = self._paginator_config[operation_name]
  29. except KeyError:
  30. raise ValueError("Paginator for operation does not exist: %s"
  31. % operation_name)
  32. return single_paginator_config
  33. class PageIterator(object):
  34. def __init__(self, method, input_token, output_token, more_results,
  35. result_keys, non_aggregate_keys, limit_key, max_items,
  36. starting_token, page_size, op_kwargs):
  37. self._method = method
  38. self._op_kwargs = op_kwargs
  39. self._input_token = input_token
  40. self._output_token = output_token
  41. self._more_results = more_results
  42. self._result_keys = result_keys
  43. self._max_items = max_items
  44. self._limit_key = limit_key
  45. self._starting_token = starting_token
  46. self._page_size = page_size
  47. self._op_kwargs = op_kwargs
  48. self._resume_token = None
  49. self._non_aggregate_key_exprs = non_aggregate_keys
  50. self._non_aggregate_part = {}
  51. @property
  52. def result_keys(self):
  53. return self._result_keys
  54. @property
  55. def resume_token(self):
  56. """Token to specify to resume pagination."""
  57. return self._resume_token
  58. @resume_token.setter
  59. def resume_token(self, value):
  60. if not isinstance(value, dict):
  61. raise ValueError("Bad starting token: %s" % value)
  62. if 'ksc_truncate_amount' in value:
  63. token_keys = sorted(self._input_token + ['ksc_truncate_amount'])
  64. else:
  65. token_keys = sorted(self._input_token)
  66. dict_keys = sorted(value.keys())
  67. if token_keys == dict_keys:
  68. self._resume_token = base64.b64encode(
  69. json.dumps(value).encode('utf-8')).decode('utf-8')
  70. else:
  71. raise ValueError("Bad starting token: %s" % value)
  72. @property
  73. def non_aggregate_part(self):
  74. return self._non_aggregate_part
  75. def __iter__(self):
  76. current_kwargs = self._op_kwargs
  77. previous_next_token = None
  78. next_token = dict((key, None) for key in self._input_token)
  79. # The number of items from result_key we've seen so far.
  80. total_items = 0
  81. first_request = True
  82. primary_result_key = self.result_keys[0]
  83. starting_truncation = 0
  84. self._inject_starting_params(current_kwargs)
  85. while True:
  86. response = self._make_request(current_kwargs)
  87. parsed = self._extract_parsed_response(response)
  88. if first_request:
  89. # The first request is handled differently. We could
  90. # possibly have a resume/starting token that tells us where
  91. # to index into the retrieved page.
  92. if self._starting_token is not None:
  93. starting_truncation = self._handle_first_request(
  94. parsed, primary_result_key, starting_truncation)
  95. first_request = False
  96. self._record_non_aggregate_key_values(parsed)
  97. current_response = primary_result_key.search(parsed)
  98. if current_response is None:
  99. current_response = []
  100. num_current_response = len(current_response)
  101. truncate_amount = 0
  102. if self._max_items is not None:
  103. truncate_amount = (total_items + num_current_response) \
  104. - self._max_items
  105. if truncate_amount > 0:
  106. self._truncate_response(parsed, primary_result_key,
  107. truncate_amount, starting_truncation,
  108. next_token)
  109. yield response
  110. break
  111. else:
  112. yield response
  113. total_items += num_current_response
  114. next_token = self._get_next_token(parsed)
  115. if all(t is None for t in next_token.values()):
  116. break
  117. if self._max_items is not None and \
  118. total_items == self._max_items:
  119. # We're on a page boundary so we can set the current
  120. # next token to be the resume token.
  121. self.resume_token = next_token
  122. break
  123. if previous_next_token is not None and \
  124. previous_next_token == next_token:
  125. message = ("The same next token was received "
  126. "twice: %s" % next_token)
  127. raise PaginationError(message=message)
  128. self._inject_token_into_kwargs(current_kwargs, next_token)
  129. previous_next_token = next_token
  130. def search(self, expression):
  131. """Applies a JMESPath expression to a paginator
  132. Each page of results is searched using the provided JMESPath
  133. expression. If the result is not a list, it is yielded
  134. directly. If the result is a list, each element in the result
  135. is yielded individually (essentially implementing a flatmap in
  136. which the JMESPath search is the mapping function).
  137. :type expression: str
  138. :param expression: JMESPath expression to apply to each page.
  139. :return: Returns an iterator that yields the individual
  140. elements of applying a JMESPath expression to each page of
  141. results.
  142. """
  143. compiled = jmespath.compile(expression)
  144. for page in self:
  145. results = compiled.search(page)
  146. if isinstance(results, list):
  147. for element in results:
  148. yield element
  149. else:
  150. # Yield result directly if it is not a list.
  151. yield results
  152. def _make_request(self, current_kwargs):
  153. return self._method(**current_kwargs)
  154. def _extract_parsed_response(self, response):
  155. return response
  156. def _record_non_aggregate_key_values(self, response):
  157. non_aggregate_keys = {}
  158. for expression in self._non_aggregate_key_exprs:
  159. result = expression.search(response)
  160. set_value_from_jmespath(non_aggregate_keys,
  161. expression.expression,
  162. result)
  163. self._non_aggregate_part = non_aggregate_keys
  164. def _inject_starting_params(self, op_kwargs):
  165. # If the user has specified a starting token we need to
  166. # inject that into the operation's kwargs.
  167. if self._starting_token is not None:
  168. # Don't need to do anything special if there is no starting
  169. # token specified.
  170. next_token = self._parse_starting_token()[0]
  171. self._inject_token_into_kwargs(op_kwargs, next_token)
  172. if self._page_size is not None:
  173. # Pass the page size as the parameter name for limiting
  174. # page size, also known as the limit_key.
  175. op_kwargs[self._limit_key] = self._page_size
  176. def _inject_token_into_kwargs(self, op_kwargs, next_token):
  177. for name, token in next_token.items():
  178. if token is None or token == 'None':
  179. continue
  180. op_kwargs[name] = token
  181. def _handle_first_request(self, parsed, primary_result_key,
  182. starting_truncation):
  183. # If the payload is an array or string, we need to slice into it
  184. # and only return the truncated amount.
  185. starting_truncation = self._parse_starting_token()[1]
  186. all_data = primary_result_key.search(parsed)
  187. if isinstance(all_data, (list, string_types)):
  188. data = all_data[starting_truncation:]
  189. else:
  190. data = None
  191. set_value_from_jmespath(
  192. parsed,
  193. primary_result_key.expression,
  194. data
  195. )
  196. # We also need to truncate any secondary result keys
  197. # because they were not truncated in the previous last
  198. # response.
  199. for token in self.result_keys:
  200. if token == primary_result_key:
  201. continue
  202. sample = token.search(parsed)
  203. if isinstance(sample, list):
  204. empty_value = []
  205. elif isinstance(sample, string_types):
  206. empty_value = ''
  207. elif isinstance(sample, (int, float)):
  208. empty_value = 0
  209. else:
  210. empty_value = None
  211. set_value_from_jmespath(parsed, token.expression, empty_value)
  212. return starting_truncation
  213. def _truncate_response(self, parsed, primary_result_key, truncate_amount,
  214. starting_truncation, next_token):
  215. original = primary_result_key.search(parsed)
  216. if original is None:
  217. original = []
  218. amount_to_keep = len(original) - truncate_amount
  219. truncated = original[:amount_to_keep]
  220. set_value_from_jmespath(
  221. parsed,
  222. primary_result_key.expression,
  223. truncated
  224. )
  225. # The issue here is that even though we know how much we've truncated
  226. # we need to account for this globally including any starting
  227. # left truncation. For example:
  228. # Raw response: [0,1,2,3]
  229. # Starting index: 1
  230. # Max items: 1
  231. # Starting left truncation: [1, 2, 3]
  232. # End right truncation for max items: [1]
  233. # However, even though we only kept 1, this is post
  234. # left truncation so the next starting index should be 2, not 1
  235. # (left_truncation + amount_to_keep).
  236. next_token['ksc_truncate_amount'] = \
  237. amount_to_keep + starting_truncation
  238. self.resume_token = next_token
  239. def _get_next_token(self, parsed):
  240. if self._more_results is not None:
  241. if not self._more_results.search(parsed):
  242. return {}
  243. next_tokens = {}
  244. for output_token, input_key in \
  245. zip(self._output_token, self._input_token):
  246. next_token = output_token.search(parsed)
  247. # We do not want to include any empty strings as actual tokens.
  248. # Treat them as None.
  249. if next_token:
  250. next_tokens[input_key] = next_token
  251. else:
  252. next_tokens[input_key] = None
  253. return next_tokens
  254. def result_key_iters(self):
  255. teed_results = tee(self, len(self.result_keys))
  256. return [ResultKeyIterator(i, result_key) for i, result_key
  257. in zip(teed_results, self.result_keys)]
  258. def build_full_result(self):
  259. complete_result = {}
  260. for response in self:
  261. page = response
  262. # We want to try to catch operation object pagination
  263. # and format correctly for those. They come in the form
  264. # of a tuple of two elements: (http_response, parsed_responsed).
  265. # We want the parsed_response as that is what the page iterator
  266. # uses. We can remove it though once operation objects are removed.
  267. if isinstance(response, tuple) and len(response) == 2:
  268. page = response[1]
  269. # We're incrementally building the full response page
  270. # by page. For each page in the response we need to
  271. # inject the necessary components from the page
  272. # into the complete_result.
  273. for result_expression in self.result_keys:
  274. # In order to incrementally update a result key
  275. # we need to search the existing value from complete_result,
  276. # then we need to search the _current_ page for the
  277. # current result key value. Then we append the current
  278. # value onto the existing value, and re-set that value
  279. # as the new value.
  280. result_value = result_expression.search(page)
  281. if result_value is None:
  282. continue
  283. existing_value = result_expression.search(complete_result)
  284. if existing_value is None:
  285. # Set the initial result
  286. set_value_from_jmespath(
  287. complete_result, result_expression.expression,
  288. result_value)
  289. continue
  290. # Now both result_value and existing_value contain something
  291. if isinstance(result_value, list):
  292. existing_value.extend(result_value)
  293. elif isinstance(result_value, (int, float, string_types)):
  294. # Modify the existing result with the sum or concatenation
  295. set_value_from_jmespath(
  296. complete_result, result_expression.expression,
  297. existing_value + result_value)
  298. merge_dicts(complete_result, self.non_aggregate_part)
  299. if self.resume_token is not None:
  300. complete_result['NextToken'] = self.resume_token
  301. return complete_result
  302. def _parse_starting_token(self):
  303. if self._starting_token is None:
  304. return None
  305. # The starting token is a dict passed as a base64 encoded string.
  306. next_token = self._starting_token
  307. try:
  308. next_token = json.loads(
  309. base64.b64decode(next_token).decode('utf-8'))
  310. index = 0
  311. if 'ksc_truncate_amount' in next_token:
  312. index = next_token.get('ksc_truncate_amount')
  313. del next_token['ksc_truncate_amount']
  314. except (ValueError, TypeError):
  315. next_token, index = self._parse_starting_token_deprecated()
  316. return next_token, index
  317. def _parse_starting_token_deprecated(self):
  318. """
  319. This handles parsing of old style starting tokens, and attempts to
  320. coerce them into the new style.
  321. """
  322. log.debug("Attempting to fall back to old starting token parser. For "
  323. "token: %s" % self._starting_token)
  324. if self._starting_token is None:
  325. return None
  326. parts = self._starting_token.split('___')
  327. next_token = []
  328. index = 0
  329. if len(parts) == len(self._input_token) + 1:
  330. try:
  331. index = int(parts.pop())
  332. except ValueError:
  333. raise ValueError("Bad starting token: %s" %
  334. self._starting_token)
  335. for part in parts:
  336. if part == 'None':
  337. next_token.append(None)
  338. else:
  339. next_token.append(part)
  340. return self._convert_deprecated_starting_token(next_token), index
  341. def _convert_deprecated_starting_token(self, deprecated_token):
  342. """
  343. This attempts to convert a deprecated starting token into the new
  344. style.
  345. """
  346. len_deprecated_token = len(deprecated_token)
  347. len_input_token = len(self._input_token)
  348. if len_deprecated_token > len_input_token:
  349. raise ValueError("Bad starting token: %s" % self._starting_token)
  350. elif len_deprecated_token < len_input_token:
  351. log.debug("Old format starting token does not contain all input "
  352. "tokens. Setting the rest, in order, as None.")
  353. for i in range(len_input_token - len_deprecated_token):
  354. deprecated_token.append(None)
  355. return dict(zip(self._input_token, deprecated_token))
  356. class Paginator(object):
  357. PAGE_ITERATOR_CLS = PageIterator
  358. def __init__(self, method, pagination_config):
  359. self._method = method
  360. self._pagination_cfg = pagination_config
  361. self._output_token = self._get_output_tokens(self._pagination_cfg)
  362. self._input_token = self._get_input_tokens(self._pagination_cfg)
  363. self._more_results = self._get_more_results_token(self._pagination_cfg)
  364. self._non_aggregate_keys = self._get_non_aggregate_keys(
  365. self._pagination_cfg)
  366. self._result_keys = self._get_result_keys(self._pagination_cfg)
  367. self._limit_key = self._get_limit_key(self._pagination_cfg)
  368. @property
  369. def result_keys(self):
  370. return self._result_keys
  371. def _get_non_aggregate_keys(self, config):
  372. keys = []
  373. for key in config.get('non_aggregate_keys', []):
  374. keys.append(jmespath.compile(key))
  375. return keys
  376. def _get_output_tokens(self, config):
  377. output = []
  378. output_token = config['output_token']
  379. if not isinstance(output_token, list):
  380. output_token = [output_token]
  381. for config in output_token:
  382. output.append(jmespath.compile(config))
  383. return output
  384. def _get_input_tokens(self, config):
  385. input_token = self._pagination_cfg['input_token']
  386. if not isinstance(input_token, list):
  387. input_token = [input_token]
  388. return input_token
  389. def _get_more_results_token(self, config):
  390. more_results = config.get('more_results')
  391. if more_results is not None:
  392. return jmespath.compile(more_results)
  393. def _get_result_keys(self, config):
  394. result_key = config.get('result_key')
  395. if result_key is not None:
  396. if not isinstance(result_key, list):
  397. result_key = [result_key]
  398. result_key = [jmespath.compile(rk) for rk in result_key]
  399. return result_key
  400. def _get_limit_key(self, config):
  401. return config.get('limit_key')
  402. def paginate(self, **kwargs):
  403. """Create paginator object for an operation.
  404. This returns an iterable object. Iterating over
  405. this object will yield a single page of a response
  406. at a time.
  407. """
  408. page_params = self._extract_paging_params(kwargs)
  409. return self.PAGE_ITERATOR_CLS(
  410. self._method, self._input_token,
  411. self._output_token, self._more_results,
  412. self._result_keys, self._non_aggregate_keys,
  413. self._limit_key,
  414. page_params['MaxItems'],
  415. page_params['StartingToken'],
  416. page_params['PageSize'],
  417. kwargs)
  418. def _extract_paging_params(self, kwargs):
  419. pagination_config = kwargs.pop('PaginationConfig', {})
  420. max_items = pagination_config.get('MaxItems', None)
  421. if max_items is not None:
  422. max_items = int(max_items)
  423. page_size = pagination_config.get('PageSize', None)
  424. if page_size is not None:
  425. page_size = int(page_size)
  426. return {
  427. 'MaxItems': max_items,
  428. 'StartingToken': pagination_config.get('StartingToken', None),
  429. 'PageSize': page_size,
  430. }
  431. class ResultKeyIterator(object):
  432. """Iterates over the results of paginated responses.
  433. Each iterator is associated with a single result key.
  434. Iterating over this object will give you each element in
  435. the result key list.
  436. :param pages_iterator: An iterator that will give you
  437. pages of results (a ``PageIterator`` class).
  438. :param result_key: The JMESPath expression representing
  439. the result key.
  440. """
  441. def __init__(self, pages_iterator, result_key):
  442. self._pages_iterator = pages_iterator
  443. self.result_key = result_key
  444. def __iter__(self):
  445. for page in self._pages_iterator:
  446. results = self.result_key.search(page)
  447. if results is None:
  448. results = []
  449. for result in results:
  450. yield result