123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- # from __future__ import absolute_import, unicode_literals
- import contextlib
- import os
- import sys
- import traceback
- import logging
- from celery import Celery
- from service.account.account_dept_service import account_dept_service_ins
- from service.account.account_role_service import account_role_service_ins
- from service.exception.custom_common_exception import CustomCommonException
- from service.hook.hook_base_service import hook_base_service_ins
- from service.ticket.ticket_flow_history_service import ticket_flow_history_service_ins
- from service.ticket.ticket_node_service import ticket_node_service_ins
- # set the default Django settings module for the 'celery' program.
- os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.config')
- import django
- django.setup()
- app = Celery('loonflow')
- # Using a string here means the worker doesn't have to serialize
- # the configuration object to child processes.
- # - namespace='CELERY' means all celery-related configuration keys
- # should have a `CELERY_` prefix.
- app.config_from_object('django.conf:settings', namespace='CELERY')
- # Load task modules from all registered Django app configs.
- app.autodiscover_tasks()
- import json
- import requests
- from apps.ticket.models import TicketRecord
- from apps.workflow.models import Transition, Node, Workflow, WorkflowNotice
- from apps.manage.models import Notice
- from service.account.account_user_service import account_user_service_ins
- from service.common.constant_service import constant_service_ins
- from service.ticket.ticket_base_service import TicketBaseService, ticket_base_service_ins
- from service.common.common_service import CommonService, common_service_ins
- from service.workflow.workflow_transition_service import WorkflowTransitionService, workflow_transition_service_ins
- from django.conf import settings
- try:
- from StringIO import StringIO
- except ImportError:
- from io import StringIO
- logger = logging.getLogger('django')
- @app.task(bind=True)
- def debug_task(self):
- print('Request: {0!r}'.format(self.request))
- @app.task
- def test_task(a, b):
- print('a:', a)
- print('b:', b)
- print(a+b)
- @contextlib.contextmanager
- def stdoutIO(stdout=None):
- old = sys.stdout
- if stdout is None:
- try:
- # for python2
- stdout = StringIO.StringIO()
- except Exception:
- stdout = StringIO()
- sys.stdout = stdout
- yield stdout
- sys.stdout = old
- @app.task
- def timer_transition(tenant_id, ticket_id, node_id, date_time, transition_id):
- """
- timer transition
- need no flow record for this node, then run time transition
- :param tenant_id:
- :param ticket_id:
- :param node_id:
- :param date_time:
- :param transition_id:
- :return:
- """
- flow_history_result = ticket_flow_history_service_ins.get_ticket_flow_history(tenant_id, ticket_id, 10000, 1)
- ticket_flow_history_object_format_list = flow_history_result.get("ticket_flow_history_object_format_list")
- for ticket_flow_history_object_format in ticket_flow_history_object_format_list:
- if ticket_flow_history_object_format.get("node_info").get("id") == node_id and ticket_flow_history_object_format.get("created_at") > date_time:
- return "warning: have flow history, timer is invalid"
- handle_ticket_data = dict(transition_id=transition_id, operator='loonrobot', suggestion='定时器流转')
- ticket_base_service_ins.handle_ticket(tenant_id, ticket_id, handle_ticket_data, True)
- @app.task
- def send_ticket_notice(tenant_id: int, ticket_id: int):
- """
- send ticket notice
- :param ticket_id:
- :return:
- """
- # get ticket info
- # get workflow notice config
- # get notice title and content template
- # hook with title content, ticket info
- ticket_obj = TicketRecord.objects.get(id=ticket_id)
- workflow_id = ticket_obj.workflow_id
- workflow_obj = Workflow.objects.get(id=workflow_id)
- workflow_notices = WorkflowNotice.objects.get(workflow_id=workflow_id).workflow_notices
- notice_str_list = workflow_notices.split(',')
- notice_id_list = [int(notice_str) for notice_str in notice_str_list]
- send_notice_result_list = []
- title_template = workflow_obj.title_template
- content_template = workflow_obj.content_template
- ticket_value_info = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
- title_result = title_template.format(**ticket_value_info)
- content_result = content_template.format(**ticket_value_info)
- flow_history_result = ticket_flow_history_service_ins.get_ticket_flow_history(tenant_id, ticket_id, 1)
- latest_history = flow_history_result.get("ticket_flow_history_object_format_list")[0]
- participant_queryset = account_user_service_ins.get_ticket_current_participant_list(tenant_id, ticket_id)
- participant_info_list = []
- for participant in participant_queryset:
- participant_info_list.append(dict(username=participant.username, alias=participant.alias,
- phone=participant.phone, email=participant.email))
- params = {'title_result': title_result, 'content_result': content_result,
- 'participant': ticket_obj.participant, 'participant_type_id': ticket_obj.participant_type_id,
- 'ticket_value_info': ticket_value_info, 'latest_history': latest_history,
- 'participant_info_list': participant_info_list}
- for notice_id in notice_id_list:
- notice_obj = Notice.objects.filter(id=notice_id).first()
- if not notice_obj:
- continue
- hook_url = notice_obj.hook_url
- from service.workflow.workflow_base_service import workflow_base_service_ins
- flag, msg = workflow_base_service_ins.hook_host_valid_check(hook_url)
- if not flag:
- return False, msg
- hook_token = notice_obj.hook_token
- # gen signature
- flag, headers = common_service_ins.gen_signature_by_token(hook_token)
- try:
- r = requests.post(hook_url, headers=headers, json=params)
- result = r.json()
- if result.get('code') == 0:
- send_notice_result_list.append(dict(notice_id=notice_id, result='success', msg=result.get('msg', '')))
- else:
- send_notice_result_list.append(dict(notice_id=notice_id, result='fail', msg=result.get('msg', '')))
- except Exception as e:
- send_notice_result_list.append(dict(notice_id=notice_id, result='fail', msg=e.__str__()))
- return send_notice_result_list
- @app.task
- def flow_hook_task(tenant_id: int, ticket_id: int, node_id: int):
- """
- hook task
- :param tenant_id:
- :param ticket_id:
- :param node_id:
- :return:
- """
- node_obj = Node.objects.get(id=node_id)
- hook_config = node_obj.participant
- hook_config_dict = json.loads(hook_config)
- hook_url = hook_config_dict.get('hook_url')
- hook_token = hook_config_dict.get('hook_token')
- wait = hook_config_dict.get('wait')
- extra_info = hook_config_dict.get('extra_info')
- participant_type = node_obj.participant_type
- if participant_type != "hook":
- return CustomCommonException("current node's participant is not hook")
- all_ticket_data = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
- all_ticket_data.update(extra_info=extra_info)
- hook_result = hook_base_service_ins.call_hook(hook_url, hook_token, all_ticket_data)
- comment = hook_result.get("msg")
- ticket_flow_history_service_ins.add_ticket_flow_history(tenant_id, 0, ticket_id, 0, comment, "hook", "", node_id, "hook",
- all_ticket_data)
- if hook_result.get("code") == 0:
- if not wait:
- transition_queryset = workflow_transition_service_ins.get_transition_queryset_by_source_node_id(node_id)
- transition_id = transition_queryset[0].id # only support one transition behind hook
- all_ticket_data.update(transition_id=transition_id)
- ticket_base_service_ins.handle_ticket(ticket_id, all_ticket_data, by_hook=True)
- else:
- ticket_node_participant_obj = dict()
- ticket_node_participant_obj["ticket_id"] = ticket_id
- ticket_node_participant_obj["node_id"] = node_id
- ticket_node_participant_obj["in_add_node"] = False
- ticket_node_participant_obj["add_node_target"] = ""
- ticket_node_participant_obj["hook_state"] = "fail"
- ticket_node_participant_obj["all_participant_result"] = {}
- ticket_node_service_ins.update_batch_record(tenant_id, [ticket_node_participant_obj])
- return True
|