# waiter.py
  1. # Copyright 2012-2014 ksyun.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import jmespath
  14. import logging
  15. import time
  16. from kscore.utils import get_service_module_name
  17. from kscore.docs.docstring import WaiterDocstring
  18. from .exceptions import WaiterError, ClientError, WaiterConfigError
  19. from . import xform_name
  20. logger = logging.getLogger(__name__)
  21. def create_waiter_with_client(waiter_name, waiter_model, client):
  22. """
  23. :type waiter_name: str
  24. :param waiter_name: The name of the waiter. The name should match
  25. the name (including the casing) of the key name in the waiter
  26. model file (typically this is CamelCasing).
  27. :type waiter_model: kscore.waiter.WaiterModel
  28. :param waiter_model: The model for the waiter configuration.
  29. :type client: kscore.client.BaseClient
  30. :param client: The kscore client associated with the service.
  31. :rtype: kscore.waiter.Waiter
  32. :return: The waiter object.
  33. """
  34. single_waiter_config = waiter_model.get_waiter(waiter_name)
  35. operation_name = xform_name(single_waiter_config.operation)
  36. operation_method = NormalizedOperationMethod(
  37. getattr(client, operation_name))
  38. # Create a new wait method that will serve as a proxy to the underlying
  39. # Waiter.wait method. This is needed to attach a docstring to the
  40. # method.
  41. def wait(self, **kwargs):
  42. Waiter.wait(self, **kwargs)
  43. wait.__doc__ = WaiterDocstring(
  44. waiter_name=waiter_name,
  45. event_emitter=client.meta.events,
  46. service_model=client.meta.service_model,
  47. service_waiter_model=waiter_model,
  48. include_signature=False
  49. )
  50. # Rename the waiter class based on the type of waiter.
  51. waiter_class_name = str('%s.Waiter.%s' % (
  52. get_service_module_name(client.meta.service_model),
  53. waiter_name))
  54. # Create the new waiter class
  55. documented_waiter_cls = type(
  56. waiter_class_name, (Waiter,), {'wait': wait})
  57. # Return an instance of the new waiter class.
  58. return documented_waiter_cls(
  59. waiter_name, single_waiter_config, operation_method
  60. )
  61. class NormalizedOperationMethod(object):
  62. def __init__(self, client_method):
  63. self._client_method = client_method
  64. def __call__(self, **kwargs):
  65. try:
  66. return self._client_method(**kwargs)
  67. except ClientError as e:
  68. return e.response
  69. class WaiterModel(object):
  70. SUPPORTED_VERSION = 2
  71. def __init__(self, waiter_config):
  72. """
  73. Note that the WaiterModel takes ownership of the waiter_config.
  74. It may or may not mutate the waiter_config. If this is a concern,
  75. it is best to make a copy of the waiter config before passing it to
  76. the WaiterModel.
  77. :type waiter_config: dict
  78. :param waiter_config: The loaded waiter config
  79. from the <service>*.waiters.json file. This can be
  80. obtained from a kscore Loader object as well.
  81. """
  82. self._waiter_config = waiter_config['waiters']
  83. # These are part of the public API. Changing these
  84. # will result in having to update the consuming code,
  85. # so don't change unless you really need to.
  86. version = waiter_config.get('version', 'unknown')
  87. self._verify_supported_version(version)
  88. self.version = version
  89. self.waiter_names = list(sorted(waiter_config['waiters'].keys()))
  90. def _verify_supported_version(self, version):
  91. if version != self.SUPPORTED_VERSION:
  92. raise WaiterConfigError(
  93. error_msg=("Unsupported waiter version, supported version "
  94. "must be: %s, but version of waiter config "
  95. "is: %s" % (self.SUPPORTED_VERSION,
  96. version)))
  97. def get_waiter(self, waiter_name):
  98. try:
  99. single_waiter_config = self._waiter_config[waiter_name]
  100. except KeyError:
  101. raise ValueError("Waiter does not exist: %s" % waiter_name)
  102. return SingleWaiterConfig(single_waiter_config)
  103. class SingleWaiterConfig(object):
  104. """Represents the waiter configuration for a single waiter.
  105. A single waiter is considered the configuration for a single
  106. value associated with a named waiter (i.e TableExists).
  107. """
  108. def __init__(self, single_waiter_config):
  109. self._config = single_waiter_config
  110. # These attributes are part of the public API.
  111. self.description = single_waiter_config.get('description', '')
  112. # Per the spec, these three fields are required.
  113. self.operation = single_waiter_config['operation']
  114. self.delay = single_waiter_config['delay']
  115. self.max_attempts = single_waiter_config['maxAttempts']
  116. @property
  117. def acceptors(self):
  118. acceptors = []
  119. for acceptor_config in self._config['acceptors']:
  120. acceptor = AcceptorConfig(acceptor_config)
  121. acceptors.append(acceptor)
  122. return acceptors
  123. class AcceptorConfig(object):
  124. def __init__(self, config):
  125. self.state = config['state']
  126. self.matcher = config['matcher']
  127. self.expected = config['expected']
  128. self.argument = config.get('argument')
  129. self.matcher_func = self._create_matcher_func()
  130. def _create_matcher_func(self):
  131. # An acceptor function is a callable that takes a single value. The
  132. # parsed KSYUN response. Note that the parsed error response is also
  133. # provided in the case of errors, so it's entirely possible to
  134. # handle all the available matcher capabilities in the future.
  135. # There's only three supported matchers, so for now, this is all
  136. # contained to a single method. If this grows, we can expand this
  137. # out to separate methods or even objects.
  138. if self.matcher == 'path':
  139. return self._create_path_matcher()
  140. elif self.matcher == 'pathAll':
  141. return self._create_path_all_matcher()
  142. elif self.matcher == 'pathAny':
  143. return self._create_path_any_matcher()
  144. elif self.matcher == 'status':
  145. return self._create_status_matcher()
  146. elif self.matcher == 'error':
  147. return self._create_error_matcher()
  148. else:
  149. raise WaiterConfigError(
  150. error_msg="Unknown acceptor: %s" % self.matcher)
  151. def _create_path_matcher(self):
  152. expression = jmespath.compile(self.argument)
  153. expected = self.expected
  154. def acceptor_matches(response):
  155. return expression.search(response) == expected
  156. return acceptor_matches
  157. def _create_path_all_matcher(self):
  158. expression = jmespath.compile(self.argument)
  159. expected = self.expected
  160. def acceptor_matches(response):
  161. result = expression.search(response)
  162. if not isinstance(result, list) or not result:
  163. # pathAll matcher must result in a list.
  164. # Also we require at least one element in the list,
  165. # that is, an empty list should not result in this
  166. # acceptor match.
  167. return False
  168. for element in result:
  169. if element != expected:
  170. return False
  171. return True
  172. return acceptor_matches
  173. def _create_path_any_matcher(self):
  174. expression = jmespath.compile(self.argument)
  175. expected = self.expected
  176. def acceptor_matches(response):
  177. result = expression.search(response)
  178. if not isinstance(result, list) or not result:
  179. # pathAny matcher must result in a list.
  180. # Also we require at least one element in the list,
  181. # that is, an empty list should not result in this
  182. # acceptor match.
  183. return False
  184. for element in result:
  185. if element == expected:
  186. return True
  187. return False
  188. return acceptor_matches
  189. def _create_status_matcher(self):
  190. expected = self.expected
  191. def acceptor_matches(response):
  192. # We don't have any requirements on the expected incoming data
  193. # other than it is a dict, so we don't assume there's
  194. # a ResponseMetadata.HTTPStatusCode.
  195. status_code = response.get('ResponseMetadata', {}).get(
  196. 'HTTPStatusCode')
  197. return status_code == expected
  198. return acceptor_matches
  199. def _create_error_matcher(self):
  200. expected = self.expected
  201. def acceptor_matches(response):
  202. # When the client encounters an error, it will normally raise
  203. # an exception. However, the waiter implementation will catch
  204. # this exception, and instead send us the parsed error
  205. # response. So response is still a dictionary, and in the case
  206. # of an error response will contain the "Error" and
  207. # "ResponseMetadata" key.
  208. return response.get("Error", {}).get("Code", "") == expected
  209. return acceptor_matches
  210. class Waiter(object):
  211. def __init__(self, name, config, operation_method):
  212. """
  213. :type name: string
  214. :param name: The name of the waiter
  215. :type config: kscore.waiter.SingleWaiterConfig
  216. :param config: The configuration for the waiter.
  217. :type operation_method: callable
  218. :param operation_method: A callable that accepts **kwargs
  219. and returns a response. For example, this can be
  220. a method from a kscore client.
  221. """
  222. self._operation_method = operation_method
  223. # The two attributes are exposed to allow for introspection
  224. # and documentation.
  225. self.name = name
  226. self.config = config
  227. def wait(self, **kwargs):
  228. acceptors = list(self.config.acceptors)
  229. current_state = 'waiting'
  230. sleep_amount = self.config.delay
  231. num_attempts = 0
  232. max_attempts = self.config.max_attempts
  233. while True:
  234. response = self._operation_method(**kwargs)
  235. num_attempts += 1
  236. for acceptor in acceptors:
  237. if acceptor.matcher_func(response):
  238. current_state = acceptor.state
  239. break
  240. else:
  241. # If none of the acceptors matched, we should
  242. # transition to the failure state if an error
  243. # response was received.
  244. if 'Error' in response:
  245. # Transition to the failure state, which we can
  246. # just handle here by raising an exception.
  247. raise WaiterError(
  248. name=self.name,
  249. reason=response['Error'].get('Message', 'Unknown'))
  250. if current_state == 'success':
  251. logger.debug("Waiting complete, waiter matched the "
  252. "success state.")
  253. return
  254. if current_state == 'failure':
  255. raise WaiterError(
  256. name=self.name,
  257. reason='Waiter encountered a terminal failure state')
  258. if num_attempts >= max_attempts:
  259. raise WaiterError(name=self.name,
  260. reason='Max attempts exceeded')
  261. time.sleep(sleep_amount)