tasks.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
# from __future__ import absolute_import, unicode_literals
import contextlib
import os
import sys
import traceback
import logging
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.dev')
import django
# Initialise Django here: the project model/service imports further down
# are deliberately placed after this call, so keep the statement order.
django.setup()
app = Celery('loonflow')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
import json
import requests
from apps.ticket.models import TicketRecord
from apps.workflow.models import Transition, State, WorkflowScript, Workflow, CustomNotice
from service.account.account_base_service import AccountBaseService
from service.common.constant_service import CONSTANT_SERVICE
from service.ticket.ticket_base_service import TicketBaseService
from service.common.common_service import CommonService
from service.workflow.workflow_transition_service import WorkflowTransitionService
from django.conf import settings
# Python 2 ships a StringIO module; when that import fails (Python 3)
# fall back to io.StringIO. Either way `StringIO` is bound to the class.
try:
    from StringIO import StringIO
except ImportError:
    from io import StringIO
# All tasks in this module log through the 'django' logger.
logger = logging.getLogger('django')
  35. @app.task(bind=True)
  36. def debug_task(self):
  37. print('Request: {0!r}'.format(self.request))
  38. @app.task
  39. def test_task(a, b):
  40. print('a:', a)
  41. print('b:', b)
  42. print(a+b)
  43. @contextlib.contextmanager
  44. def stdoutIO(stdout=None):
  45. old = sys.stdout
  46. if stdout is None:
  47. try:
  48. # for python2
  49. stdout = StringIO.StringIO()
  50. except Exception:
  51. stdout = StringIO()
  52. sys.stdout = stdout
  53. yield stdout
  54. sys.stdout = old
  55. @app.task
  56. def run_flow_task(ticket_id, script_id_str, state_id, action_from='loonrobot'):
  57. """
  58. 执行工作流脚本
  59. :param script_id_star:通过脚本id来执行, 保存的是字符串
  60. :param ticket_id:
  61. :param state_id:
  62. :param action_from:
  63. :return:
  64. """
  65. script_id = int(script_id_str)
  66. ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=False).first()
  67. if ticket_obj.participant == script_id_str and ticket_obj.participant_type_id == CONSTANT_SERVICE.PARTICIPANT_TYPE_ROBOT:
  68. ## 校验脚本是否合法
  69. # 获取脚本名称
  70. script_obj = WorkflowScript.objects.filter(id=script_id, is_deleted=False, is_active=True).first()
  71. if not script_obj:
  72. return False, '脚本未注册或非激活状态'
  73. script_file = os.path.join(settings.MEDIA_ROOT, script_obj.saved_name.name)
  74. globals = {'ticket_id': ticket_id, 'action_from': action_from}
  75. # 如果需要脚本执行完成后,工单不往下流转(也就脚本执行失败或调用其他接口失败的情况),需要在脚本中抛出异常
  76. try:
  77. with stdoutIO() as s:
  78. # execfile(script_file, globals) # for python 2
  79. exec(open(script_file, encoding='utf-8').read(), globals)
  80. script_result = True
  81. # script_result_msg = ''.join(s.buflist)
  82. script_result_msg = ''.join(s.getvalue())
  83. except Exception as e:
  84. logger.error(traceback.format_exc())
  85. script_result = False
  86. script_result_msg = e.__str__()
  87. logger.info('*' * 20 + '工作流脚本回调,ticket_id:[%s]' % ticket_id + '*' * 20)
  88. logger.info('*******工作流脚本回调,ticket_id:{}*****'.format(ticket_id))
  89. # 因为上面的脚本执行时间可能会比较长,为了避免db session失效,重新获取ticket对象
  90. ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=False).first()
  91. # 新增处理记录,脚本后只允许只有一个后续直连状态
  92. transition_obj = Transition.objects.filter(source_state_id=state_id, is_deleted=False).first()
  93. new_ticket_flow_dict = dict(ticket_id=ticket_id, transition_id=transition_obj.id,
  94. suggestion=script_result_msg, participant_type_id=CONSTANT_SERVICE.PARTICIPANT_TYPE_ROBOT,
  95. participant='脚本:(id:{}, name:{})'.format(script_obj.id, script_obj.name), state_id=state_id, creator='loonrobot')
  96. TicketBaseService.add_ticket_flow_log(new_ticket_flow_dict)
  97. if not script_result:
  98. # 脚本执行失败,状态不更新,标记任务执行结果
  99. ticket_obj.script_run_last_result = False
  100. ticket_obj.save()
  101. return False, script_result_msg
  102. # 自动执行流转
  103. flag, msg = TicketBaseService.handle_ticket(ticket_id, dict(username='loonrobot',
  104. suggestion='脚本执行完成后自行流转',
  105. transition_id=transition_obj.id), False, True)
  106. if flag:
  107. logger.info('******脚本执行成功,工单基础信息更新完成, ticket_id:{}******'.format(ticket_id))
  108. return flag, msg
  109. else:
  110. return False, '工单当前处理人为非脚本,不执行脚本'
  111. @app.task
  112. def timer_transition(ticket_id, state_id, date_time, transition_id):
  113. """
  114. 定时器流转
  115. :param ticket_id:
  116. :param state_id:
  117. :param date_time:
  118. :param transition_id:
  119. :return:
  120. """
  121. # 需要满足工单此状态后续无其他操作才自动流转
  122. # 查询该工单此状态所有操作
  123. flow_log_set, msg = TicketBaseService().get_ticket_flow_log(ticket_id, 'loonrobot', per_page=1000)
  124. for flow_log in flow_log_set:
  125. if flow_log.get('state').get('state_id') == state_id and flow_log.get('gmt_created') > date_time:
  126. return True, '后续有操作,定时器失效'
  127. # 执行流转
  128. handle_ticket_data = dict(transition_id=transition_id, username='loonrobot', suggestion='定时器流转')
  129. TicketBaseService().handle_ticket(ticket_id, handle_ticket_data, True)
  130. @app.task
  131. def send_ticket_notice(ticket_id):
  132. """
  133. 发送工单通知
  134. :param ticket_id:
  135. :return:
  136. """
  137. # 获取工单信息
  138. # 获取工作流信息,获取工作流的通知信息
  139. # 获取通知信息的标题和内容模板
  140. # 将通知内容,通知标题,通知人,作为变量传给通知脚本
  141. ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=0).first()
  142. if not ticket_obj:
  143. return False, 'ticket is not exist or has been deleted'
  144. workflow_id = ticket_obj.workflow_id
  145. workflow_obj = Workflow.objects.filter(id=workflow_id, is_deleted=0).first()
  146. notices = workflow_obj.notices
  147. if not notices:
  148. return True, 'no notice defined'
  149. notice_str_list = notices.split(',')
  150. notice_id_list = [int(notice_str) for notice_str in notice_str_list]
  151. send_notice_result_list = []
  152. for notice_id in notice_id_list:
  153. notice_obj = CustomNotice.objects.filter(id=notice_id, is_deleted=0).first()
  154. if not notice_obj:
  155. continue
  156. title_template = notice_obj.title_template
  157. content_template = notice_obj.content_template
  158. # 获取工单所有字段的变量
  159. ticket_value_info, msg = TicketBaseService.get_ticket_all_field_value(ticket_id)
  160. if not ticket_value_info:
  161. return False, msg
  162. title_result = title_template.format(**ticket_value_info)
  163. content_result = content_template.format(**ticket_value_info)
  164. notice_script_file_name = notice_obj.script.name
  165. notice_script_file = os.path.join(settings.MEDIA_ROOT, notice_script_file_name)
  166. # 获取工单最后一条操作记录
  167. flow_log_list, msg = TicketBaseService.get_ticket_flow_log(ticket_id, 'loonrobot')
  168. last_flow_log = flow_log_list[0]
  169. participant_info_list = []
  170. participant_username_list = []
  171. from apps.account.models import LoonUser
  172. if ticket_obj.participant_type_id == CONSTANT_SERVICE.PARTICIPANT_TYPE_PERSONAL:
  173. participant_username_list = [ticket_obj.participant]
  174. elif ticket_obj.participant_type_id in (CONSTANT_SERVICE.PARTICIPANT_TYPE_MULTI, CONSTANT_SERVICE.PARTICIPANT_TYPE_MULTI_ALL):
  175. participant_username_list = ticket_obj.participant.split(',')
  176. elif ticket_obj.participant_type_id == CONSTANT_SERVICE.PARTICIPANT_TYPE_ROLE:
  177. participant_username_list, msg = AccountBaseService.get_role_username_list(ticket_obj.participant)
  178. elif ticket_obj.participant_type_id == CONSTANT_SERVICE.PARTICIPANT_TYPE_DEPT:
  179. participant_username_list, msg = AccountBaseService.get_dept_username_list(ticket_obj.participant)
  180. if participant_username_list:
  181. participant_queryset = LoonUser.objects.filter(username__in=participant_username_list, is_deleted=0)
  182. for participant_0 in participant_queryset:
  183. participant_info_list.append(dict(username=participant_0.username, alias=participant_0.alias,
  184. phone=participant_0.phone, email=participant_0.email))
  185. globals = {'title_result': title_result, 'content_result': content_result,
  186. 'participant': ticket_obj.participant, 'participant_type_id': ticket_obj.participant_type_id,
  187. 'multi_all_person': ticket_obj.multi_all_person, 'ticket_value_info': ticket_value_info,
  188. 'last_flow_log': last_flow_log, 'participant_info_list': participant_info_list}
  189. try:
  190. with stdoutIO() as s:
  191. # execfile(script_file, globals) # for python 2
  192. exec(open(notice_script_file, encoding='utf-8').read(), globals)
  193. script_result = True
  194. # script_result_msg = ''.join(s.buflist)
  195. script_result_msg = ''.join(s.getvalue())
  196. logger.info('send notice successful for ticket_id: {}, notice_id:{}'.format(ticket_id, notice_id))
  197. except Exception as e:
  198. logger.error(traceback.format_exc())
  199. script_result = False
  200. script_result_msg = e.__str__()
  201. send_notice_result_list.append(dict(notice_id=notice_id, result=script_result, msg=script_result_msg))
  202. return send_notice_result_list
  203. @app.task
  204. def flow_hook_task(ticket_id):
  205. """
  206. hook 任务
  207. :param ticket_id:
  208. :return:
  209. """
  210. # 查询工单状态
  211. ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=0).first()
  212. state_id = ticket_obj.state_id
  213. state_obj = State.objects.filter(id=state_id, is_deleted=0).first()
  214. participant_type_id = state_obj.participant_type_id
  215. if participant_type_id != CONSTANT_SERVICE.PARTICIPANT_TYPE_HOOK:
  216. return False, ''
  217. hook_config = state_obj.participant
  218. hook_config_dict= json.loads(hook_config)
  219. hook_url = hook_config_dict.get('hook_url')
  220. hook_token = hook_config_dict.get('hook_token')
  221. wait = hook_config_dict.get('wait')
  222. flag, msg = CommonService().gen_hook_signature(hook_token)
  223. if not flag:
  224. return False, msg
  225. all_ticket_data, msg = TicketBaseService().get_ticket_all_field_value(ticket_id)
  226. r = requests.post(hook_url, headers=msg, json=all_ticket_data, timeout=10)
  227. result = r.json()
  228. if result.get('code') == 0:
  229. # 调用成功
  230. if wait:
  231. # date等格式需要转换为str
  232. for key, value in all_ticket_data.items():
  233. if type(value) not in [int, str, bool, float]:
  234. all_ticket_data[key] = str(all_ticket_data[key])
  235. all_ticket_data_json = json.dumps(all_ticket_data)
  236. TicketBaseService().add_ticket_flow_log(dict(ticket_id=ticket_id, transition_id=0,
  237. suggestion=result.get('msg'),
  238. participant_type_id=CONSTANT_SERVICE.PARTICIPANT_TYPE_HOOK,
  239. participant='hook', state_id=state_id,
  240. ticket_data=all_ticket_data_json,
  241. creator='loonrobot'
  242. ))
  243. return True, ''
  244. else:
  245. # 不等待hook目标回调,直接流转
  246. transition_queryset, msg = WorkflowTransitionService().get_state_transition_queryset(state_id)
  247. transition_id = transition_queryset[0] # hook状态只支持一个流转
  248. new_request_dict = {}
  249. new_request_dict.update({'transition_id': transition_id, 'suggestion': msg, 'username': 'loonrobot'})
  250. # 执行流转
  251. flag, msg = TicketBaseService().handle_ticket(ticket_id, new_request_dict, by_hook=True)
  252. if not flag:
  253. return False, msg
  254. else:
  255. TicketBaseService().update_ticket_field_value({'script_run_last_result': False})
  256. all_ticket_data, msg = TicketBaseService().get_ticket_all_field_value(ticket_id)
  257. # date等格式需要转换为str
  258. for key, value in all_ticket_data.items():
  259. if type(value) not in [int, str, bool, float]:
  260. all_ticket_data[key] = str(all_ticket_data[key])
  261. all_ticket_data_json = json.dumps(all_ticket_data)
  262. TicketBaseService().add_ticket_flow_log(dict(ticket_id=ticket_id, transition_id=0,
  263. suggestion=result.get('msg'),
  264. participant_type_id=CONSTANT_SERVICE.PARTICIPANT_TYPE_HOOK,
  265. participant='hook', state_id=state_id, ticket_data=all_ticket_data_json,
  266. creator='loonrobot'
  267. ))