tasks.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. # from __future__ import absolute_import, unicode_literals
  2. import contextlib
  3. import os
  4. import sys
  5. import traceback
  6. import logging
  7. from celery import Celery
  8. # set the default Django settings module for the 'celery' program.
  9. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.config')
  10. import django
  11. django.setup()
  12. app = Celery('loonflow')
  13. # Using a string here means the worker doesn't have to serialize
  14. # the configuration object to child processes.
  15. # - namespace='CELERY' means all celery-related configuration keys
  16. # should have a `CELERY_` prefix.
  17. app.config_from_object('django.conf:settings', namespace='CELERY')
  18. # Load task modules from all registered Django app configs.
  19. app.autodiscover_tasks()
  20. import json
  21. import requests
  22. from apps.ticket.models import TicketRecord
  23. from apps.workflow.models import Transition, State, WorkflowScript, Workflow, CustomNotice
  24. from service.account.account_base_service import account_base_service_ins
  25. from service.common.constant_service import constant_service_ins
  26. from service.ticket.ticket_base_service import TicketBaseService, ticket_base_service_ins
  27. from service.common.common_service import CommonService, common_service_ins
  28. from service.workflow.workflow_transition_service import WorkflowTransitionService, workflow_transition_service_ins
  29. from django.conf import settings
  30. try:
  31. from StringIO import StringIO
  32. except ImportError:
  33. from io import StringIO
  34. logger = logging.getLogger('django')
  35. @app.task(bind=True)
  36. def debug_task(self):
  37. print('Request: {0!r}'.format(self.request))
  38. @app.task
  39. def test_task(a, b):
  40. print('a:', a)
  41. print('b:', b)
  42. print(a+b)
  43. @contextlib.contextmanager
  44. def stdoutIO(stdout=None):
  45. old = sys.stdout
  46. if stdout is None:
  47. try:
  48. # for python2
  49. stdout = StringIO.StringIO()
  50. except Exception:
  51. stdout = StringIO()
  52. sys.stdout = stdout
  53. yield stdout
  54. sys.stdout = old
  55. @app.task
  56. def run_flow_task(ticket_id, script_id_str, state_id, action_from='loonrobot'):
  57. """
  58. 执行工作流脚本
  59. :param script_id_star:通过脚本id来执行, 保存的是字符串
  60. :param ticket_id:
  61. :param state_id:
  62. :param action_from:
  63. :return:
  64. """
  65. script_id = int(script_id_str)
  66. ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=False).first()
  67. if ticket_obj.participant == script_id_str and ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_ROBOT:
  68. ## 校验脚本是否合法
  69. # 获取脚本名称
  70. script_obj = WorkflowScript.objects.filter(id=script_id, is_deleted=False, is_active=True).first()
  71. if not script_obj:
  72. return False, '脚本未注册或非激活状态'
  73. script_file = os.path.join(settings.MEDIA_ROOT, script_obj.saved_name.name)
  74. globals = {'ticket_id': ticket_id, 'action_from': action_from}
  75. # 如果需要脚本执行完成后,工单不往下流转(也就脚本执行失败或调用其他接口失败的情况),需要在脚本中抛出异常
  76. try:
  77. with stdoutIO() as s:
  78. # execfile(script_file, globals) # for python 2
  79. exec(open(script_file, encoding='utf-8').read(), globals)
  80. script_result = True
  81. # script_result_msg = ''.join(s.buflist)
  82. script_result_msg = ''.join(s.getvalue())
  83. except Exception as e:
  84. logger.error(traceback.format_exc())
  85. script_result = False
  86. script_result_msg = e.__str__()
  87. logger.info('*' * 20 + '工作流脚本回调,ticket_id:[%s]' % ticket_id + '*' * 20)
  88. logger.info('*******工作流脚本回调,ticket_id:{}*****'.format(ticket_id))
  89. # 因为上面的脚本执行时间可能会比较长,为了避免db session失效,重新获取ticket对象
  90. ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=False).first()
  91. # 新增处理记录,脚本后只允许只有一个后续直连状态
  92. transition_obj = Transition.objects.filter(source_state_id=state_id, is_deleted=False).first()
  93. new_ticket_flow_dict = dict(ticket_id=ticket_id, transition_id=transition_obj.id,
  94. suggestion=script_result_msg, participant_type_id=constant_service_ins.PARTICIPANT_TYPE_ROBOT,
  95. participant='脚本:(id:{}, name:{})'.format(script_obj.id, script_obj.name), state_id=state_id, creator='loonrobot')
  96. ticket_base_service_ins.add_ticket_flow_log(new_ticket_flow_dict)
  97. if not script_result:
  98. # 脚本执行失败,状态不更新,标记任务执行结果
  99. ticket_obj.script_run_last_result = False
  100. ticket_obj.save()
  101. return False, script_result_msg
  102. # 自动执行流转
  103. flag, msg = ticket_base_service_ins.handle_ticket(ticket_id, dict(username='loonrobot',
  104. suggestion='脚本执行完成后自行流转',
  105. transition_id=transition_obj.id), False, True)
  106. if flag:
  107. logger.info('******脚本执行成功,工单基础信息更新完成, ticket_id:{}******'.format(ticket_id))
  108. return flag, msg
  109. else:
  110. return False, '工单当前处理人为非脚本,不执行脚本'
  111. @app.task
  112. def timer_transition(ticket_id, state_id, date_time, transition_id):
  113. """
  114. 定时器流转
  115. :param ticket_id:
  116. :param state_id:
  117. :param date_time:
  118. :param transition_id:
  119. :return:
  120. """
  121. # 需要满足工单此状态后续无其他操作才自动流转
  122. # 查询该工单此状态所有操作
  123. # flow_log_set, msg = TicketBaseService().get_ticket_flow_log(ticket_id, 'loonrobot', per_page=1000)
  124. flag, result = ticket_base_service_ins.get_ticket_flow_log(ticket_id, 'loonrobot', per_page=1000)
  125. if flag is False:
  126. return False, result
  127. flow_log_list = result.get('ticket_flow_log_restful_list')
  128. for flow_log in flow_log_list:
  129. if flow_log.get('state').get('state_id') == state_id and flow_log.get('gmt_created') > date_time:
  130. return True, '后续有操作,定时器失效'
  131. # 执行流转
  132. handle_ticket_data = dict(transition_id=transition_id, username='loonrobot', suggestion='定时器流转')
  133. ticket_base_service_ins.handle_ticket(ticket_id, handle_ticket_data, True)
  134. @app.task
  135. def send_ticket_notice(ticket_id):
  136. """
  137. 发送工单通知
  138. :param ticket_id:
  139. :return:
  140. """
  141. # 获取工单信息
  142. # 获取工作流信息,获取工作流的通知信息
  143. # 获取通知信息的标题和内容模板
  144. # 将通知内容,通知标题,通知人,作为hook的请求参数
  145. ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=0).first()
  146. if not ticket_obj:
  147. return False, 'ticket is not exist or has been deleted'
  148. workflow_id = ticket_obj.workflow_id
  149. workflow_obj = Workflow.objects.filter(id=workflow_id, is_deleted=0).first()
  150. notices = workflow_obj.notices
  151. if not notices:
  152. return True, 'no notice defined'
  153. notice_str_list = notices.split(',')
  154. notice_id_list = [int(notice_str) for notice_str in notice_str_list]
  155. send_notice_result_list = []
  156. title_template = workflow_obj.title_template
  157. content_template = workflow_obj.content_template
  158. # 获取工单所有字段的变量
  159. flag, ticket_value_info = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
  160. if flag is False:
  161. return False, ticket_value_info
  162. title_result = title_template.format(**ticket_value_info)
  163. content_result = content_template.format(**ticket_value_info)
  164. # 获取工单最后一条操作记录
  165. flag, result = ticket_base_service_ins.get_ticket_flow_log(ticket_id, 'loonrobot')
  166. flow_log_list = result.get('ticket_flow_log_restful_list')
  167. last_flow_log = flow_log_list[0]
  168. participant_info_list = []
  169. participant_username_list = []
  170. from apps.account.models import LoonUser
  171. if ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_PERSONAL:
  172. participant_username_list = [ticket_obj.participant]
  173. elif ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_MULTI:
  174. participant_username_list = ticket_obj.participant.split(',')
  175. elif ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_ROLE:
  176. flag, participant_username_list = account_base_service_ins.get_role_username_list(ticket_obj.participant)
  177. if flag is False:
  178. return False, participant_username_list
  179. elif ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_DEPT:
  180. flag, participant_username_list = account_base_service_ins.get_dept_username_list(ticket_obj.participant)
  181. if not flag:
  182. return flag, participant_username_list
  183. if participant_username_list:
  184. participant_queryset = LoonUser.objects.filter(username__in=participant_username_list, is_deleted=0)
  185. for participant_0 in participant_queryset:
  186. participant_info_list.append(dict(username=participant_0.username, alias=participant_0.alias,
  187. phone=participant_0.phone, email=participant_0.email))
  188. params = {'title_result': title_result, 'content_result': content_result,
  189. 'participant': ticket_obj.participant, 'participant_type_id': ticket_obj.participant_type_id,
  190. 'multi_all_person': ticket_obj.multi_all_person, 'ticket_value_info': ticket_value_info,
  191. 'last_flow_log': last_flow_log, 'participant_info_list': participant_info_list}
  192. for notice_id in notice_id_list:
  193. notice_obj = CustomNotice.objects.filter(id=notice_id, is_deleted=0).first()
  194. if not notice_obj:
  195. continue
  196. hook_url = notice_obj.hook_url
  197. from service.workflow.workflow_base_service import workflow_base_service_ins
  198. flag, msg = workflow_base_service_ins.hook_host_valid_check(hook_url)
  199. if not flag:
  200. return False, msg
  201. hook_token = notice_obj.hook_token
  202. # gen signature
  203. flag, headers = common_service_ins.gen_signature_by_token(hook_token)
  204. try:
  205. r = requests.post(hook_url, headers=headers, json=params)
  206. result = r.json()
  207. if result.get('code') == 0:
  208. send_notice_result_list.append(dict(notice_id=notice_id, result='success', msg=result.get('msg', '')))
  209. else:
  210. send_notice_result_list.append(dict(notice_id=notice_id, result='fail', msg=result.get('msg', '')))
  211. except Exception as e:
  212. send_notice_result_list.append(dict(notice_id=notice_id, result='fail', msg=e.__str__()))
  213. return True, dict(send_notice_result_list=send_notice_result_list)
  214. @app.task
  215. def flow_hook_task(ticket_id):
  216. """
  217. hook 任务
  218. :param ticket_id:
  219. :return:
  220. """
  221. # 查询工单状态
  222. ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=0).first()
  223. state_id = ticket_obj.state_id
  224. state_obj = State.objects.filter(id=state_id, is_deleted=0).first()
  225. participant_type_id = state_obj.participant_type_id
  226. if participant_type_id != constant_service_ins.PARTICIPANT_TYPE_HOOK:
  227. return False, ''
  228. hook_config = state_obj.participant
  229. hook_config_dict = json.loads(hook_config)
  230. hook_url = hook_config_dict.get('hook_url')
  231. hook_token = hook_config_dict.get('hook_token')
  232. wait = hook_config_dict.get('wait')
  233. extra_info = hook_config_dict.get('extra_info')
  234. from service.workflow.workflow_base_service import workflow_base_service_ins
  235. hook_check_flag, hook_result_msg = workflow_base_service_ins.hook_host_valid_check(hook_url)
  236. flag, msg = common_service_ins.gen_hook_signature(hook_token)
  237. if not flag:
  238. hook_check_flag, hook_result_msg = flag, msg
  239. flag, all_ticket_data = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
  240. flag, all_field_value_result = ticket_base_service_ins.get_ticket_all_field_value_json(ticket_id)
  241. all_ticket_data_json = all_field_value_result.get('all_field_value_json')
  242. if hook_check_flag:
  243. if extra_info is not None:
  244. all_ticket_data.update(dict(extra_info=extra_info))
  245. try:
  246. r = requests.post(hook_url, headers=msg, json=all_ticket_data, timeout=10)
  247. result = r.json()
  248. except Exception as e:
  249. result = dict(code=-1, msg=e.__str__())
  250. if result.get('code') == 0:
  251. # 调用成功
  252. TicketBaseService().add_ticket_flow_log(dict(ticket_id=ticket_id, transition_id=0,
  253. suggestion=result.get('msg'),
  254. participant_type_id=constant_service_ins.PARTICIPANT_TYPE_HOOK,
  255. participant='hook', state_id=state_id,
  256. intervene_type_id=constant_service_ins.TRANSITION_INTERVENE_TYPE_HOOK,
  257. ticket_data=all_ticket_data_json,
  258. creator='loonrobot'
  259. ))
  260. if not wait:
  261. # 不等待hook目标回调,直接流转
  262. flag, transition_queryset = workflow_transition_service_ins.get_state_transition_queryset(state_id)
  263. transition_id = transition_queryset[0].id # hook状态只支持一个流转
  264. new_request_dict = {}
  265. new_request_dict.update({'transition_id': transition_id, 'suggestion': msg, 'username': 'loonrobot'})
  266. # 执行流转
  267. flag, msg = ticket_base_service_ins.handle_ticket(ticket_id, new_request_dict, by_hook=True)
  268. if not flag:
  269. return False, msg
  270. return True, ''
  271. hook_result_msg = result.get('msg')
  272. ticket_base_service_ins.update_ticket_field_value(ticket_id, {'script_run_last_result': False})
  273. ticket_base_service_ins.add_ticket_flow_log(
  274. dict(ticket_id=ticket_id, transition_id=0, suggestion=hook_result_msg,
  275. participant_type_id=constant_service_ins.PARTICIPANT_TYPE_HOOK,
  276. participant='hook', state_id=state_id, ticket_data=all_ticket_data_json, creator='loonrobot'))