2
0

tasks.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. # from __future__ import absolute_import, unicode_literals
  2. import contextlib
  3. import os
  4. import sys
  5. import traceback
  6. import logging
  7. from celery import Celery
  8. from service.account.account_dept_service import account_dept_service_ins
  9. from service.account.account_role_service import account_role_service_ins
  10. from service.exception.custom_common_exception import CustomCommonException
  11. from service.hook.hook_base_service import hook_base_service_ins
  12. from service.ticket.ticket_flow_history_service import ticket_flow_history_service_ins
  13. from service.ticket.ticket_node_service import ticket_node_service_ins
  14. # set the default Django settings module for the 'celery' program.
  15. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.config')
  16. import django
  17. django.setup()
  18. app = Celery('loonflow')
  19. # Using a string here means the worker doesn't have to serialize
  20. # the configuration object to child processes.
  21. # - namespace='CELERY' means all celery-related configuration keys
  22. # should have a `CELERY_` prefix.
  23. app.config_from_object('django.conf:settings', namespace='CELERY')
  24. # Load task modules from all registered Django app configs.
  25. app.autodiscover_tasks()
  26. import json
  27. import requests
  28. from apps.ticket.models import TicketRecord
  29. from apps.workflow.models import Transition, Node, Workflow, WorkflowNotice
  30. from apps.manage.models import Notice
  31. from service.account.account_user_service import account_user_service_ins
  32. from service.common.constant_service import constant_service_ins
  33. from service.ticket.ticket_base_service import TicketBaseService, ticket_base_service_ins
  34. from service.common.common_service import CommonService, common_service_ins
  35. from service.workflow.workflow_transition_service import WorkflowTransitionService, workflow_transition_service_ins
  36. from django.conf import settings
  37. try:
  38. from StringIO import StringIO
  39. except ImportError:
  40. from io import StringIO
  41. logger = logging.getLogger('django')
  42. @app.task(bind=True)
  43. def debug_task(self):
  44. print('Request: {0!r}'.format(self.request))
  45. @app.task
  46. def test_task(a, b):
  47. print('a:', a)
  48. print('b:', b)
  49. print(a+b)
  50. @contextlib.contextmanager
  51. def stdoutIO(stdout=None):
  52. old = sys.stdout
  53. if stdout is None:
  54. try:
  55. # for python2
  56. stdout = StringIO.StringIO()
  57. except Exception:
  58. stdout = StringIO()
  59. sys.stdout = stdout
  60. yield stdout
  61. sys.stdout = old
  62. @app.task
  63. def timer_transition(tenant_id, ticket_id, node_id, date_time, transition_id):
  64. """
  65. timer transition
  66. need no flow record for this node, then run time transition
  67. :param tenant_id:
  68. :param ticket_id:
  69. :param node_id:
  70. :param date_time:
  71. :param transition_id:
  72. :return:
  73. """
  74. flow_history_result = ticket_flow_history_service_ins.get_ticket_flow_history(tenant_id, ticket_id, 10000, 1)
  75. ticket_flow_history_object_format_list = flow_history_result.get("ticket_flow_history_object_format_list")
  76. for ticket_flow_history_object_format in ticket_flow_history_object_format_list:
  77. if ticket_flow_history_object_format.get("node_info").get("id") == node_id and ticket_flow_history_object_format.get("created_at") > date_time:
  78. return "warning: have flow history, timer is invalid"
  79. handle_ticket_data = dict(transition_id=transition_id, operator='loonrobot', suggestion='定时器流转')
  80. ticket_base_service_ins.handle_ticket(tenant_id, ticket_id, handle_ticket_data, True)
  81. @app.task
  82. def send_ticket_notice(tenant_id: int, ticket_id: int):
  83. """
  84. send ticket notice
  85. :param ticket_id:
  86. :return:
  87. """
  88. # get ticket info
  89. # get workflow notice config
  90. # get notice title and content template
  91. # hook with title content, ticket info
  92. ticket_obj = TicketRecord.objects.get(id=ticket_id)
  93. workflow_id = ticket_obj.workflow_id
  94. workflow_obj = Workflow.objects.get(id=workflow_id)
  95. workflow_notices = WorkflowNotice.objects.get(workflow_id=workflow_id).workflow_notices
  96. notice_str_list = workflow_notices.split(',')
  97. notice_id_list = [int(notice_str) for notice_str in notice_str_list]
  98. send_notice_result_list = []
  99. title_template = workflow_obj.title_template
  100. content_template = workflow_obj.content_template
  101. ticket_value_info = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
  102. title_result = title_template.format(**ticket_value_info)
  103. content_result = content_template.format(**ticket_value_info)
  104. flow_history_result = ticket_flow_history_service_ins.get_ticket_flow_history(tenant_id, ticket_id, 1)
  105. latest_history = flow_history_result.get("ticket_flow_history_object_format_list")[0]
  106. participant_queryset = account_user_service_ins.get_ticket_current_participant_list(tenant_id, ticket_id)
  107. participant_info_list = []
  108. for participant in participant_queryset:
  109. participant_info_list.append(dict(username=participant.username, alias=participant.alias,
  110. phone=participant.phone, email=participant.email))
  111. params = {'title_result': title_result, 'content_result': content_result,
  112. 'participant': ticket_obj.participant, 'participant_type_id': ticket_obj.participant_type_id,
  113. 'ticket_value_info': ticket_value_info, 'latest_history': latest_history,
  114. 'participant_info_list': participant_info_list}
  115. for notice_id in notice_id_list:
  116. notice_obj = Notice.objects.filter(id=notice_id).first()
  117. if not notice_obj:
  118. continue
  119. hook_url = notice_obj.hook_url
  120. from service.workflow.workflow_base_service import workflow_base_service_ins
  121. flag, msg = workflow_base_service_ins.hook_host_valid_check(hook_url)
  122. if not flag:
  123. return False, msg
  124. hook_token = notice_obj.hook_token
  125. # gen signature
  126. flag, headers = common_service_ins.gen_signature_by_token(hook_token)
  127. try:
  128. r = requests.post(hook_url, headers=headers, json=params)
  129. result = r.json()
  130. if result.get('code') == 0:
  131. send_notice_result_list.append(dict(notice_id=notice_id, result='success', msg=result.get('msg', '')))
  132. else:
  133. send_notice_result_list.append(dict(notice_id=notice_id, result='fail', msg=result.get('msg', '')))
  134. except Exception as e:
  135. send_notice_result_list.append(dict(notice_id=notice_id, result='fail', msg=e.__str__()))
  136. return send_notice_result_list
  137. @app.task
  138. def flow_hook_task(tenant_id: int, ticket_id: int, node_id: int):
  139. """
  140. hook task
  141. :param tenant_id:
  142. :param ticket_id:
  143. :param node_id:
  144. :return:
  145. """
  146. node_obj = Node.objects.get(id=node_id)
  147. hook_config = node_obj.participant
  148. hook_config_dict = json.loads(hook_config)
  149. hook_url = hook_config_dict.get('hook_url')
  150. hook_token = hook_config_dict.get('hook_token')
  151. wait = hook_config_dict.get('wait')
  152. extra_info = hook_config_dict.get('extra_info')
  153. participant_type = node_obj.participant_type
  154. if participant_type != "hook":
  155. return CustomCommonException("current node's participant is not hook")
  156. all_ticket_data = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
  157. all_ticket_data.update(extra_info=extra_info)
  158. hook_result = hook_base_service_ins.call_hook(hook_url, hook_token, all_ticket_data)
  159. comment = hook_result.get("msg")
  160. ticket_flow_history_service_ins.add_ticket_flow_history(tenant_id, 0, ticket_id, 0, comment, "hook", "", node_id, "hook",
  161. all_ticket_data)
  162. if hook_result.get("code") == 0:
  163. if not wait:
  164. transition_queryset = workflow_transition_service_ins.get_transition_queryset_by_source_node_id(node_id)
  165. transition_id = transition_queryset[0].id # only support one transition behind hook
  166. all_ticket_data.update(transition_id=transition_id)
  167. ticket_base_service_ins.handle_ticket(ticket_id, all_ticket_data, by_hook=True)
  168. else:
  169. ticket_node_participant_obj = dict()
  170. ticket_node_participant_obj["ticket_id"] = ticket_id
  171. ticket_node_participant_obj["node_id"] = node_id
  172. ticket_node_participant_obj["in_add_node"] = False
  173. ticket_node_participant_obj["add_node_target"] = ""
  174. ticket_node_participant_obj["hook_state"] = "fail"
  175. ticket_node_participant_obj["all_participant_result"] = {}
  176. ticket_node_service_ins.update_batch_record(tenant_id, [ticket_node_participant_obj])
  177. return True