123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- # from __future__ import absolute_import, unicode_literals
- import contextlib
- import os
- import sys
- import traceback
- import logging
- from celery import Celery
- # set the default Django settings module for the 'celery' program.
- os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.config')
- import django
- django.setup()
- app = Celery('loonflow')
- # Using a string here means the worker doesn't have to serialize
- # the configuration object to child processes.
- # - namespace='CELERY' means all celery-related configuration keys
- # should have a `CELERY_` prefix.
- app.config_from_object('django.conf:settings', namespace='CELERY')
- # Load task modules from all registered Django app configs.
- app.autodiscover_tasks()
- import json
- import requests
- from apps.ticket.models import TicketRecord
- from apps.workflow.models import Transition, Node, Workflow, CustomNotice
- from service.account.account_base_service import account_base_service_ins
- from service.common.constant_service import constant_service_ins
- from service.ticket.ticket_base_service import TicketBaseService, ticket_base_service_ins
- from service.common.common_service import CommonService, common_service_ins
- from service.workflow.workflow_transition_service import WorkflowTransitionService, workflow_transition_service_ins
- from django.conf import settings
- try:
- from StringIO import StringIO
- except ImportError:
- from io import StringIO
- logger = logging.getLogger('django')
- @app.task(bind=True)
- def debug_task(self):
- print('Request: {0!r}'.format(self.request))
- @app.task
- def test_task(a, b):
- print('a:', a)
- print('b:', b)
- print(a+b)
- @contextlib.contextmanager
- def stdoutIO(stdout=None):
- old = sys.stdout
- if stdout is None:
- try:
- # for python2
- stdout = StringIO.StringIO()
- except Exception:
- stdout = StringIO()
- sys.stdout = stdout
- yield stdout
- sys.stdout = old
- @app.task
- def timer_transition(ticket_id, state_id, date_time, transition_id):
- """
- 定时器流转
- :param ticket_id:
- :param state_id:
- :param date_time:
- :param transition_id:
- :return:
- """
- # 需要满足工单此状态后续无其他操作才自动流转
- # 查询该工单此状态所有操作
- # flow_log_set, msg = TicketBaseService().get_ticket_flow_log(ticket_id, 'loonrobot', per_page=1000)
- flag, result = ticket_base_service_ins.get_ticket_flow_log(ticket_id, 'loonrobot', per_page=1000)
- if flag is False:
- return False, result
- flow_log_list = result.get('ticket_flow_log_restful_list')
- for flow_log in flow_log_list:
- if flow_log.get('state').get('state_id') == state_id and flow_log.get('gmt_created') > date_time:
- return True, '后续有操作,定时器失效'
- # 执行流转
- handle_ticket_data = dict(transition_id=transition_id, username='loonrobot', suggestion='定时器流转')
- ticket_base_service_ins.handle_ticket(ticket_id, handle_ticket_data, True)
- @app.task
- def send_ticket_notice(ticket_id):
- """
- 发送工单通知
- :param ticket_id:
- :return:
- """
- # 获取工单信息
- # 获取工作流信息,获取工作流的通知信息
- # 获取通知信息的标题和内容模板
- # 将通知内容,通知标题,通知人,作为hook的请求参数
- ticket_obj = TicketRecord.objects.filter(id=ticket_id).first()
- if not ticket_obj:
- return False, 'ticket is not exist or has been deleted'
- workflow_id = ticket_obj.workflow_id
- workflow_obj = Workflow.objects.filter(id=workflow_id).first()
- notices = workflow_obj.notices
- if not notices:
- return True, 'no notice defined'
- notice_str_list = notices.split(',')
- notice_id_list = [int(notice_str) for notice_str in notice_str_list]
- send_notice_result_list = []
- title_template = workflow_obj.title_template
- content_template = workflow_obj.content_template
- # 获取工单所有字段的变量
- flag, ticket_value_info = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
- if flag is False:
- return False, ticket_value_info
- title_result = title_template.format(**ticket_value_info)
- content_result = content_template.format(**ticket_value_info)
- # 获取工单最后一条操作记录
- flag, result = ticket_base_service_ins.get_ticket_flow_log(ticket_id, 'loonrobot')
- flow_log_list = result.get('ticket_flow_log_restful_list')
- last_flow_log = flow_log_list[0]
- participant_info_list = []
- participant_username_list = []
- from apps.account.models import User
- if ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_PERSONAL:
- participant_username_list = [ticket_obj.participant]
- elif ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_MULTI:
- participant_username_list = ticket_obj.participant.split(',')
- elif ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_ROLE:
- flag, participant_username_list = account_base_service_ins.get_role_username_list(ticket_obj.participant)
- if flag is False:
- return False, participant_username_list
- elif ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_DEPT:
- flag, participant_username_list = account_base_service_ins.get_dept_username_list(ticket_obj.participant)
- if not flag:
- return flag, participant_username_list
- if participant_username_list:
- participant_queryset = User.objects.filter(username__in=participant_username_list)
- for participant_0 in participant_queryset:
- participant_info_list.append(dict(username=participant_0.username, alias=participant_0.alias,
- phone=participant_0.phone, email=participant_0.email))
- params = {'title_result': title_result, 'content_result': content_result,
- 'participant': ticket_obj.participant, 'participant_type_id': ticket_obj.participant_type_id,
- 'multi_all_person': ticket_obj.multi_all_person, 'ticket_value_info': ticket_value_info,
- 'last_flow_log': last_flow_log, 'participant_info_list': participant_info_list}
- for notice_id in notice_id_list:
- notice_obj = CustomNotice.objects.filter(id=notice_id).first()
- if not notice_obj:
- continue
- hook_url = notice_obj.hook_url
- from service.workflow.workflow_base_service import workflow_base_service_ins
- flag, msg = workflow_base_service_ins.hook_host_valid_check(hook_url)
- if not flag:
- return False, msg
- hook_token = notice_obj.hook_token
- # gen signature
- flag, headers = common_service_ins.gen_signature_by_token(hook_token)
- try:
- r = requests.post(hook_url, headers=headers, json=params)
- result = r.json()
- if result.get('code') == 0:
- send_notice_result_list.append(dict(notice_id=notice_id, result='success', msg=result.get('msg', '')))
- else:
- send_notice_result_list.append(dict(notice_id=notice_id, result='fail', msg=result.get('msg', '')))
- except Exception as e:
- send_notice_result_list.append(dict(notice_id=notice_id, result='fail', msg=e.__str__()))
- return True, dict(send_notice_result_list=send_notice_result_list)
- @app.task
- def flow_hook_task(ticket_id):
- """
- hook 任务
- :param ticket_id:
- :return:
- """
- # 查询工单状态
- ticket_obj = TicketRecord.objects.filter(id=ticket_id).first()
- state_id = ticket_obj.state_id
- state_obj = Node.objects.filter(id=state_id).first()
- participant_type_id = state_obj.participant_type_id
- if participant_type_id != constant_service_ins.PARTICIPANT_TYPE_HOOK:
- return False, ''
- hook_config = state_obj.participant
- hook_config_dict = json.loads(hook_config)
- hook_url = hook_config_dict.get('hook_url')
- hook_token = hook_config_dict.get('hook_token')
- wait = hook_config_dict.get('wait')
- extra_info = hook_config_dict.get('extra_info')
- from service.workflow.workflow_base_service import workflow_base_service_ins
- hook_check_flag, hook_result_msg = workflow_base_service_ins.hook_host_valid_check(hook_url)
- flag, msg = common_service_ins.gen_hook_signature(hook_token)
- if not flag:
- hook_check_flag, hook_result_msg = flag, msg
- flag, all_ticket_data = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
- flag, all_field_value_result = ticket_base_service_ins.get_ticket_all_field_value_json(ticket_id)
- all_ticket_data_json = all_field_value_result.get('all_field_value_json')
- if hook_check_flag:
- if extra_info is not None:
- all_ticket_data.update(dict(extra_info=extra_info))
- try:
- r = requests.post(hook_url, headers=msg, json=all_ticket_data, timeout=10)
- result = r.json()
- except Exception as e:
- result = dict(code=-1, msg=e.__str__())
- if result.get('code') == 0:
- # 调用成功
- TicketBaseService().add_ticket_flow_log(dict(ticket_id=ticket_id, transition_id=0,
- suggestion=result.get('msg'),
- participant_type_id=constant_service_ins.PARTICIPANT_TYPE_HOOK,
- participant='hook', state_id=state_id,
- intervene_type_id=constant_service_ins.TRANSITION_INTERVENE_TYPE_HOOK,
- ticket_data=all_ticket_data_json,
- creator='loonrobot'
- ))
- if not wait:
- # 不等待hook目标回调,直接流转
- flag, transition_queryset = workflow_transition_service_ins.get_state_transition_queryset(state_id)
- transition_id = transition_queryset[0].id # hook状态只支持一个流转
- new_request_dict = {}
- new_request_dict.update({'transition_id': transition_id, 'suggestion': msg, 'username': 'loonrobot'})
- # 执行流转
- flag, msg = ticket_base_service_ins.handle_ticket(ticket_id, new_request_dict, by_hook=True)
- if not flag:
- return False, msg
- return True, ''
- hook_result_msg = result.get('msg')
- ticket_base_service_ins.update_ticket_field_value(ticket_id, {'script_run_last_result': False})
- ticket_base_service_ins.add_ticket_flow_log(
- dict(ticket_id=ticket_id, transition_id=0, suggestion=hook_result_msg,
- participant_type_id=constant_service_ins.PARTICIPANT_TYPE_HOOK,
- participant='hook', state_id=state_id, ticket_data=all_ticket_data_json, creator='loonrobot'))
|