base.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. class LoonflowApiCall(object):
  2. """
  3. loonflow api调用
  4. """
  5. def __init__(self, username='admin'):
  6. from service.common.common_service import common_service_ins
  7. flag, msg = common_service_ins.gen_signature('ops')
  8. if not flag:
  9. pass
  10. self.signature = msg.get('signature', '')
  11. self.timestamp = msg.get('timestamp', '')
  12. self.headers = {'HTTP_SIGNATURE': self.signature, 'HTTP_TIMESTAMP': self.timestamp, 'HTTP_APPNAME': 'ops', 'HTTP_USERNAME':username}
  13. def api_call(self, method, url, params={}):
  14. import json
  15. from django.test.client import Client
  16. c = Client()
  17. if method not in ('get', 'post', 'patch', 'delete', 'put'):
  18. return json.loads(dict(code=-1, msg='method is invalid'))
  19. if method == 'get':
  20. response_content = c.get(url, data=params, **self.headers).content
  21. elif method == 'post':
  22. # response_content = c.post(url, params, content_type='application/json', **self.headers).content
  23. response_content = c.post(url, data=json.dumps(params), content_type='application/json', **self.headers).content
  24. elif method == 'patch':
  25. response_content = c.patch(url, data=json.dumps(params), content_type='application/json', **self.headers).content
  26. elif method == 'delete':
  27. response_content = c.delete(url, data=json.dumps(params), content_type='application/json', **self.headers).content
  28. elif method == 'put':
  29. response_content = c.put(url, data=json.dumps(params), content_type='application/json', **self.headers).content
  30. if url == '/api/v1.0/accounts/app_token':
  31. print('#'*30)
  32. print(method)
  33. print(params)
  34. print(response_content)
  35. print('#' * 30)
  36. response_content_dict = json.loads(str(response_content, encoding='utf-8'))
  37. return response_content_dict