123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 |
- # from __future__ import absolute_import, unicode_literals
- import contextlib
- import os
- import sys
- import traceback
- import logging
- from celery import Celery
- # set the default Django settings module for the 'celery' program.
- os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings.config')
- import django
- django.setup()
- app = Celery('loonflow')
- # Using a string here means the worker doesn't have to serialize
- # the configuration object to child processes.
- # - namespace='CELERY' means all celery-related configuration keys
- # should have a `CELERY_` prefix.
- app.config_from_object('django.conf:settings', namespace='CELERY')
- # Load task modules from all registered Django app configs.
- app.autodiscover_tasks()
- import json
- import requests
- from apps.ticket.models import TicketRecord
- from apps.workflow.models import Transition, State, WorkflowScript, Workflow, CustomNotice
- from service.account.account_base_service import account_base_service_ins
- from service.common.constant_service import constant_service_ins
- from service.ticket.ticket_base_service import TicketBaseService, ticket_base_service_ins
- from service.common.common_service import CommonService, common_service_ins
- from service.workflow.workflow_transition_service import WorkflowTransitionService, workflow_transition_service_ins
- from django.conf import settings
- try:
- from StringIO import StringIO
- except ImportError:
- from io import StringIO
- logger = logging.getLogger('django')
@app.task(bind=True)
def debug_task(self):
    """Diagnostic task: print the repr of the bound task's request."""
    request_repr = repr(self.request)
    print('Request: ' + request_repr)
@app.task
def test_task(a, b):
    """Smoke-test task: print both arguments with labels, then ``a + b``.

    :param a: first operand
    :param b: second operand
    :return: None
    """
    for label, value in (('a:', a), ('b:', b)):
        print(label, value)
    total = a + b
    print(total)
@contextlib.contextmanager
def stdoutIO(stdout=None):
    """Temporarily redirect ``sys.stdout`` so printed output can be captured.

    :param stdout: file-like object that should receive the output; when
        ``None`` a fresh ``StringIO`` buffer is created.
    :return: context manager yielding the object installed as ``sys.stdout``.
    """
    if stdout is None:
        # ``StringIO`` is already the class under both import branches at the
        # top of this module, so it is instantiated directly.
        stdout = StringIO()
    previous = sys.stdout
    sys.stdout = stdout
    try:
        yield stdout
    finally:
        # Restore the real stream even if the body raises; otherwise one
        # failing script would leave the whole worker process writing into
        # a stale buffer.
        sys.stdout = previous
@app.task
def run_flow_task(ticket_id, script_id_str, state_id, action_from='loonrobot'):
    """Execute the workflow script assigned to a ticket, then move the ticket on.

    :param ticket_id: id of the ticket whose current participant is the script
    :param script_id_str: id of the script to run, stored as a string
    :param state_id: id of the state the script belongs to
    :param action_from: value exposed to the script as ``action_from``
    :return: ``(flag, msg)`` tuple; ``flag`` is False when the ticket, script
        or transition is missing, when the ticket is not currently assigned
        to this script, or when the script raised
    """
    script_id = int(script_id_str)
    ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=False).first()
    if not ticket_obj:
        return False, 'ticket is not exist or has been deleted'

    is_script_turn = (ticket_obj.participant == script_id_str
                      and ticket_obj.participant_type_id == constant_service_ins.PARTICIPANT_TYPE_ROBOT)
    if not is_script_turn:
        return False, '工单当前处理人为非脚本,不执行脚本'

    # Validate that the script is registered and active.
    script_obj = WorkflowScript.objects.filter(id=script_id, is_deleted=False, is_active=True).first()
    if not script_obj:
        return False, '脚本未注册或非激活状态'

    script_file = os.path.join(settings.MEDIA_ROOT, script_obj.saved_name.name)
    script_globals = {'ticket_id': ticket_id, 'action_from': action_from}

    # A script that wants to stop the ticket from moving on (its own failure
    # or a failed downstream call) must raise an exception.
    try:
        with open(script_file, encoding='utf-8') as script_fp:
            script_source = script_fp.read()
        with stdoutIO() as captured:
            # SECURITY NOTE(review): exec runs the stored script with the full
            # privileges of the worker process; only trusted users should be
            # able to register scripts.
            exec(script_source, script_globals)
        script_result = True
        script_result_msg = captured.getvalue()
    except Exception as e:
        logger.error(traceback.format_exc())
        script_result = False
        script_result_msg = str(e)

    logger.info('*' * 20 + '工作流脚本回调,ticket_id:[%s]' % ticket_id + '*' * 20)
    logger.info('*******工作流脚本回调,ticket_id:{}*****'.format(ticket_id))

    # The script may have run for a long time; re-fetch the ticket so a stale
    # DB session is not used.
    ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=False).first()
    # Only one directly connected follow-up transition is supported after a
    # script state, so the first match is used.
    transition_obj = Transition.objects.filter(source_state_id=state_id, is_deleted=False).first()
    if not transition_obj:
        return False, 'no transition is defined after state {}'.format(state_id)

    new_ticket_flow_dict = dict(
        ticket_id=ticket_id,
        transition_id=transition_obj.id,
        suggestion=script_result_msg,
        participant_type_id=constant_service_ins.PARTICIPANT_TYPE_ROBOT,
        participant='脚本:(id:{}, name:{})'.format(script_obj.id, script_obj.name),
        state_id=state_id,
        creator='loonrobot',
    )
    ticket_base_service_ins.add_ticket_flow_log(new_ticket_flow_dict)

    if not script_result:
        # Script failed: leave the state unchanged and record the failure.
        ticket_obj.script_run_last_result = False
        ticket_obj.save()
        return False, script_result_msg

    # Script succeeded: perform the follow-up transition automatically.
    handle_data = dict(username='loonrobot',
                       suggestion='脚本执行完成后自行流转',
                       transition_id=transition_obj.id)
    flag, msg = ticket_base_service_ins.handle_ticket(ticket_id, handle_data, False, True)
    if flag:
        logger.info('******脚本执行成功,工单基础信息更新完成, ticket_id:{}******'.format(ticket_id))
    return flag, msg
@app.task
def timer_transition(ticket_id, state_id, date_time, transition_id):
    """Perform a timer-driven transition unless the ticket was handled since.

    :param ticket_id: id of the ticket to move
    :param state_id: id of the state the timer was armed for
    :param date_time: reference time; a flow log entry for ``state_id`` whose
        ``gmt_created`` is greater than this cancels the timer
    :param transition_id: id of the transition to perform
    :return: ``(flag, msg)`` tuple; when the transition is attempted this is
        the result of ``handle_ticket``
    """
    # The transition is only automatic when nothing else happened in this
    # state after the timer was armed, so inspect the ticket's flow log first.
    flag, result = ticket_base_service_ins.get_ticket_flow_log(ticket_id, 'loonrobot', per_page=1000)
    if flag is False:
        return False, result

    flow_log_list = result.get('ticket_flow_log_restful_list') or []
    for flow_log in flow_log_list:
        same_state = flow_log.get('state').get('state_id') == state_id
        if same_state and flow_log.get('gmt_created') > date_time:
            return True, '后续有操作,定时器失效'

    # Perform the transition and report its outcome to the caller instead of
    # silently discarding it.
    handle_ticket_data = dict(transition_id=transition_id, username='loonrobot', suggestion='定时器流转')
    # NOTE(review): the positional True is presumably the "by timer" flag of
    # handle_ticket; confirm against TicketBaseService.handle_ticket.
    return ticket_base_service_ins.handle_ticket(ticket_id, handle_ticket_data, True)
@app.task
def send_ticket_notice(ticket_id):
    """Send the ticket's notifications to every notice hook of its workflow.

    The workflow's title and content templates are rendered with the ticket's
    field values, and the result is posted, together with participant details
    and the first entry of the flow log, to each configured notice hook.

    :param ticket_id: id of the ticket to notify about
    :return: ``(flag, data)`` tuple; on success ``data`` is a dict whose
        ``send_notice_result_list`` holds one result entry per notice sent
    """
    ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=0).first()
    if not ticket_obj:
        return False, 'ticket is not exist or has been deleted'

    workflow_obj = Workflow.objects.filter(id=ticket_obj.workflow_id, is_deleted=0).first()
    if not workflow_obj:
        return False, 'workflow is not exist or has been deleted'
    notices = workflow_obj.notices
    if not notices:
        return True, 'no notice defined'
    notice_id_list = [int(notice_str) for notice_str in notices.split(',')]
    send_notice_result_list = []

    # Render the title/content templates with all field values of the ticket.
    flag, ticket_value_info = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
    if flag is False:
        return False, ticket_value_info
    title_result = workflow_obj.title_template.format(**ticket_value_info)
    content_result = workflow_obj.content_template.format(**ticket_value_info)

    # Fetch the flow log; its first entry is sent along as ``last_flow_log``.
    flag, result = ticket_base_service_ins.get_ticket_flow_log(ticket_id, 'loonrobot')
    if flag is False:
        return False, result
    flow_log_list = result.get('ticket_flow_log_restful_list') or []
    # A ticket without any flow log yet must not abort the notification.
    last_flow_log = flow_log_list[0] if flow_log_list else {}

    # Resolve the usernames of the current participant(s).
    from apps.account.models import LoonUser
    participant_username_list = []
    participant_type_id = ticket_obj.participant_type_id
    if participant_type_id == constant_service_ins.PARTICIPANT_TYPE_PERSONAL:
        participant_username_list = [ticket_obj.participant]
    elif participant_type_id == constant_service_ins.PARTICIPANT_TYPE_MULTI:
        participant_username_list = ticket_obj.participant.split(',')
    elif participant_type_id == constant_service_ins.PARTICIPANT_TYPE_ROLE:
        flag, participant_username_list = account_base_service_ins.get_role_username_list(ticket_obj.participant)
        if flag is False:
            return False, participant_username_list
    elif participant_type_id == constant_service_ins.PARTICIPANT_TYPE_DEPT:
        flag, participant_username_list = account_base_service_ins.get_dept_username_list(ticket_obj.participant)
        if not flag:
            return flag, participant_username_list

    participant_info_list = []
    if participant_username_list:
        participant_queryset = LoonUser.objects.filter(username__in=participant_username_list, is_deleted=0)
        participant_info_list = [
            dict(username=user.username, alias=user.alias, phone=user.phone, email=user.email)
            for user in participant_queryset
        ]

    params = {'title_result': title_result, 'content_result': content_result,
              'participant': ticket_obj.participant, 'participant_type_id': ticket_obj.participant_type_id,
              'multi_all_person': ticket_obj.multi_all_person, 'ticket_value_info': ticket_value_info,
              'last_flow_log': last_flow_log, 'participant_info_list': participant_info_list}

    # Imported inside the function, as in the original code, rather than at
    # module level; done once instead of on every loop iteration.
    from service.workflow.workflow_base_service import workflow_base_service_ins
    for notice_id in notice_id_list:
        notice_obj = CustomNotice.objects.filter(id=notice_id, is_deleted=0).first()
        if not notice_obj:
            continue
        hook_url = notice_obj.hook_url
        flag, msg = workflow_base_service_ins.hook_host_valid_check(hook_url)
        if not flag:
            return False, msg
        # Generate the signature headers for this hook.
        flag, headers = common_service_ins.gen_signature_by_token(notice_obj.hook_token)
        try:
            # Bounded wait so an unresponsive hook cannot block the worker
            # forever; 10 seconds matches the timeout used by flow_hook_task.
            r = requests.post(hook_url, headers=headers, json=params, timeout=10)
            result = r.json()
            outcome = 'success' if result.get('code') == 0 else 'fail'
            send_notice_result_list.append(dict(notice_id=notice_id, result=outcome, msg=result.get('msg', '')))
        except Exception as e:
            send_notice_result_list.append(dict(notice_id=notice_id, result='fail', msg=str(e)))
    return True, dict(send_notice_result_list=send_notice_result_list)
@app.task
def flow_hook_task(ticket_id):
    """Call the hook configured on the ticket's current state.

    The state's ``participant`` field holds a JSON hook configuration
    (``hook_url``, ``hook_token``, ``wait``, ``extra_info``). All field values
    of the ticket are posted to the hook. When the hook answers with
    ``code == 0`` a flow log entry is written and, unless ``wait`` is set,
    the ticket is moved along the state's transition. Otherwise the ticket is
    marked as failed and the error is logged to the flow log.

    :param ticket_id: id of the ticket whose current state is a hook state
    :return: ``(flag, msg)`` tuple
    """
    # Look up the ticket and its current state.
    ticket_obj = TicketRecord.objects.filter(id=ticket_id, is_deleted=0).first()
    if not ticket_obj:
        return False, 'ticket is not exist or has been deleted'
    state_id = ticket_obj.state_id
    state_obj = State.objects.filter(id=state_id, is_deleted=0).first()
    if not state_obj:
        return False, 'state is not exist or has been deleted'
    if state_obj.participant_type_id != constant_service_ins.PARTICIPANT_TYPE_HOOK:
        return False, ''

    hook_config_dict = json.loads(state_obj.participant)
    hook_url = hook_config_dict.get('hook_url')
    hook_token = hook_config_dict.get('hook_token')
    wait = hook_config_dict.get('wait')
    extra_info = hook_config_dict.get('extra_info')

    from service.workflow.workflow_base_service import workflow_base_service_ins
    hook_check_flag, hook_result_msg = workflow_base_service_ins.hook_host_valid_check(hook_url)

    # On success the second value is the header dict sent with the request;
    # on failure it is treated as the error message.
    sign_flag, signature_headers = common_service_ins.gen_hook_signature(hook_token)
    if not sign_flag:
        hook_check_flag, hook_result_msg = sign_flag, signature_headers

    _, all_ticket_data = ticket_base_service_ins.get_ticket_all_field_value(ticket_id)
    _, all_field_value_result = ticket_base_service_ins.get_ticket_all_field_value_json(ticket_id)
    all_ticket_data_json = all_field_value_result.get('all_field_value_json')

    if hook_check_flag:
        if extra_info is not None:
            all_ticket_data.update(dict(extra_info=extra_info))
        try:
            r = requests.post(hook_url, headers=signature_headers, json=all_ticket_data, timeout=10)
            result = r.json()
        except Exception as e:
            result = dict(code=-1, msg=str(e))

        if result.get('code') == 0:
            # Hook call succeeded: record it in the flow log.
            TicketBaseService().add_ticket_flow_log(dict(
                ticket_id=ticket_id, transition_id=0,
                suggestion=result.get('msg'),
                participant_type_id=constant_service_ins.PARTICIPANT_TYPE_HOOK,
                participant='hook', state_id=state_id,
                intervene_type_id=constant_service_ins.TRANSITION_INTERVENE_TYPE_HOOK,
                ticket_data=all_ticket_data_json,
                creator='loonrobot'
            ))
            if not wait:
                # Do not wait for a callback from the hook target; move on now.
                flag, transition_queryset = workflow_transition_service_ins.get_state_transition_queryset(state_id)
                transition_id = transition_queryset[0].id  # a hook state supports a single transition only
                # Use the hook's response message as the suggestion; the
                # signature headers must not end up in the ticket's flow log.
                new_request_dict = {'transition_id': transition_id,
                                    'suggestion': result.get('msg'),
                                    'username': 'loonrobot'}
                flag, msg = ticket_base_service_ins.handle_ticket(ticket_id, new_request_dict, by_hook=True)
                if not flag:
                    return False, msg
            return True, ''
        hook_result_msg = result.get('msg')

    # Host check, signature generation or the hook call itself failed: mark
    # the ticket and record the reason in the flow log.
    ticket_base_service_ins.update_ticket_field_value(ticket_id, {'script_run_last_result': False})
    ticket_base_service_ins.add_ticket_flow_log(
        dict(ticket_id=ticket_id, transition_id=0, suggestion=hook_result_msg,
             participant_type_id=constant_service_ins.PARTICIPANT_TYPE_HOOK,
             participant='hook', state_id=state_id, ticket_data=all_ticket_data_json, creator='loonrobot'))
    return False, hook_result_msg
|