Activiti工作流引擎的使用、思考与总结(上)

简介: Activiti工作流引擎的使用、思考与总结

从业务角度来说,至少需要满足以下功能:

1.查询待办事项列表

2.待办事项的办理

3.查看已办历史

从技术角度来说:

1.activiti引擎提供的api功能过于分散,对于开发人员来说,没有一个统一的接口。最终导致开发效率不高,代码维护困难。

2.画流程图不确定性比较多,节点名称和监听器设置没有统一规范。同样也会导致效率问题。

用请假的流程来举例,首先,看一下流程图怎么画?

针对于画图,我们先定一个小规范,要求所有的节点上的Listeners全部指定为一个我们的自定义listener,无论是ExecutionListener还是TaskListener,全部指定为一个固定的自定义listener。

// 全部统一设置成ActivitiListener监听器
public class ActivitiListener implements TaskListener, ExecutionListener {
    private static final long serialVersionUID = 6071133014325140738L;
    // 用户任务监听器被触发的话,就会调用下面的notify方法
    @Override
    public void notify(DelegateTask delegateTask) {
        // 获取当前任务的taskDefinitionKey,其实就是节点id
        String taskDefinitionKey = delegateTask.getTaskDefinitionKey();
        // 通过taskDefinitionKey和目标类型去IOC容器中查找指定的Bean
        UserTaskListener userTaskHandler =
                ApplicationContextUtil.getBean(taskDefinitionKey, UserTaskListener.class);
        if (Objects.nonNull(userTaskHandler)) {
            // 执行目标Bean中的方法
            userTaskHandler.notify(delegateTask);
        }
    }
    // ExecutionListener回调方法
    @Override
    public void notify(DelegateExecution execution) {
        // 获取IOC容器
        ApplicationContext webApplicationContext = ApplicationContextUtil.getApplicationContext();
        // 从容器中查找目标类型Bean列表
        Map<String, AbstractElement> beans =
                webApplicationContext.getBeansOfType(AbstractElement.class);
        if (CollectionUtils.isEmpty(beans)) {
            return;
        }
        // 通过activityId查找符合要求的Bean实例。在bpmn中,其实就是元素的id。
        String activityId = execution.getCurrentActivityId();
        AbstractElement abstractElement = beans.get(activityId);
        if (Objects.nonNull(abstractElement)) {
            // 执行目标Bean的notify方法
            abstractElement.notify(execution);
            return;
        }
        // 如果通过activityId找不到,就通过processDefinitionKey查找目标Bean
        String processDefinitionKey =
                CastUtil.castString(execution.getVariable(Const.PROCESS_DEFINITION_KEY));
        abstractElement = beans.get(processDefinitionKey);
        if (Objects.nonNull(abstractElement)) {
            // 执行目标Bean的notify方法
            abstractElement.notify(execution);
        }
    }
}
复制代码

按照上述设定,那么一旦触发ExecutionListener就会去IOC容器中找AbstractElement类,我们来看一下这个类:

// 这个接口是需要所有UserTask来实现的
public interface UserTaskListener {
    /** @param delegateTask */
    void notify(DelegateTask delegateTask);
}
// 把所有的节点都抽象成AbstractElement,主要在于需要让每一个节点都是ExecutionListener的子类
// 这个设计的灵感来自于bpmn流程图里面任何组件元素都可以指定ExecutionListener,那意味着我如果想要统一规范所有的组件元素的Listener为ActivitiListener的话,那么我就需要把所有的组件都抽象成一个目标,这样我在ActivitiListener的实现里面就可以通过一定的规则找到目标组件的实现了。
// 最终,通过总结,要求所有元素的id都作为Bean的名字,也就是说,通过这个元素的id可以在IOC容器中找到对应的Bean实例
public abstract class AbstractElement extends AbstractActivitiServiceImpl
        implements ExecutionListener {
    private static final long serialVersionUID = 4836235578847862052L;
    @Autowired private ActivitiBusinessDataRepository activitiBusinessDataRepository;
    @Transactional(rollbackFor = Exception.class)
    @Override
    public void notify(DelegateExecution execution) {
        // 如果当前对象不支持,就直接结束
        if (!support(execution)) {
            return;
        }
        Map<String, Object> variables = Maps.newHashMap();
        Map<String, Object> data = Maps.newHashMap();
        // 执行前
        beforeExecute(execution, variables, data);
        // 真正的执行目标代码
        execute(execution, variables, data);
        // 执行后
        afterExecute(execution, data);
    }
    /**
     * 执行业务
     *
     * @param execution
     * @param variables
     * @param data
     */
    protected abstract void execute(
            DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data);
    /**
     * 后置处理
     *
     * @param execution
     * @param data
     */
    protected abstract void afterExecute(DelegateExecution execution, Map<String, Object> data);
    /**
     * @param execution
     * @return
     */
    protected boolean support(DelegateExecution execution) {
        return false;
    }
    /**
     * 读取前置系统参数
     *
     * @param execution
     * @param variables
     * @param data
     */
    protected void beforeExecute(
            DelegateExecution execution, Map<String, Object> variables, Map<String, Object> data) {
        String taskDefinitionKey = execution.getCurrentActivityId();
        String processDefinitionKey =
                CastUtil.castString(execution.getVariable(Const.PROCESS_DEFINITION_KEY));
        // 从数据库里面把自定义配置读取到variables里面。如果没有这种需要的话,可以去掉。
        activitiBusinessDataRepository.readProcessInstanceVariable(
                processDefinitionKey, taskDefinitionKey, variables);
        if (!CollectionUtils.isEmpty(variables)) {
            for (Map.Entry<String, Object> e : variables.entrySet()) {
                String name = e.getKey();
                Object value = e.getValue();
                execution.setVariable(e.getKey(), e.getValue());
                data.put(name, value);
            }
        }
    }
    protected void saveData(DelegateExecution execution, Map<String, Object> data) {
        Timestamp now = new Timestamp(System.currentTimeMillis());
        String id = execution.getId();
        String taskDefinitionKey = execution.getCurrentActivityId();
        String name = execution.getCurrentFlowElement().getName();
        String processInstanceId = execution.getProcessInstanceId();
        String businessKey = execution.getProcessInstanceBusinessKey();
        TaskNodeInfo taskNodeInfo = TaskNodeInfo.newInstance(id, name, taskDefinitionKey);
        // 自动保存的流程节点数据;把一些我认为一定要保存的数据自动保存下来,也就是说,不依赖于开发人员,我自己认为一定要保存的数据,比如businessKey一定要保存
        List<FieldInfo> autoSaveTaskNodeVariables =
                activitiBusinessDataRepository.getAutoSaveTaskNodeVariables(
                        Executor.defaultExecutor(), businessKey, data);
        // processInstanceData是抽象方法,需要每一个子类实现;返回的数据是当前流程实例需要保存的信息,以便后续节点可以使用
        List<FieldInfo> processInstanceDataList = processInstanceData(execution, data);
        // taskNodeData是抽象方法,子类需要实现;返回的数据是当前节点的快照信息
        List<FieldInfo> taskNodeDataList = taskNodeData(execution, data);
        // 记录流程实例数据;保存当前流程实例数据
        activitiBusinessDataRepository.saveProcessInstanceData(
                processInstanceDataList, null, processInstanceId, businessKey, now);
        // 记录节点数据;把当前节点的相关信息保存起来
        activitiBusinessDataRepository.saveTaskNodeData(
                autoSaveTaskNodeVariables, taskNodeDataList, processInstanceId, taskNodeInfo, now);
        // 自动记录节点历史,不需要开发者操心
        // 一般历史数据大概可以归纳为时间、节点、任务、流程实例ID,附加信息等等,所以花点心思总结一下,应该是可以固定下来的,所以就不需要子类去实现
        activitiBusinessDataRepository.saveHistory(
                businessKey,
                processInstanceId,
                Executor.defaultExecutor(),
                taskNodeInfo,
                data,
                now);
    }
    // 返回的数据是当前流程实例需要保存的信息
    protected abstract List<FieldInfo> processInstanceData(
            DelegateExecution execution, Map<String, Object> data);
    // 返回的数据是当前节点的快照信息
    protected abstract List<FieldInfo> taskNodeData(
            DelegateExecution execution, Map<String, Object> data);
    /**
     * 读取变量参数
     *
     * @param processDefinitionKey
     * @param taskDefinitionKey
     * @param variables
     */
    protected void readTransactorAndSendNoticeVariable(
            String processDefinitionKey, String taskDefinitionKey, Map<String, Object> variables) {
        activitiBusinessDataRepository.readTransactorAndSendNoticeVariable(
                processDefinitionKey, taskDefinitionKey, variables);
    }
}
复制代码

综上所述,我们总结两点规范:

  • 1.所有的Listener都是ActivitiListener
  • 2.所有的组件id都作为Bean的名字,通过组件id去IOC容器中查找目标Bean;如果找不到,就通过processDefinitionKey查找目标Bean

关于AbstractActivitiServiceImpl这个类,其实就是集成了activiti里面一些常用的api,我认为这是大多数组件都会调用的,所以特地放在这个抽象类里面,以便所有AbstractElement的子类都可以使用里面的方法。只和activiti引擎相关数据表打交道的逻辑可以放在这个类里面实现。

public abstract class AbstractActivitiServiceImpl
        implements IActivitiService, ApplicationContextAware {
    @Autowired protected TaskService taskService;
    @Autowired protected FormService formService;
    @Autowired protected HistoryService historyService;
    @Autowired protected IdentityService identityService;
    @Autowired protected ManagementService managementService;
    @Autowired protected RepositoryService repositoryService;
    @Autowired protected RuntimeService runtimeService;
    @Autowired private IActRunTaskRepository actRunTaskRepository;
    private static ApplicationContext APPLICATION_CONTEXT;
    protected ApplicationContext applicationContext() {
        return APPLICATION_CONTEXT;
    }
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        APPLICATION_CONTEXT = applicationContext;
    }
    /**
     * 统计指定用户的待办事项数量
     *
     * @param userIds
     * @return
     */
    public List<ActivitiCountGroupByResult> countUnassignedTaskGroupByUserId(List<String> userIds) {
        List<Map<String, Object>> list = actRunTaskRepository.countRunTaskForEachOne(userIds);
        if (CollectionUtils.isEmpty(list)) {
            return null;
        }
        Map<String, Integer> userTaskMap = Maps.newHashMap();
        for (Map<String, Object> e : list) {
            String userId = CastUtil.castString(e.get("userId"));
            Integer taskNum = CastUtil.castInt(e.get("taskNum"));
            userTaskMap.put(userId, taskNum);
        }
        List<ActivitiCountGroupByResult> result = Lists.newArrayList();
        for (String userId : userIds) {
            Integer taskNum = userTaskMap.getOrDefault(userId, 0);
            result.add(new ActivitiCountGroupByResult(userId, taskNum));
        }
        return result;
    }
    /**
     * 待办事项最少的人
     *
     * @param userIds
     * @return
     */
    public ActivitiCountGroupByResult minUnassignedTask(List<String> userIds) {
        return countUnassignedTask(userIds, false);
    }
    /**
     * 待办事项最多的人
     *
     * @param userIds
     * @return
     */
    public ActivitiCountGroupByResult maxUnassignedTask(List<String> userIds) {
        return countUnassignedTask(userIds, true);
    }
    /**
     * 待办事项最少或最多的人
     *
     * @param userIds
     * @return
     */
    private ActivitiCountGroupByResult countUnassignedTask(List<String> userIds, boolean isMax) {
        List<ActivitiCountGroupByResult> list = countUnassignedTaskGroupByUserId(userIds);
        if (CollectionUtils.isEmpty(list)) {
            // 都没有待办事项
            return null;
        }
        if (isMax) {
            return Collections.max(
                    list, Comparator.comparingInt(ActivitiCountGroupByResult::getNum));
        } else {
            return Collections.min(
                    list, Comparator.comparingInt(ActivitiCountGroupByResult::getNum));
        }
    }
    @Override
    public Task queryUnassignedTask(String taskId) {
        return taskService.createTaskQuery().taskId(taskId).singleResult();
    }
    /**
     * 通过taskId查询task
     *
     * @param taskId
     * @return
     */
    @Override
    public Task queryUnassignedTask(String taskId, String userId) {
        return taskService
                .createTaskQuery()
                .taskId(taskId)
                .taskCandidateOrAssigned(userId)
                .singleResult();
    }
    /**
     * 查询待办任务
     *
     * @param userId
     * @return
     */
    @Override
    public List<Task> queryUnassignedTaskByUser(
            String userId, List<String> processInstaceIds, final int page, int size) {
        int size_val = resetSize(size);
        int firstResult = countFirstResult(page, size_val);
        TaskQuery taskQuery = taskService.createTaskQuery().taskCandidateOrAssigned(userId);
        if (!CollectionUtils.isEmpty(processInstaceIds)) {
            taskQuery = taskQuery.processInstanceIdIn(processInstaceIds);
        }
        return taskQuery.orderByTaskCreateTime().desc().listPage(firstResult, size_val);
    }
    /**
     * 计算第一条结果的索引
     *
     * @param page
     * @param size
     * @return
     */
    private int countFirstResult(final int page, final int size) {
        int startIndex = 1;
        // 如果减1后还小于等于0,就重置为0(因为达成统一规定:分页的第一页page指定从1开始,但是jpa中是从0开始)
        startIndex = page - startIndex;
        int page_val = startIndex <= 0 ? 0 : startIndex;
        return page_val * size;
    }
    /**
     * 计算每页大小
     *
     * @param size
     * @return
     */
    private int resetSize(final int size) {
        return size <= 0 ? 10 : size;
    }
    /**
     * 查询已办任务
     *
     * @param userId
     * @param page
     * @param size
     * @return
     */
    @Override
    public List<HistoricTaskInstance> queryDoneTasks(String userId, int page, int size) {
        int size_val = resetSize(size);
        int firstResult = countFirstResult(page, size_val);
        return historyService
                .createHistoricTaskInstanceQuery()
                .taskAssignee(userId)
                .finished()
                .orderByTaskCreateTime()
                .desc()
                .listPage(firstResult, size_val);
    }
    /**
     * 统计待办事项数量
     *
     * @param userId
     * @return
     */
    @Override
    public long countUnassignedTask(String userId) {
        return taskService.createTaskQuery().taskCandidateOrAssigned(userId).count();
    }
    /**
     * 统计已办事项数量
     *
     * @param userId
     * @return
     */
    public long countDoneTask(String userId) {
        return historyService.createHistoricTaskInstanceQuery().taskAssignee(userId).count();
    }
    /**
     * 查询当前流程没有办理人的任务
     *
     * @param processInstanceId
     * @return
     */
    @Override
    public List<Task> currentUnassignedTasks(String processInstanceId) {
        return taskService
                .createTaskQuery()
                .processInstanceId(processInstanceId)
                .taskUnassigned()
                .list();
    }
    /**
     * 查询当前没有被办理的任务
     *
     * @param processInstanceId
     * @return
     */
    @Override
    public Task currentUndoneTask(String processInstanceId) {
        return taskService
                .createTaskQuery()
                .processInstanceId(processInstanceId)
                .singleResult();
    }
}
复制代码

从下面开始,我们就要开始做具体的区分了,开始区分流程实例任务处理器

相关文章
|
存储 SQL 消息中间件
大数据生态圈常用组件(一):数据库、查询引擎、ETL工具、调度工具等
大数据生态圈常用组件(一):数据库、查询引擎、ETL工具、调度工具等
|
7月前
|
运维 Serverless 文件存储
Serverless 应用引擎产品使用合集之是基于什么逻辑运行的
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
8月前
|
人工智能 供应链 监控
推荐一款TinyEngine低代码引擎!支持自定义DSL 生成定制的源码、跨技术栈!
推荐一款TinyEngine低代码引擎!支持自定义DSL 生成定制的源码、跨技术栈!
150 0
|
XML Java 关系型数据库
Activiti工作流引擎介绍
Activiti工作流引擎介绍
|
8月前
|
边缘计算 安全 算法
阿里云丁玉杰:构建全场景服务引擎
2023全球边缘计算大会·上海站,阿里云边缘云演讲分享
194 0
|
数据库
易搭工作流引擎用是什么开源 还是阿里自研产品,零代码平台场景页面映射数据库表是动态创建,采用什么框架处理,怎么让系统产生高并发能力。易搭权限有没有了解,求解。
易搭工作流引擎用是什么开源 还是阿里自研产品,零代码平台场景页面映射数据库表是动态创建,采用什么框架处理,怎么让系统产生高并发能力。易搭权限有没有了解,求解。
|
存储 XML SQL
Activiti工作流框架学习笔记(一)之通用数据表详细介绍
Activiti工作流框架学习笔记(一)之通用数据表详细介绍
539 1
|
存储 XML Java
Activiti工作流
Activiti工作流
159 0
|
存储 XML Java
Activiti工作流与业务整合实战
Activiti工作流与业务整合实战
558 0
Activiti工作流与业务整合实战
|
XML 存储 物联网
OneCode低代码引擎-流程引擎白皮书
在低代码应用中,应用比例非常高的一种应用便是以流程+表单驱动为模型的各种审批类引用。但流程在低代码平台中的应用绝不是简简单单的流程+表单的模型。而是站在更高的层次上在自然时间轴为基础的维度上,将事件、数据、响应、人工交互等因素进行特定场景下的编排逻辑处理。
OneCode低代码引擎-流程引擎白皮书