tasks.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. # from __future__ import absolute_import, unicode_literals
  2. import contextlib
  3. import os
  4. import sys
  5. import traceback
  6. import logging
  7. from celery import Celery
  8. # set the default Django settings module for the 'celery' program.
  9. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.config')
  10. import django
  11. django.setup()
  12. app = Celery('loonflow')
  13. # Using a string here means the worker doesn't have to serialize
  14. # the configuration object to child processes.
  15. # - namespace='CELERY' means all celery-related configuration keys
  16. # should have a `CELERY_` prefix.
  17. app.config_from_object('django.conf:settings', namespace='CELERY')
  18. # Load task modules from all registered Django app configs.
  19. app.autodiscover_tasks()
  20. import json
  21. import requests
  22. from apps.ticket.models import TicketRecord
  23. from apps.workflow.models import Transition, Node, Workflow, CustomNotice
  24. from service.account.account_base_service import account_base_service_ins
  25. from service.common.constant_service import constant_service_ins
  26. from service.ticket.ticket_base_service import TicketBaseService, ticket_base_service_ins
  27. from service.common.common_service import CommonService, common_service_ins
  28. from service.workflow.workflow_transition_service import WorkflowTransitionService, workflow_transition_service_ins
  29. from django.conf import settings
  30. try:
  31. from StringIO import StringIO
  32. except ImportError:
  33. from io import StringIO
  34. logger = logging.getLogger('django')
  35. @app.task(bind=True)
  36. def debug_task(self):
  37. print('Request: {0!r}'.format(self.request))
  38. @app.task
  39. def test_task(a, b):
  40. print('a:', a)
  41. print('b:', b)
  42. print(a+b)
  43. @contextlib.contextmanager
  44. def stdoutIO(stdout=None):
  45. old = sys.stdout
  46. if stdout is None:
  47. try:
  48. # for python2
  49. stdout = StringIO.StringIO()
  50. except Exception:
  51. stdout = StringIO()
  52. sys.stdout = stdout
  53. yield stdout
  54. sys.stdout = old
  55. @app.task
  56. def timer_transition(ticket_id, state_id, date_time, transition_id):
  57. """
  58. 定时器流转
  59. :param ticket_id:
  60. :param state_id:
  61. :param date_time:
  62. :param transition_id:
  63. :return:
  64. """
  65. # 需要满足工单此状态后续无其他操作才自动流转
  66. # 查询该工单此状态所有操作
  67. # flow_log_set, msg = TicketBaseService().get_ticket_flow_log(ticket_id, 'loonrobot', per_page=1000)
  68. flag, result = ticket_base_service_ins.get_ticket_flow_log(ticket_id, 'loonrobot', per_page=1000)
  69. if flag is False:
  70. return False, result
  71. flow_log_list = result.get('ticket_flow_log_restful_list')
  72. for flow_log in flow_log_list:
  73. if flow_log.get('state').get('state_id') == state_id and flow_log.get('gmt_created') > date_time:
  74. return True, '后续有操作,定时器失效'
  75. # 执行流转
  76. handle_ticket_data = dict(transition_id=transition_id, username='loonrobot', suggestion='定时器流转')
  77. ticket_base_service_ins.handle_ticket(ticket_id, handle_ticket_data, True)
  78. @app.task
  79. def send_ticket_notice(ticket_id):
  80. """
  81. 发送工单通知
  82. :param ticket_id:
  83. :return:
  84. """
  85. # 获取工单信息
  86. # 获取工作流信息,获取工作流的通知信息
  87. # 获取通知信息的标题和内容模板
  88. # 将通知内容,通知标题,通知人,作为hook的请求参数
  89. ticket_obj = TicketRecord.objects.filter(id=ticket_id).first()
  90. if not ticket_obj:
  91. return False, 'ticket is not exist or has been deleted'
  92. workflow_id = ticket_obj.workflow_id
  93. workflow_obj = Workflow.objects.filter(id=workflow_id).first()
  94. notices = workflow_obj.notices
  95. if not notices:
  96. return True, 'no notice defined'
  97. notice_str_list = notices.split(',')
  98. notice_id_list = [int(notice_str) for notice_str in notice_str_list]
  99. send_notice_result_list = []
  100. title_template = workflow_obj.title_template
  101. content_template = workflow_obj.content_template
  102. # 获取工单所有字段的变量
  103. flag, ticket_value_info = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
  104. if flag is False:
  105. return False, ticket_value_info
  106. title_result = title_template.format(**ticket_value_info)
  107. content_result = content_template.format(**ticket_value_info)
  108. # 获取工单最后一条操作记录
  109. flag, result = ticket_base_service_ins.get_ticket_flow_log(ticket_id, 'loonrobot')
  110. flow_log_list = result.get('ticket_flow_log_restful_list')
  111. last_flow_log = flow_log_list[0]
  112. participant_info_list = []
  113. participant_username_list = []
  114. from apps.account.models import User
  115. if ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_PERSONAL:
  116. participant_username_list = [ticket_obj.participant]
  117. elif ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_MULTI:
  118. participant_username_list = ticket_obj.participant.split(',')
  119. elif ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_ROLE:
  120. flag, participant_username_list = account_base_service_ins.get_role_username_list(ticket_obj.participant)
  121. if flag is False:
  122. return False, participant_username_list
  123. elif ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_DEPT:
  124. flag, participant_username_list = account_base_service_ins.get_dept_username_list(ticket_obj.participant)
  125. if not flag:
  126. return flag, participant_username_list
  127. if participant_username_list:
  128. participant_queryset = User.objects.filter(username__in=participant_username_list)
  129. for participant_0 in participant_queryset:
  130. participant_info_list.append(dict(username=participant_0.username, alias=participant_0.alias,
  131. phone=participant_0.phone, email=participant_0.email))
  132. params = {'title_result': title_result, 'content_result': content_result,
  133. 'participant': ticket_obj.participant, 'participant_type_id': ticket_obj.participant_type_id,
  134. 'multi_all_person': ticket_obj.multi_all_person, 'ticket_value_info': ticket_value_info,
  135. 'last_flow_log': last_flow_log, 'participant_info_list': participant_info_list}
  136. for notice_id in notice_id_list:
  137. notice_obj = CustomNotice.objects.filter(id=notice_id).first()
  138. if not notice_obj:
  139. continue
  140. hook_url = notice_obj.hook_url
  141. from service.workflow.workflow_base_service import workflow_base_service_ins
  142. flag, msg = workflow_base_service_ins.hook_host_valid_check(hook_url)
  143. if not flag:
  144. return False, msg
  145. hook_token = notice_obj.hook_token
  146. # gen signature
  147. flag, headers = common_service_ins.gen_signature_by_token(hook_token)
  148. try:
  149. r = requests.post(hook_url, headers=headers, json=params)
  150. result = r.json()
  151. if result.get('code') == 0:
  152. send_notice_result_list.append(dict(notice_id=notice_id, result='success', msg=result.get('msg', '')))
  153. else:
  154. send_notice_result_list.append(dict(notice_id=notice_id, result='fail', msg=result.get('msg', '')))
  155. except Exception as e:
  156. send_notice_result_list.append(dict(notice_id=notice_id, result='fail', msg=e.__str__()))
  157. return True, dict(send_notice_result_list=send_notice_result_list)
  158. @app.task
  159. def flow_hook_task(ticket_id):
  160. """
  161. hook 任务
  162. :param ticket_id:
  163. :return:
  164. """
  165. # 查询工单状态
  166. ticket_obj = TicketRecord.objects.filter(id=ticket_id).first()
  167. state_id = ticket_obj.state_id
  168. state_obj = Node.objects.filter(id=state_id).first()
  169. participant_type_id = state_obj.participant_type_id
  170. if participant_type_id != constant_service_ins.PARTICIPANT_TYPE_HOOK:
  171. return False, ''
  172. hook_config = state_obj.participant
  173. hook_config_dict = json.loads(hook_config)
  174. hook_url = hook_config_dict.get('hook_url')
  175. hook_token = hook_config_dict.get('hook_token')
  176. wait = hook_config_dict.get('wait')
  177. extra_info = hook_config_dict.get('extra_info')
  178. from service.workflow.workflow_base_service import workflow_base_service_ins
  179. hook_check_flag, hook_result_msg = workflow_base_service_ins.hook_host_valid_check(hook_url)
  180. flag, msg = common_service_ins.gen_hook_signature(hook_token)
  181. if not flag:
  182. hook_check_flag, hook_result_msg = flag, msg
  183. flag, all_ticket_data = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
  184. flag, all_field_value_result = ticket_base_service_ins.get_ticket_all_field_value_json(ticket_id)
  185. all_ticket_data_json = all_field_value_result.get('all_field_value_json')
  186. if hook_check_flag:
  187. if extra_info is not None:
  188. all_ticket_data.update(dict(extra_info=extra_info))
  189. try:
  190. r = requests.post(hook_url, headers=msg, json=all_ticket_data, timeout=10)
  191. result = r.json()
  192. except Exception as e:
  193. result = dict(code=-1, msg=e.__str__())
  194. if result.get('code') == 0:
  195. # 调用成功
  196. TicketBaseService().add_ticket_flow_log(dict(ticket_id=ticket_id, transition_id=0,
  197. suggestion=result.get('msg'),
  198. participant_type_id=constant_service_ins.PARTICIPANT_TYPE_HOOK,
  199. participant='hook', state_id=state_id,
  200. intervene_type_id=constant_service_ins.TRANSITION_INTERVENE_TYPE_HOOK,
  201. ticket_data=all_ticket_data_json,
  202. creator='loonrobot'
  203. ))
  204. if not wait:
  205. # 不等待hook目标回调,直接流转
  206. flag, transition_queryset = workflow_transition_service_ins.get_state_transition_queryset(state_id)
  207. transition_id = transition_queryset[0].id # hook状态只支持一个流转
  208. new_request_dict = {}
  209. new_request_dict.update({'transition_id': transition_id, 'suggestion': msg, 'username': 'loonrobot'})
  210. # 执行流转
  211. flag, msg = ticket_base_service_ins.handle_ticket(ticket_id, new_request_dict, by_hook=True)
  212. if not flag:
  213. return False, msg
  214. return True, ''
  215. hook_result_msg = result.get('msg')
  216. ticket_base_service_ins.update_ticket_field_value(ticket_id, {'script_run_last_result': False})
  217. ticket_base_service_ins.add_ticket_flow_log(
  218. dict(ticket_id=ticket_id, transition_id=0, suggestion=hook_result_msg,
  219. participant_type_id=constant_service_ins.PARTICIPANT_TYPE_HOOK,
  220. participant='hook', state_id=state_id, ticket_data=all_ticket_data_json, creator='loonrobot'))