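Preface

In the wave of digital transformation, workflow automation has become a core driver of enterprise efficiency. From simple email notifications to complex business-process orchestration, workflow builders are redefining how repetitive tasks get handled. This article walks through a complete workflow builder system, from the visual designer to the backend execution engine, as a practical implementation guide.

Unlike the more theoretical articles available, this one provides a complete code implementation: every functional module comes with detailed technical notes and runnable code examples. By the end, readers should understand how to build an enterprise-grade workflow automation platform along the lines of Zapier or Microsoft Power Automate.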

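Core Features Overview

The workflow builder we are going to build includes the following core features:

Frontend visual designer

  • Drag-and-drop flow design based on React Flow
  • A custom node component system
  • Real-time connection validation and path planning
  • A configuration panel and property editor

Backend execution engine

  • An asynchronous workflow execution engine
  • A node registry and plugin system
  • A third-party service integration framework
  • Real-time status monitoring and logging

Third-party integrations

  • Slack message notifications
  • GitHub repository operations
  • Email delivery
  • HTTP API calls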
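I. System Architecture Design

(1) Overall technical architecture

The workflow builder separates the frontend from the backend, which keeps the system extensible and maintainable: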

┌─────────────────────────────────────────────────────────────┐
│                 Frontend Architecture Layer                 │
├─────────────────────────────────────────────────────────────┤
│  React + TypeScript + React Flow + Ant Design + Zustand     │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Visual designer │ │  Config panel   │ │Execution monitor│ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘

                     WebSocket + REST API

┌─────────────────────────────────────────────────────────────┐
│                 Backend Architecture Layer                  │
├─────────────────────────────────────────────────────────────┤
│  Node.js + TypeScript + Express + MongoDB + Redis           │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │   API gateway   │ │Execution engine │ │Connector manager│ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │   Task queue    │ │State management │ │     Logging     │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────┘

                Third-party service integration

┌─────────────────────────────────────────────────────────────┐
│  Slack API  │  GitHub API  │  SendGrid  │  Custom HTTP APIs │
└─────────────────────────────────────────────────────────────┘

(2) Core data models

Workflow definition model

interface WorkflowDefinition {
  id: string;
  name: string;
  description: string;
  version: string;
  status: 'draft' | 'active' | 'inactive';

  // Node definitions
  nodes: WorkflowNode[];

  // Connection (edge) definitions
  edges: WorkflowEdge[];

  // Trigger configuration
  trigger: {
    type: 'webhook' | 'schedule' | 'manual';
    config: any;
  };

  // Global variables
  variables: Record<string, any>;

  // Metadata
  metadata: {
    createdAt: Date;
    updatedAt: Date;
    createdBy: string;
    tags: string[];
  };
}

interface WorkflowNode {
  id: string;
  type: string;
  position: { x: number; y: number };
  data: {
    label: string;
    config: any;
    inputs: Record<string, any>;
    outputs: Record<string, any>;
  };
}

interface WorkflowEdge {
  id: string;
  source: string;
  target: string;
  sourceHandle?: string;
  targetHandle?: string;
  type?: string;
}
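
Because a definition is just a set of nodes plus directed edges, an execution engine needs some way to derive a run order from `edges`. The following is a minimal sketch of one common approach (Kahn's algorithm), not the engine described in this article. `GraphNode`, `GraphEdge` and `topologicalOrder` are illustrative names, and the sketch assumes that branches are not executed in parallel.

```typescript
// Simplified stand-ins for WorkflowNode / WorkflowEdge (illustrative only).
interface GraphNode { id: string; }
interface GraphEdge { source: string; target: string; }

// Kahn's algorithm: returns node ids so that every edge's source comes before
// its target, and throws if the graph contains a cycle.
function topologicalOrder(nodes: GraphNode[], edges: GraphEdge[]): string[] {
  const inDegree = new Map<string, number>();
  const successors = new Map<string, string[]>();
  for (const node of nodes) {
    inDegree.set(node.id, 0);
    successors.set(node.id, []);
  }
  for (const edge of edges) {
    const next = successors.get(edge.source);
    if (next === undefined || !inDegree.has(edge.target)) {
      throw new Error(`Edge references unknown node: ${edge.source} -> ${edge.target}`);
    }
    next.push(edge.target);
    inDegree.set(edge.target, (inDegree.get(edge.target) ?? 0) + 1);
  }

  // Start with the nodes that have no incoming edges (typically the trigger).
  const ready = nodes.filter((n) => inDegree.get(n.id) === 0).map((n) => n.id);
  const order: string[] = [];
  while (ready.length > 0) {
    const id = ready.shift() as string;
    order.push(id);
    for (const target of successors.get(id) ?? []) {
      const remaining = (inDegree.get(target) ?? 0) - 1;
      inDegree.set(target, remaining);
      if (remaining === 0) ready.push(target);
    }
  }
  if (order.length !== nodes.length) {
    throw new Error('Workflow graph contains a cycle');
  }
  return order;
}

const runOrder = topologicalOrder(
  [{ id: 'trigger' }, { id: 'notify' }, { id: 'check' }],
  [
    { source: 'trigger', target: 'check' },
    { source: 'check', target: 'notify' },
  ],
);
console.log(runOrder); // [ 'trigger', 'check', 'notify' ]
```

The same function doubles as a validity check: a definition whose edges form a loop is rejected before anything runs.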

Execution instance model

interface WorkflowInstance {
  id: string;
  workflowId: string;
  status: 'running' | 'completed' | 'failed' | 'cancelled';

  // Data supplied by the trigger
  triggerData: any;

  // Execution log
  executionLog: ExecutionLogEntry[];

  // Timing information
  startTime: Date;
  endTime?: Date;
  duration?: number;

  // Error information
  error?: {
    message: string;
    stack?: string;
    nodeId?: string;
  };
}

interface ExecutionLogEntry {
  nodeId: string;
  status: 'pending' | 'running' | 'completed' | 'failed';
  startTime: Date;
  endTime?: Date;
  input?: any;
  output?: any;
  error?: string;
}
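
To illustrate how the instance-level `status` and `duration` can be derived from the per-node log entries, here is a small sketch. It is an assumption about how these fields relate, not the article's engine: `summarizeInstance`, `LogEntry` and `InstanceSummary` are hypothetical names, and `duration` is assumed to be in milliseconds.

```typescript
type NodeRunStatus = 'pending' | 'running' | 'completed' | 'failed';

// Trimmed-down version of ExecutionLogEntry (illustrative only).
interface LogEntry {
  nodeId: string;
  status: NodeRunStatus;
  startTime: Date;
  endTime?: Date;
}

interface InstanceSummary {
  status: 'running' | 'completed' | 'failed';
  duration?: number; // milliseconds (assumed unit)
  failedNodeId?: string;
}

// Derive the instance status from its log: any failed node fails the instance,
// any unfinished node keeps it running, otherwise it is completed.
function summarizeInstance(startTime: Date, log: LogEntry[], now: Date): InstanceSummary {
  const failed = log.find((entry) => entry.status === 'failed');
  if (failed) {
    const end = failed.endTime ?? now;
    return {
      status: 'failed',
      duration: end.getTime() - startTime.getTime(),
      failedNodeId: failed.nodeId,
    };
  }
  const unfinished = log.some((entry) => entry.status === 'pending' || entry.status === 'running');
  if (unfinished || log.length === 0) {
    return { status: 'running' };
  }
  const lastEnd = Math.max(...log.map((entry) => (entry.endTime ?? now).getTime()));
  return { status: 'completed', duration: lastEnd - startTime.getTime() };
}

const startedAt = new Date('2024-01-01T00:00:00Z');
const summary = summarizeInstance(
  startedAt,
  [
    { nodeId: 'trigger', status: 'completed', startTime: startedAt, endTime: new Date('2024-01-01T00:00:01Z') },
    { nodeId: 'notify', status: 'completed', startTime: startedAt, endTime: new Date('2024-01-01T00:00:02Z') },
  ],
  new Date('2024-01-01T00:00:05Z'),
);
console.log(summary); // { status: 'completed', duration: 2000 }
```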

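II. Frontend Visual Designer Implementation

(1) React Flow integration and basic setup

First, we set up the basic framework for the React Flow-based visual designer:

Project initialization and dependency installation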

# Create the React project
npx create-react-app workflow-builder --template typescript
cd workflow-builder

# Install core dependencies
npm install reactflow antd zustand @tanstack/react-query
npm install axios socket.io-client
npm install @types/node --save-dev

# Install the icon library
npm install @ant-design/icons

Main designer component

// src/components/WorkflowDesigner.tsx
import React, { useCallback, useState, useRef } from 'react';
import ReactFlow, {
Node,
Edge,
addEdge,
Connection,
useNodesState,
useEdgesState,
Controls,
MiniMap,
Background,
BackgroundVariant,
ReactFlowProvider,
ReactFlowInstance,
} from 'reactflow';
import { Layout, Button, message, Drawer } from 'antd';
import { SaveOutlined, PlayCircleOutlined, SettingOutlined } from '@ant-design/icons';

// Custom components
import NodePanel from './panels/NodePanel';
import ConfigPanel from './panels/ConfigPanel';
import ExecutionPanel from './panels/ExecutionPanel';
import { useWorkflowStore } from '../stores/workflowStore';
import { useSocketConnection } from '../hooks/useSocketConnection';

// Custom node types
import TriggerNode from './nodes/TriggerNode';
import ActionNode from './nodes/ActionNode';
import ConditionNode from './nodes/ConditionNode';
import TransformNode from './nodes/TransformNode';

import 'reactflow/dist/style.css';
import './WorkflowDesigner.css';

const { Sider, Content } = Layout;

// Register the custom node types
const nodeTypes = {
trigger: TriggerNode,
action: ActionNode,
condition: ConditionNode,
transform: TransformNode,
};

// Initial nodes and edges
const initialNodes: Node[] = [];
const initialEdges: Edge[] = [];

const WorkflowDesigner: React.FC = () => {
const [nodes, setNodes, onNodesChange] = useNodesState(initialNodes);
const [edges, setEdges, onEdgesChange] = useEdgesState(initialEdges);
const [selectedNode, setSelectedNode] = useState<Node | null>(null);
const [configDrawerOpen, setConfigDrawerOpen] = useState(false);
const [executionDrawerOpen, setExecutionDrawerOpen] = useState(false);

const reactFlowWrapper = useRef<HTMLDivElement>(null);
const [reactFlowInstance, setReactFlowInstance] = useState<ReactFlowInstance | null>(null);

// Global workflow state
const {
currentWorkflow,
saveWorkflow,
executeWorkflow,
isLoading,
executionStatus
} = useWorkflowStore();

// WebSocket connection for real-time status updates
const { isConnected } = useSocketConnection();

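// Handle a new connection between nodes
const onConnect = useCallback(
(params: Connection) => {
// Validate the connection before adding it
if (!isValidConnection(params, nodes)) {
message.error('Invalid connection: cycles and connections between ports of the same type are not allowed');
return;
}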

const newEdge: Edge = {
...params,
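// Include the handles so that both branches of one condition node can target the same node without an id collision
id: `edge-${params.source}-${params.sourceHandle ?? 'default'}-${params.target}-${params.targetHandle ?? 'default'}`,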
type: 'smoothstep',
animated: true,
style: { stroke: '#1890ff', strokeWidth: 2 },
};

setEdges((eds) => addEdge(newEdge, eds));
},
[setEdges, nodes]
);

// Handle node selection
const onNodeClick = useCallback((event: React.MouseEvent, node: Node) => {
setSelectedNode(node);
setConfigDrawerOpen(true);
}, []);

// Handle adding nodes by drag and drop
const onDragOver = useCallback((event: React.DragEvent) => {
event.preventDefault();
event.dataTransfer.dropEffect = 'move';
}, []);

const onDrop = useCallback(
(event: React.DragEvent) => {
event.preventDefault();

if (!reactFlowWrapper.current || !reactFlowInstance) {
return;
}

const reactFlowBounds = reactFlowWrapper.current.getBoundingClientRect();
const type = event.dataTransfer.getData('application/reactflow');

if (!type) {
return;
}

const position = reactFlowInstance.project({
x: event.clientX - reactFlowBounds.left,
y: event.clientY - reactFlowBounds.top,
});

const newNode: Node = {
id: generateNodeId(),
type,
position,
data: {
label: getNodeLabel(type),
config: getDefaultNodeConfig(type),
isConfigured: false,
},
};

setNodes((nds) => nds.concat(newNode));
},
[reactFlowInstance, setNodes]
);

// Save the workflow
const handleSave = async () => {
try {
// Validate the workflow first
const validation = validateWorkflow(nodes, edges);
if (!validation.isValid) {
message.error(`Workflow validation failed: ${validation.errors.join(', ')}`);
return;
}

const workflowData = {
nodes,
edges,
name: currentWorkflow?.name || 'Untitled workflow',
description: currentWorkflow?.description || '',
version: '1.0.0',
};

await saveWorkflow(workflowData);
message.success('Workflow saved');
} catch (error) {
message.error('Save failed: ' + (error as Error).message);
}
};

// Execute the workflow
const handleExecute = async () => {
try {
if (!currentWorkflow?.id) {
message.warning('Please save the workflow first');
return;
}

// Validate the workflow configuration
const validation = validateWorkflowExecution(nodes, edges);
if (!validation.isValid) {
message.error(`Execution validation failed: ${validation.errors.join(', ')}`);
return;
}

await executeWorkflow(currentWorkflow.id, {});
setExecutionDrawerOpen(true);
message.success('Workflow execution started');
} catch (error) {
message.error('Execution failed: ' + (error as Error).message);
}
};

// Update a node's configuration
const handleNodeUpdate = useCallback((updatedNode: Node) => {
setNodes((nds) =>
nds.map((node) =>
node.id === updatedNode.id ? updatedNode : node
)
);
setSelectedNode(updatedNode);
}, [setNodes]);

return (
<ReactFlowProvider>
<Layout style={{ height: '100vh' }}>
{/* Left-hand node palette */}
<Sider width={280} theme="light" style={{ borderRight: '1px solid #f0f0f0' }}>
<NodePanel />
</Sider>

{/* Main design area */}
<Content>
{/* Toolbar */}
<div className="workflow-toolbar">
<div className="toolbar-left">
<Button
type="primary"
icon={<SaveOutlined />}
onClick={handleSave}
loading={isLoading}
>
Save
</Button>
<Button
icon={<PlayCircleOutlined />}
onClick={handleExecute}
loading={isLoading}
disabled={!currentWorkflow?.id}
>
Run
</Button>
</div>

<div className="toolbar-right">
<Button
icon={<SettingOutlined />}
onClick={() => setExecutionDrawerOpen(true)}
disabled={!executionStatus}
>
Execution status
</Button>
<div className="connection-status">
<span className={`status-indicator ${isConnected ? 'connected' : 'disconnected'}`} />
{isConnected ? 'Connected' : 'Disconnected'}
</div>
</div>
</div>

{/* React Flow canvas */}
<div className="reactflow-wrapper" ref={reactFlowWrapper}>
<ReactFlow
nodes={nodes}
edges={edges}
onNodesChange={onNodesChange}
onEdgesChange={onEdgesChange}
onConnect={onConnect}
onNodeClick={onNodeClick}
onDrop={onDrop}
onDragOver={onDragOver}
onInit={setReactFlowInstance}
nodeTypes={nodeTypes}
fitView
attributionPosition="top-right"
>
<Controls />
<MiniMap />
<Background variant={BackgroundVariant.Dots} gap={12} size={1} />
</ReactFlow>
</div>
</Content>

{/* Node configuration drawer */}
<Drawer
title="Node configuration"
placement="right"
width={400}
open={configDrawerOpen}
onClose={() => setConfigDrawerOpen(false)}
destroyOnClose
>
{selectedNode && (
<ConfigPanel
node={selectedNode}
onUpdate={handleNodeUpdate}
onClose={() => setConfigDrawerOpen(false)}
/>
)}
</Drawer>

{/* Execution status drawer */}
<Drawer
title="Execution status"
placement="right"
width={500}
open={executionDrawerOpen}
onClose={() => setExecutionDrawerOpen(false)}
>
<ExecutionPanel />
</Drawer>
</Layout>
</ReactFlowProvider>
);
};

// Helper functions
function generateNodeId(): string {
// slice(2, 11) yields the same 9 characters as the deprecated substr(2, 9)
return `node_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}

function getNodeLabel(type: string): string {
// Typed as Record<string, string> so that indexing with an arbitrary string type-checks
const labels: Record<string, string> = {
trigger: 'Trigger',
action: 'Action',
condition: 'Condition',
transform: 'Transform',
};
return labels[type] || 'Unknown node';
}

function getDefaultNodeConfig(type: string): any {
const configs: Record<string, any> = {
trigger: { type: 'manual' },
action: { type: 'slack', message: '' },
condition: { operator: 'equals', value: '' },
transform: { script: '' },
};
return configs[type] || {};
}

function isValidConnection(connection: Connection, nodes: Node[]): boolean {
// Simplified: only self-connections are rejected here.
// A complete implementation should also detect cycles and check that the port types match.
return connection.source !== connection.target;
}

function validateWorkflow(nodes: Node[], edges: Edge[]): { isValid: boolean; errors: string[] } {
const errors: string[] = [];

// Check that there is at least one trigger node
const triggerNodes = nodes.filter(node => node.type === 'trigger');
if (triggerNodes.length === 0) {
errors.push('A workflow must contain at least one trigger node');
}

// Check that every node has been configured
for (const node of nodes) {
if (!node.data.isConfigured) {
errors.push(`Node "${node.data.label}" has not been configured`);
}
}

return { isValid: errors.length === 0, errors };
}

function validateWorkflowExecution(nodes: Node[], edges: Edge[]): { isValid: boolean; errors: string[] } {
const errors: string[] = [];

// Check that every node is fully configured
for (const node of nodes) {
if (!node.data.isConfigured) {
errors.push(`Node "${node.data.label}" is not fully configured`);
}
}

return { isValid: errors.length === 0, errors };
}

export default WorkflowDesigner;

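This basic designer component provides:

  1. Drag-and-drop node creation: drag nodes from the left-hand panel onto the canvas
  2. Node connections: drag between handles to connect nodes
  3. Configuration management: click a node to open its configuration panel
  4. Saving and running workflows: save and execute actions are built in
  5. Real-time status monitoring: execution status is shown over a WebSocket connection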
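
The `isValidConnection` helper in the listing only rejects self-connections and leaves cycle detection as a comment. One way to fill that gap is sketched below, assuming the check runs against the current edge list before the new edge is added. `wouldCreateCycle` and `SimpleEdge` are hypothetical names, not part of the original component.

```typescript
// Minimal edge shape; React Flow's Edge type has these two fields as well.
interface SimpleEdge { source: string; target: string; }

// Adding source -> target closes a loop exactly when source is already
// reachable from target through the existing edges.
function wouldCreateCycle(edges: SimpleEdge[], source: string, target: string): boolean {
  if (source === target) return true;

  // Build an adjacency list of outgoing edges.
  const outgoing = new Map<string, string[]>();
  for (const edge of edges) {
    const list = outgoing.get(edge.source) ?? [];
    list.push(edge.target);
    outgoing.set(edge.source, list);
  }

  // Iterative depth-first search starting at the proposed target.
  const stack: string[] = [target];
  const visited = new Set<string>();
  while (stack.length > 0) {
    const current = stack.pop() as string;
    if (current === source) return true;
    if (visited.has(current)) continue;
    visited.add(current);
    for (const next of outgoing.get(current) ?? []) {
      stack.push(next);
    }
  }
  return false;
}

const existingEdges: SimpleEdge[] = [
  { source: 'a', target: 'b' },
  { source: 'b', target: 'c' },
];
console.log(wouldCreateCycle(existingEdges, 'c', 'a')); // true: c -> a would close the loop a -> b -> c
console.log(wouldCreateCycle(existingEdges, 'a', 'c')); // false: a -> c only adds a shortcut
```

To use something like this from `onConnect`, the callback would need access to the current `edges` array in addition to `nodes`.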

(2) Custom node components

Trigger node component

// src/components/nodes/TriggerNode.tsx
import React from 'react';
import { Handle, Position, NodeProps } from 'reactflow';
import { Card, Tag, Space } from 'antd';
import { PlayCircleOutlined, ClockCircleOutlined } from '@ant-design/icons';

interface TriggerNodeData {
label: string;
config: {
type: 'webhook' | 'schedule' | 'manual';
webhookUrl?: string;
cronExpression?: string;
};
isConfigured: boolean;
status?: 'idle' | 'running' | 'success' | 'error';
}

const TriggerNode: React.FC<NodeProps<TriggerNodeData>> = ({
data,
selected,
id
}) => {
const getTriggerIcon = () => {
switch (data.config.type) {
case 'webhook':
return '🔗';
case 'schedule':
return <ClockCircleOutlined />;
case 'manual':
return <PlayCircleOutlined />;
default:
return '▶️';
}
};

const getTriggerDescription = () => {
switch (data.config.type) {
case 'webhook':
return 'Webhook trigger';
case 'schedule':
return `Scheduled trigger: ${data.config.cronExpression || 'not set'}`;
case 'manual':
return 'Manual trigger';
default:
return 'Not configured';
}
};

const getStatusColor = () => {
if (!data.isConfigured) return '#faad14';
switch (data.status) {
case 'running': return '#1890ff';
case 'success': return '#52c41a';
case 'error': return '#ff4d4f';
default: return '#52c41a';
}
};

return (
<div className="custom-node trigger-node">
<Card
size="small"
style={{
minWidth: 200,
borderColor: selected ? '#1890ff' : getStatusColor(),
borderWidth: 2,
boxShadow: selected ? '0 0 0 2px rgba(24, 144, 255, 0.2)' : undefined
}}
bodyStyle={{ padding: '12px' }}
>
<div className="node-header">
<Space>
<span style={{ fontSize: '16px' }}>{getTriggerIcon()}</span>
<div>
<div className="node-title">{data.label}</div>
<div className="node-description">{getTriggerDescription()}</div>
</div>
</Space>

<div className="node-status">
<Tag color={data.isConfigured ? 'success' : 'warning'}>
{data.isConfigured ? 'Configured' : 'Not configured'}
</Tag>
</div>
</div>

{data.status === 'running' && (
<div className="node-progress">
<div className="progress-bar" />
</div>
)}
</Card>

{/* Output handle */}
<Handle
type="source"
position={Position.Right}
id="output"
style={{
background: '#1890ff',
border: '2px solid white',
width: 12,
height: 12
}}
/>
</div>
);
};

export default TriggerNode;

Action node component

// src/components/nodes/ActionNode.tsx
import React from 'react';
import { Handle, Position, NodeProps } from 'reactflow';
import { Card, Tag, Avatar, Space, Progress } from 'antd';
import {
MessageOutlined,
MailOutlined,
GithubOutlined,
ApiOutlined
} from '@ant-design/icons';

interface ActionNodeData {
label: string;
config: {
type: 'slack' | 'email' | 'github' | 'http';
[key: string]: any;
};
isConfigured: boolean;
status?: 'idle' | 'running' | 'success' | 'error';
progress?: number;
}

const ActionNode: React.FC<NodeProps<ActionNodeData>> = ({
data,
selected
}) => {
const getActionIcon = () => {
const iconProps = { style: { color: '#1890ff' } };

switch (data.config.type) {
case 'slack':
return <MessageOutlined {...iconProps} />;
case 'email':
return <MailOutlined {...iconProps} />;
case 'github':
return <GithubOutlined {...iconProps} />;
case 'http':
return <ApiOutlined {...iconProps} />;
default:
return <ApiOutlined {...iconProps} />;
}
};

const getActionName = () => {
switch (data.config.type) {
case 'slack':
return 'Slack message';
case 'email':
return 'Send email';
case 'github':
return 'GitHub operation';
case 'http':
return 'HTTP request';
default:
return 'Action';
}
};

const getActionDescription = () => {
switch (data.config.type) {
case 'slack':
return `Channel: ${data.config.channel || 'not set'}`;
case 'email':
return `To: ${data.config.to || 'not set'}`;
case 'github':
return `Repository: ${data.config.repo || 'not set'}`;
case 'http':
return `${data.config.method || 'GET'} ${data.config.url || 'not set'}`;
default:
return 'Not configured';
}
};

const getBorderColor = () => {
if (!data.isConfigured) return '#faad14';
if (selected) return '#1890ff';

switch (data.status) {
case 'running': return '#1890ff';
case 'success': return '#52c41a';
case 'error': return '#ff4d4f';
default: return '#d9d9d9';
}
};

return (
<div className="custom-node action-node">
{/* Input handle */}
<Handle
type="target"
position={Position.Left}
id="input"
style={{
background: '#1890ff',
border: '2px solid white',
width: 12,
height: 12
}}
/>

<Card
size="small"
style={{
minWidth: 220,
borderColor: getBorderColor(),
borderWidth: 2,
boxShadow: selected ? '0 0 0 2px rgba(24, 144, 255, 0.2)' : undefined
}}
bodyStyle={{ padding: '12px' }}
>
<div className="node-header">
<Space>
<Avatar
size="small"
icon={getActionIcon()}
style={{ backgroundColor: '#f0f0f0' }}
/>
<div>
<div className="node-title">{getActionName()}</div>
<div className="node-description">{getActionDescription()}</div>
</div>
</Space>

<Tag color={data.isConfigured ? 'success' : 'warning'}>
{data.isConfigured ? 'Configured' : 'Not configured'}
</Tag>
</div>

{/* Execution progress */}
{data.status === 'running' && data.progress !== undefined && (
<div style={{ marginTop: 8 }}>
<Progress
percent={data.progress}
size="small"
status="active"
showInfo={false}
/>
</div>
)}

{/* Status indicator */}
{data.status && data.status !== 'idle' && (
<div className={`status-indicator status-${data.status}`} />
)}
</Card>

{/* Output handle */}
<Handle
type="source"
position={Position.Right}
id="output"
style={{
background: '#1890ff',
border: '2px solid white',
width: 12,
height: 12
}}
/>
</div>
);
};

export default ActionNode;
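
The node components rely on an `isConfigured` flag, but the listings so far do not show how it is computed. One possibility is a per-type list of required fields, sketched below. The field names (`channel`, `to`, `repo`, `url`) are taken from `getActionDescription`; treating them as the required set is an assumption, and `REQUIRED_FIELDS` and `missingActionFields` are hypothetical names.

```typescript
type ActionType = 'slack' | 'email' | 'github' | 'http';

interface ActionConfig {
  type: ActionType;
  [key: string]: unknown;
}

// Assumed minimum configuration per action type (not confirmed by the article).
const REQUIRED_FIELDS: Record<ActionType, string[]> = {
  slack: ['channel'],
  email: ['to'],
  github: ['repo'],
  http: ['url'],
};

// Returns the required fields that are absent or blank; an empty result
// means the node could be marked as configured.
function missingActionFields(config: ActionConfig): string[] {
  return REQUIRED_FIELDS[config.type].filter((field) => {
    const value = config[field];
    return typeof value !== 'string' || value.trim() === '';
  });
}

console.log(missingActionFields({ type: 'slack', channel: '#ops' })); // []
console.log(missingActionFields({ type: 'http', method: 'POST' }));   // [ 'url' ]
```

A configuration panel could then set `isConfigured` to `missingActionFields(config).length === 0` whenever the form changes.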

Condition node component

// src/components/nodes/ConditionNode.tsx
import React from 'react';
import { Handle, Position, NodeProps } from 'reactflow';
import { Card, Tag, Space } from 'antd';
import { BranchesOutlined } from '@ant-design/icons';

interface ConditionNodeData {
label: string;
config: {
field: string;
operator: 'equals' | 'not_equals' | 'greater_than' | 'less_than' | 'contains';
value: any;
};
isConfigured: boolean;
status?: 'idle' | 'running' | 'success' | 'error';
lastResult?: boolean;
}

const ConditionNode: React.FC<NodeProps<ConditionNodeData>> = ({
data,
selected
}) => {
const getOperatorText = () => {
const operators = {
equals: 'equals',
not_equals: 'does not equal',
greater_than: 'is greater than',
less_than: 'is less than',
contains: 'contains'
};
return operators[data.config.operator] || 'not set';
};

const getConditionDescription = () => {
if (!data.isConfigured) return 'Condition not configured';

return `${data.config.field} ${getOperatorText()} ${data.config.value}`;
};

const getBorderColor = () => {
if (!data.isConfigured) return '#faad14';
if (selected) return '#1890ff';

switch (data.status) {
case 'running': return '#1890ff';
case 'success': return '#52c41a';
case 'error': return '#ff4d4f';
default: return '#722ed1';
}
};

return (
<div className="custom-node condition-node">
{/* Input handle */}
<Handle
type="target"
position={Position.Left}
id="input"
style={{
background: '#722ed1',
border: '2px solid white',
width: 12,
height: 12
}}
/>

<Card
size="small"
style={{
minWidth: 200,
borderColor: getBorderColor(),
borderWidth: 2,
boxShadow: selected ? '0 0 0 2px rgba(114, 46, 209, 0.2)' : undefined
}}
bodyStyle={{ padding: '12px' }}
>
<div className="node-header">
<Space>
<BranchesOutlined style={{ color: '#722ed1', fontSize: '16px' }} />
<div>
<div className="node-title">Condition</div>
<div className="node-description">{getConditionDescription()}</div>
</div>
</Space>

<Tag color={data.isConfigured ? 'success' : 'warning'}>
{data.isConfigured ? 'Configured' : 'Not configured'}
</Tag>
</div>

{/* Show the result of the last run */}
{data.lastResult !== undefined && (
<div style={{ marginTop: 8, textAlign: 'center' }}>
<Tag color={data.lastResult ? 'success' : 'error'}>
Last result: {data.lastResult ? 'true' : 'false'}
</Tag>
</div>
)}
</Card>

{/* Output handle: true branch */}
<Handle
type="source"
position={Position.Bottom}
id="true"
style={{
background: '#52c41a',
border: '2px solid white',
width: 12,
height: 12,
left: '25%'
}}
/>

{/* Output handle: false branch */}
<Handle
type="source"
position={Position.Bottom}
id="false"
style={{
background: '#ff4d4f',
border: '2px solid white',
width: 12,
height: 12,
left: '75%'
}}
/>

{/* Branch labels */}
<div className="branch-labels">
<span className="branch-label true-label"></span>
<span className="branch-label false-label"></span>
</div>
</div>
);
};

export default ConditionNode;
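
The condition node only displays its `field`, `operator` and `value`; deciding which branch to follow happens at execution time. The sketch below shows one way such a config could be evaluated against incoming data. It is an illustration rather than the article's engine: `evaluateCondition` and `readField` are hypothetical names, and resolving `field` as a dotted path (for example `user.age`) is an assumption.

```typescript
type Operator = 'equals' | 'not_equals' | 'greater_than' | 'less_than' | 'contains';

interface ConditionConfig {
  field: string;
  operator: Operator;
  value: unknown;
}

// Resolve a dotted path such as "user.age" inside the payload.
function readField(payload: Record<string, unknown>, path: string): unknown {
  let current: unknown = payload;
  for (const key of path.split('.')) {
    if (typeof current !== 'object' || current === null) return undefined;
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Returns true when the "true" handle should be followed, false otherwise.
function evaluateCondition(config: ConditionConfig, payload: Record<string, unknown>): boolean {
  const actual = readField(payload, config.field);
  switch (config.operator) {
    case 'equals':
      return actual === config.value;
    case 'not_equals':
      return actual !== config.value;
    case 'greater_than':
      return Number(actual) > Number(config.value);
    case 'less_than':
      return Number(actual) < Number(config.value);
    case 'contains':
      if (typeof actual === 'string') return actual.includes(String(config.value));
      if (Array.isArray(actual)) return actual.includes(config.value);
      return false;
    default:
      return false;
  }
}

const samplePayload = { user: { age: 21, tags: ['admin', 'beta'] } };
console.log(evaluateCondition({ field: 'user.age', operator: 'greater_than', value: 18 }, samplePayload)); // true
console.log(evaluateCondition({ field: 'user.tags', operator: 'contains', value: 'guest' }, samplePayload)); // false
```

Note that `equals` uses strict equality here, so the string `'21'` from a form input would not match the number `21`; a real implementation would need to decide how to coerce types.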

(3) State management and API integration

State management with Zustand

// src/stores/workflowStore.ts
import { create } from 'zustand';
import { devtools } from 'zustand/middleware';
import { Node, Edge } from 'reactflow';
import { workflowAPI } from '../services/api';

interface Workflow {
id: string;
name: string;
description: string;
nodes: Node[];
edges: Edge[];
status: 'draft' | 'active' | 'inactive';
version: string;
createdAt: string;
updatedAt: string;
}

interface WorkflowExecution {
id: string;
workflowId: string;
status: 'running' | 'completed' | 'failed' | 'cancelled';
startTime: string;
endTime?: string;
logs: ExecutionLog[];
error?: string;
}

interface ExecutionLog {
nodeId: string;
status: 'pending' | 'running' | 'completed' | 'failed';
startTime: string;
endTime?: string;
input?: any;
output?: any;
error?: string;
}

interface WorkflowStore {
// State
workflows: Workflow[];
currentWorkflow: Workflow | null;
executionStatus: WorkflowExecution | null;
isLoading: boolean;
error: string | null;

// Actions
loadWorkflows: () => Promise<void>;
loadWorkflow: (id: string) => Promise<void>;
saveWorkflow: (data: Partial<Workflow>) => Promise<void>;
deleteWorkflow: (id: string) => Promise<void>;
executeWorkflow: (id: string, triggerData: any) => Promise<void>;
stopExecution: (executionId: string) => Promise<void>;
updateExecutionStatus: (execution: WorkflowExecution) => void;
setCurrentWorkflow: (workflow: Workflow | null) => void;
setError: (error: string | null) => void;
}

export const useWorkflowStore = create<WorkflowStore>()(
devtools(
(set, get) => ({
// Initial state
workflows: [],
currentWorkflow: null,
executionStatus: null,
isLoading: false,
error: null,

// Load the workflow list
loadWorkflows: async () => {
set({ isLoading: true, error: null });
try {
const workflows = await workflowAPI.getWorkflows();
set({ workflows, isLoading: false });
} catch (error) {
set({
error: error instanceof Error ? error.message : 'Failed to load workflows',
isLoading: false
});
}
},

// Load a single workflow
loadWorkflow: async (id: string) => {
set({ isLoading: true, error: null });
try {
const workflow = await workflowAPI.getWorkflow(id);
set({ currentWorkflow: workflow, isLoading: false });
} catch (error) {
set({
error: error instanceof Error ? error.message : 'Failed to load workflow',
isLoading: false
});
}
},

// Save the workflow
saveWorkflow: async (data: Partial<Workflow>) => {
set({ isLoading: true, error: null });
try {
const { currentWorkflow } = get();

let savedWorkflow: Workflow;
if (currentWorkflow?.id) {
// Update the existing workflow
savedWorkflow = await workflowAPI.updateWorkflow(currentWorkflow.id, data);
} else {
// Create a new workflow
savedWorkflow = await workflowAPI.createWorkflow(data);
}

set({
currentWorkflow: savedWorkflow,
isLoading: false
});

// Update the workflow list
const { workflows } = get();
const updatedWorkflows = currentWorkflow?.id
? workflows.map(w => w.id === savedWorkflow.id ? savedWorkflow : w)
: [...workflows, savedWorkflow];

set({ workflows: updatedWorkflows });
} catch (error) {
set({
error: error instanceof Error ? error.message : 'Failed to save workflow',
isLoading: false
});
throw error;
}
},

// Delete a workflow
deleteWorkflow: async (id: string) => {
set({ isLoading: true, error: null });
try {
await workflowAPI.deleteWorkflow(id);

const { workflows, currentWorkflow } = get();
const updatedWorkflows = workflows.filter(w => w.id !== id);

set({
workflows: updatedWorkflows,
currentWorkflow: currentWorkflow?.id === id ? null : currentWorkflow,
isLoading: false
});
} catch (error) {
set({
error: error instanceof Error ? error.message : 'Failed to delete workflow',
isLoading: false
});
throw error;
}
},

// Execute a workflow
executeWorkflow: async (id: string, triggerData: any) => {
set({ isLoading: true, error: null });
try {
const execution = await workflowAPI.executeWorkflow(id, triggerData);
set({
executionStatus: execution,
isLoading: false
});
} catch (error) {
set({
error: error instanceof Error ? error.message : 'Failed to execute workflow',
isLoading: false
});
throw error;
}
},

// Stop an execution
stopExecution: async (executionId: string) => {
set({ isLoading: true, error: null });
try {
await workflowAPI.stopExecution(executionId);
set({ isLoading: false });
} catch (error) {
set({
error: error instanceof Error ? error.message : 'Failed to stop execution',
isLoading: false
});
throw error;
}
},

// Update the execution status (pushed over WebSocket)
updateExecutionStatus: (execution: WorkflowExecution) => {
set({ executionStatus: execution });
},

// Set the current workflow
setCurrentWorkflow: (workflow: Workflow | null) => {
set({ currentWorkflow: workflow });
},

// Set the error message
setError: (error: string | null) => {
set({ error });
},
}),
{
name: 'workflow-store',
}
)
);
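
`updateExecutionStatus` replaces the whole execution object whenever the server pushes an update. If the server instead pushed one log entry at a time, the store would need to merge it into the existing state. A minimal sketch of such a merge follows, written as a pure function so it can be tested without React or Zustand. The per-entry payload shape is an assumption, and `applyLogUpdate`, `LogItem` and `ExecutionState` are hypothetical names.

```typescript
interface LogItem {
  nodeId: string;
  status: 'pending' | 'running' | 'completed' | 'failed';
  startTime: string;
  endTime?: string;
  error?: string;
}

interface ExecutionState {
  id: string;
  status: 'running' | 'completed' | 'failed' | 'cancelled';
  logs: LogItem[];
}

// Returns a new execution object with the entry merged in by nodeId.
// The input is never mutated, so the result is safe to pass to set().
function applyLogUpdate(execution: ExecutionState, entry: LogItem): ExecutionState {
  const exists = execution.logs.some((log) => log.nodeId === entry.nodeId);
  const logs = exists
    ? execution.logs.map((log) => (log.nodeId === entry.nodeId ? { ...log, ...entry } : log))
    : [...execution.logs, entry];
  // A failed node marks the whole execution as failed; otherwise keep the current status.
  const nextStatus = entry.status === 'failed' ? 'failed' : execution.status;
  return { ...execution, logs, status: nextStatus };
}

const before: ExecutionState = {
  id: 'exec-1',
  status: 'running',
  logs: [{ nodeId: 'trigger', status: 'running', startTime: '2024-01-01T00:00:00Z' }],
};
const after = applyLogUpdate(before, {
  nodeId: 'trigger',
  status: 'completed',
  startTime: '2024-01-01T00:00:00Z',
  endTime: '2024-01-01T00:00:01Z',
});
console.log(after.logs[0].status, before.logs[0].status); // completed running
```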

API service layer

// src/services/api.ts
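import axios, { AxiosInstance, AxiosResponse } from 'axios';
// Assumption: the shared types used below are exported from a module such as '../types';
// the original listing does not show where they are declared.
import type { Workflow, WorkflowExecution, ExecutionLog, Connector, NodeTemplate } from '../types';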

class WorkflowAPI {
private client: AxiosInstance;

constructor() {
this.client = axios.create({
baseURL: process.env.REACT_APP_API_BASE_URL || 'http://localhost:3001/api',
timeout: 30000,
headers: {
'Content-Type': 'application/json',
},
});

// Request interceptor: attach the auth token
this.client.interceptors.request.use(
(config) => {
const token = localStorage.getItem('authToken');
if (token) {
config.headers.Authorization = `Bearer ${token}`;
}
return config;
},
(error) => Promise.reject(error)
);

// Response interceptor: unwrap response.data and handle errors in one place
this.client.interceptors.response.use(
(response: AxiosResponse) => response.data,
(error) => {
if (error.response?.status === 401) {
// Handle authentication failure
localStorage.removeItem('authToken');
window.location.href = '/login';
}
return Promise.reject(error.response?.data || error);
}
);
}

// Workflow APIs
async getWorkflows(): Promise<Workflow[]> {
return this.client.get('/workflows');
}

async getWorkflow(id: string): Promise<Workflow> {
return this.client.get(`/workflows/${id}`);
}

async createWorkflow(data: Partial<Workflow>): Promise<Workflow> {
return this.client.post('/workflows', data);
}

async updateWorkflow(id: string, data: Partial<Workflow>): Promise<Workflow> {
return this.client.put(`/workflows/${id}`, data);
}

async deleteWorkflow(id: string): Promise<void> {
return this.client.delete(`/workflows/${id}`);
}

// Execution APIs
async executeWorkflow(id: string, triggerData: any): Promise<WorkflowExecution> {
return this.client.post(`/workflows/${id}/execute`, { triggerData });
}

async getExecution(executionId: string): Promise<WorkflowExecution> {
return this.client.get(`/executions/${executionId}`);
}

async stopExecution(executionId: string): Promise<void> {
return this.client.post(`/executions/${executionId}/stop`);
}

async getExecutionLogs(executionId: string): Promise<ExecutionLog[]> {
return this.client.get(`/executions/${executionId}/logs`);
}

// Connector APIs
async getConnectors(): Promise<Connector[]> {
return this.client.get('/connectors');
}

async createConnector(data: any): Promise<Connector> {
return this.client.post('/connectors', data);
}

async testConnector(type: string, config: any): Promise<{ success: boolean; message?: string }> {
return this.client.post('/connectors/test', { type, config });
}

// Node template APIs
async getNodeTemplates(): Promise<NodeTemplate[]> {
return this.client.get('/node-templates');
}
}

export const workflowAPI = new WorkflowAPI();
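
One detail worth noting: the response interceptor rejects with `error.response?.data || error`, so callers may receive a plain JSON body instead of an `Error` instance. In that case the store's `error instanceof Error` check falls through to its generic fallback text, and `(error as Error).message` in the designer may be `undefined`. A small normalizing helper is sketched below. It assumes the server's error body carries a string `message` or `error` field, which the article does not specify, and `toErrorMessage` is a hypothetical name.

```typescript
// Turn whatever was thrown or rejected into a displayable message.
function toErrorMessage(error: unknown, fallback: string): string {
  if (error instanceof Error) return error.message;
  if (typeof error === 'string' && error.length > 0) return error;
  if (typeof error === 'object' && error !== null) {
    // Assumed server error shape: { message?: string } or { error?: string }.
    const body = error as { message?: unknown; error?: unknown };
    if (typeof body.message === 'string') return body.message;
    if (typeof body.error === 'string') return body.error;
  }
  return fallback;
}

console.log(toErrorMessage(new Error('timeout'), 'Request failed'));        // timeout
console.log(toErrorMessage({ message: 'Workflow not found' }, 'Request failed')); // Workflow not found
console.log(toErrorMessage(undefined, 'Request failed'));                   // Request failed
```

The store's `catch` blocks could then call `toErrorMessage(error, 'Failed to save workflow')` instead of repeating the `instanceof` check.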

III. Backend Execution Engine Implementation

(1) Express server and base architecture

Main server file

// server/src/app.ts
import express from 'express';
import cors from 'cors';
import helmet from 'helmet';
import compression from 'compression';
import { createServer } from 'http';
import { Server as SocketIOServer } from 'socket.io';
import mongoose from 'mongoose';
import { config } from './config';
import { errorHandler, notFoundHandler } from './middleware/errorHandler';
import { authMiddleware } from './middleware/auth';
import { rateLimitMiddleware } from './middleware/rateLimit';

// Routes
import workflowRoutes from './routes/workflows';
import executionRoutes from './routes/executions';
import connectorRoutes from './routes/connectors';
import nodeTemplateRoutes from './routes/nodeTemplates';

// Services
import { WorkflowEngine } from './services/WorkflowEngine';
import { SocketManager } from './services/SocketManager';

class Application {
public app: express.Application;
public server: any;
public io: SocketIOServer;
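// Definite assignment (!): both fields are set in initializeServices(), which the constructor calls
private workflowEngine!: WorkflowEngine;
private socketManager!: SocketManager;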

constructor() {
this.app = express();
this.server = createServer(this.app);
this.io = new SocketIOServer(this.server, {
cors: {
origin: process.env.FRONTEND_URL || "http://localhost:3000",
methods: ["GET", "POST"]
}
});

this.initializeDatabase();
this.initializeMiddleware();
this.initializeRoutes();
this.initializeServices();
this.initializeErrorHandling();
}

  private async initializeDatabase(): Promise<void> {
    try {
      // useNewUrlParser / useUnifiedTopology are always on since Mongoose 6
      // and no longer need to be passed as options.
      await mongoose.connect(config.database.uri);
      console.log('✅ Database connected');
    } catch (error) {
      console.error('❌ Database connection failed:', error);
      process.exit(1);
    }
  }

  private initializeMiddleware(): void {
    // Security middleware
    this.app.use(helmet());
    this.app.use(cors({
      origin: process.env.FRONTEND_URL || "http://localhost:3000",
      credentials: true
    }));
    this.app.use(compression());

    // Body parsing
    this.app.use(express.json({ limit: '10mb' }));
    this.app.use(express.urlencoded({ extended: true }));

    // Request logging (registered before rate limiting and authentication
    // so that rejected requests are logged as well)
    this.app.use((req, res, next) => {
      console.log(`${new Date().toISOString()} - ${req.method} ${req.path}`);
      next();
    });

    // Rate limiting
    this.app.use(rateLimitMiddleware);

    // Authentication for everything under /api (public routes are excluded)
    this.app.use('/api', authMiddleware);
  }

  private initializeRoutes(): void {
    // API routes
    this.app.use('/api/workflows', workflowRoutes);
    this.app.use('/api/executions', executionRoutes);
    this.app.use('/api/connectors', connectorRoutes);
    this.app.use('/api/node-templates', nodeTemplateRoutes);

    // Health check
    this.app.get('/health', (req, res) => {
      res.json({
        status: 'ok',
        timestamp: new Date().toISOString(),
        uptime: process.uptime()
      });
    });
  }

  private initializeServices(): void {
    // Initialize the workflow execution engine
    this.workflowEngine = new WorkflowEngine();

    // Initialize the socket manager
    this.socketManager = new SocketManager(this.io);

    // Forward workflow-level engine events to connected clients
    for (const event of ['workflow.started', 'workflow.completed', 'workflow.failed']) {
      this.workflowEngine.on(event, (data) => {
        this.socketManager.broadcastExecutionUpdate(data);
      });
    }

    // Forward node-level engine events to connected clients
    for (const event of ['node.started', 'node.completed', 'node.failed']) {
      this.workflowEngine.on(event, (data) => {
        this.socketManager.broadcastNodeUpdate(data);
      });
    }

    // Expose the service instances on the app so routes can use them
    this.app.set('workflowEngine', this.workflowEngine);
    this.app.set('socketManager', this.socketManager);
  }

  private initializeErrorHandling(): void {
    this.app.use(notFoundHandler);
    this.app.use(errorHandler);
  }

  public listen(): void {
    const port = config.server.port;
    this.server.listen(port, () => {
      console.log(`🚀 Server started on port ${port}`);
      console.log(`📊 Environment: ${process.env.NODE_ENV || 'development'}`);
    });
  }
}

export default Application;
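
The event wiring in initializeServices can be reduced to a small helper. The sketch below is only an illustration under stated assumptions: `forwardEvents`, `Subscribable` and `TinyEmitter` are hypothetical names that do not appear in the project, and `TinyEmitter` stands in for the real WorkflowEngine (any object with an `on` method fits). It also shows that an event missing from the list, such as the 'workflow.cancelled' event emitted by stopWorkflow, never reaches the handler.

```typescript
// Hypothetical sketch: route a fixed list of engine events to one handler.
type UpdateListener = (data: unknown) => void;

// Structural type: anything with an `on` method can act as the event source.
interface Subscribable {
  on(event: string, listener: UpdateListener): unknown;
}

// Subscribe `handler` to every event name in `events`.
function forwardEvents(source: Subscribable, events: string[], handler: UpdateListener): void {
  for (const eventName of events) {
    source.on(eventName, handler);
  }
}

// Tiny stand-in emitter used only for this demonstration.
class TinyEmitter implements Subscribable {
  private listeners = new Map<string, UpdateListener[]>();

  on(event: string, listener: UpdateListener): this {
    const list = this.listeners.get(event) ?? [];
    list.push(listener);
    this.listeners.set(event, list);
    return this;
  }

  emit(event: string, data: unknown): void {
    for (const listener of this.listeners.get(event) ?? []) {
      listener(data);
    }
  }
}

const demoEngine = new TinyEmitter();
const executionUpdates: unknown[] = [];
const nodeUpdates: unknown[] = [];

forwardEvents(demoEngine, ['workflow.started', 'workflow.completed', 'workflow.failed'],
  (data) => { executionUpdates.push(data); });
forwardEvents(demoEngine, ['node.started', 'node.completed', 'node.failed'],
  (data) => { nodeUpdates.push(data); });

demoEngine.emit('workflow.started', { instanceId: 'demo-1' });
demoEngine.emit('node.completed', { instanceId: 'demo-1', nodeId: 'n1' });
demoEngine.emit('workflow.cancelled', { instanceId: 'demo-1' }); // not in any list, so not forwarded
```

Keeping the event names in one list makes it easier to notice when the engine emits an event that no client ever receives.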

(2) Core Implementation of the Workflow Execution Engine

Main engine class

// server/src/services/WorkflowEngine.ts
import { EventEmitter } from 'events';
// Bull exposes the queue constructor as its default export;
// the instance type is Queue.Queue.
import Queue from 'bull';
import { WorkflowModel, WorkflowInstanceModel } from '../models';
import { NodeRegistry } from './NodeRegistry';
import { ConnectorManager } from './ConnectorManager';
import { Logger } from '../utils/logger';

// Exported because NodeRegistry.ts imports this type.
export interface ExecutionContext {
  instanceId: string;
  workflowId: string;
  userId: string;
  variables: Record<string, any>;
  triggerData: any;
}

interface NodeExecutionResult {
  success: boolean;
  data?: any;
  error?: string;
  nextNodes?: string[];
}

export class WorkflowEngine extends EventEmitter {
  private nodeRegistry: NodeRegistry;
  private connectorManager: ConnectorManager;
  private executionQueue: Queue.Queue;
  private runningInstances: Map<string, ExecutionContext>;
  private logger: Logger;

  constructor() {
    super();
    this.nodeRegistry = new NodeRegistry();
    this.connectorManager = new ConnectorManager();
    this.executionQueue = new Queue('workflow execution', {
      redis: {
        host: process.env.REDIS_HOST || 'localhost',
        port: parseInt(process.env.REDIS_PORT || '6379', 10),
      }
    });
    this.runningInstances = new Map();
    this.logger = new Logger('WorkflowEngine');

    this.setupQueueProcessors();
    this.registerBuiltinNodes();
  }

  // Execute a workflow
  async executeWorkflow(
    workflowId: string,
    triggerData: any,
    userId: string
  ): Promise<string> {
    try {
      this.logger.info(`Starting workflow execution: ${workflowId}`);

      // 1. Load the workflow definition
      const workflow = await WorkflowModel.findById(workflowId);
      if (!workflow) {
        throw new Error(`Workflow not found: ${workflowId}`);
      }

      if (workflow.status !== 'active') {
        throw new Error(`Workflow is not active: ${workflowId}`);
      }

      // 2. Check for a trigger node before creating any state, so a failed
      //    check does not leave an instance stuck in the 'running' status
      const triggerNodes = workflow.nodes.filter((node: any) => node.type === 'trigger');
      if (triggerNodes.length === 0) {
        throw new Error('The workflow has no trigger node');
      }

      // 3. Create the execution instance
      const instance = new WorkflowInstanceModel({
        workflowId,
        userId,
        status: 'running',
        triggerData,
        executionLog: [],
        startTime: new Date()
      });
      await instance.save();

      // 4. Create the execution context
      const context: ExecutionContext = {
        instanceId: instance._id.toString(),
        workflowId,
        userId,
        variables: { ...triggerData },
        triggerData
      };

      this.runningInstances.set(context.instanceId, context);

      // 5. Enqueue the execution job
      await this.executionQueue.add('executeWorkflow', {
        context,
        workflow: workflow.toObject()
      });

      this.emit('workflow.started', {
        instanceId: context.instanceId,
        workflowId,
        userId
      });

      return context.instanceId;

    } catch (error) {
      this.logger.error('Failed to execute workflow:', error);
      throw error;
    }
  }

  // Execute a single node
  private async executeNode(
    node: any,
    context: ExecutionContext,
    workflow: any
  ): Promise<NodeExecutionResult> {
    const startTime = new Date();

    try {
      this.logger.info(`Executing node: ${node.id} (${node.type})`);

      // 1. Record that the node has started
      await this.logNodeExecution(context.instanceId, node.id, 'running', {
        startTime,
        input: context.variables
      });

      this.emit('node.started', {
        instanceId: context.instanceId,
        nodeId: node.id,
        nodeType: node.type
      });

      // 2. Look up the node executor
      const executor = this.nodeRegistry.getExecutor(node.type);
      if (!executor) {
        throw new Error(`No executor found for node type ${node.type}`);
      }

      // 3. Prepare the node input
      const inputData = await this.prepareNodeInput(node, context, workflow);

      // 4. Execute the node
      const result = await executor.execute(node.data.config, inputData, context);

      // 5. Handle the result
      if (result.success) {
        // Merge the node output into the context variables
        if (result.data) {
          context.variables = { ...context.variables, ...result.data };
        }

        // Record the successful execution
        await this.logNodeExecution(context.instanceId, node.id, 'completed', {
          endTime: new Date(),
          executionTime: Date.now() - startTime.getTime(),
          output: result.data
        });

        this.emit('node.completed', {
          instanceId: context.instanceId,
          nodeId: node.id,
          result: result.data
        });

        // 6. Find and enqueue the next nodes
        const nextNodes = this.findNextNodes(node, workflow, result);
        for (const nextNode of nextNodes) {
          // A node may run only once all of its upstream nodes have completed
          if (await this.canExecuteNode(nextNode, context, workflow)) {
            // Add the next node to the execution queue
            await this.executionQueue.add('executeNode', {
              node: nextNode,
              context,
              workflow
            });
          }
        }

        // 7. Check whether the workflow has finished
        if (await this.isWorkflowCompleted(context, workflow)) {
          await this.completeWorkflow(context);
        }

        return { success: true, data: result.data, nextNodes: nextNodes.map((n: any) => n.id) };

      } else {
        // `error` is optional on the result, so fall back to a generic message
        const nodeError = result.error ?? 'Unknown node error';

        // Record the failed execution
        await this.logNodeExecution(context.instanceId, node.id, 'failed', {
          endTime: new Date(),
          executionTime: Date.now() - startTime.getTime(),
          error: nodeError
        });

        this.emit('node.failed', {
          instanceId: context.instanceId,
          nodeId: node.id,
          error: nodeError
        });

        // Let the error-handling strategy decide whether to continue
        const shouldContinue = await this.handleNodeError(node, nodeError, context);
        if (!shouldContinue) {
          await this.failWorkflow(context, nodeError);
        }

        return { success: false, error: nodeError };
      }

    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : String(error);

      this.logger.error(`Node execution threw an exception: ${node.id}`, error);

      // Record the exception
      await this.logNodeExecution(context.instanceId, node.id, 'failed', {
        endTime: new Date(),
        executionTime: Date.now() - startTime.getTime(),
        error: errorMessage
      });

      // Emit the same 'node.failed' event as the failure branch above,
      // so listeners registered for node failures also see exceptions
      this.emit('node.failed', {
        instanceId: context.instanceId,
        nodeId: node.id,
        error: errorMessage
      });

      await this.failWorkflow(context, errorMessage);
      return { success: false, error: errorMessage };
    }
  }

  // Prepare the input data for a node
  private async prepareNodeInput(
    node: any,
    context: ExecutionContext,
    workflow: any
  ): Promise<any> {
    const inputData: any = {
      // Basic context information
      _context: {
        instanceId: context.instanceId,
        workflowId: context.workflowId,
        userId: context.userId,
        nodeId: node.id
      },
      // Global variables
      ...context.variables
    };

    // Collect the output of the upstream nodes
    const incomingEdges = workflow.edges.filter((edge: any) => edge.target === node.id);
    for (const edge of incomingEdges) {
      const sourceNodeOutput = await this.getNodeOutput(context.instanceId, edge.source);
      if (sourceNodeOutput) {
        // Map the data according to the connection handles
        const mappedData = this.mapNodeData(sourceNodeOutput, edge);
        Object.assign(inputData, mappedData);
      }
    }

    return inputData;
  }

  // Find the nodes to execute next
  private findNextNodes(currentNode: any, workflow: any, result: NodeExecutionResult): any[] {
    const outgoingEdges = workflow.edges.filter((edge: any) => edge.source === currentNode.id);
    const nextNodes: any[] = [];

    for (const edge of outgoingEdges) {
      const targetNode = workflow.nodes.find((node: any) => node.id === edge.target);
      if (targetNode) {
        // For condition nodes, pick the branch that matches the result
        if (currentNode.type === 'condition') {
          const conditionResult = result.data?.result;
          if (
            (edge.sourceHandle === 'true' && conditionResult) ||
            (edge.sourceHandle === 'false' && !conditionResult) ||
            (!edge.sourceHandle) // default output
          ) {
            nextNodes.push(targetNode);
          }
        } else {
          nextNodes.push(targetNode);
        }
      }
    }

    return nextNodes;
  }

  // Set up the queue processors
  private setupQueueProcessors(): void {
    // Workflow execution processor
    this.executionQueue.process('executeWorkflow', async (job) => {
      const { context, workflow } = job.data;

      // Execute the trigger nodes
      const triggerNodes = workflow.nodes.filter((node: any) => node.type === 'trigger');
      for (const triggerNode of triggerNodes) {
        await this.executeNode(triggerNode, context, workflow);
      }
    });

    // Node execution processor
    this.executionQueue.process('executeNode', async (job) => {
      const { node, context, workflow } = job.data;
      await this.executeNode(node, context, workflow);
    });

    // Error handling
    this.executionQueue.on('failed', (job, err) => {
      this.logger.error(`Queue job failed: ${job.id}`, err);
    });
  }

  // Register the built-in node types
  private registerBuiltinNodes(): void {
    this.nodeRegistry.registerBuiltinNodes();
  }

  // Other helper methods...
  private async logNodeExecution(instanceId: string, nodeId: string, status: string, data: any) {
    await WorkflowInstanceModel.findByIdAndUpdate(instanceId, {
      $push: {
        executionLog: {
          nodeId,
          status,
          timestamp: new Date(),
          ...data
        }
      }
    });
  }

  private async completeWorkflow(context: ExecutionContext) {
    await WorkflowInstanceModel.findByIdAndUpdate(context.instanceId, {
      status: 'completed',
      endTime: new Date()
    });

    this.runningInstances.delete(context.instanceId);
    this.emit('workflow.completed', { instanceId: context.instanceId });
    this.logger.info(`Workflow execution completed: ${context.instanceId}`);
  }

  private async failWorkflow(context: ExecutionContext, error: string) {
    await WorkflowInstanceModel.findByIdAndUpdate(context.instanceId, {
      status: 'failed',
      error,
      endTime: new Date()
    });

    this.runningInstances.delete(context.instanceId);
    this.emit('workflow.failed', { instanceId: context.instanceId, error });
    this.logger.error(`Workflow execution failed: ${context.instanceId}`, error);
  }

  // Stop a running workflow
  async stopWorkflow(instanceId: string): Promise<void> {
    const context = this.runningInstances.get(instanceId);
    if (context) {
      await WorkflowInstanceModel.findByIdAndUpdate(instanceId, {
        status: 'cancelled',
        endTime: new Date()
      });

      this.runningInstances.delete(instanceId);
      this.emit('workflow.cancelled', { instanceId });
      this.logger.info(`Workflow execution stopped: ${instanceId}`);
    }
  }
}
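
The engine calls canExecuteNode and isWorkflowCompleted, but their bodies are not part of the listing above. The following is a minimal sketch of how such checks might look, written as pure functions over the edge list and a per-node status map. The signatures, the `Edge` and `NodeStatus` types and the 'skipped' status are assumptions made for illustration; the real methods are asynchronous and would read the status from the instance's executionLog. The sketch also assumes the engine marks the targets of an untaken condition branch as 'skipped', otherwise the completion check would wait for them forever.

```typescript
// Hypothetical types; the real engine stores richer node and edge objects.
interface Edge {
  source: string;
  target: string;
}

type NodeStatus = 'running' | 'completed' | 'failed' | 'skipped';

// A node is ready when every node with an edge into it has completed.
// A node without incoming edges (a trigger) is always ready.
function canExecuteNode(nodeId: string, edges: Edge[], status: Map<string, NodeStatus>): boolean {
  return edges
    .filter((edge) => edge.target === nodeId)
    .every((edge) => status.get(edge.source) === 'completed');
}

// The workflow is finished when nothing is running and no unstarted node is ready.
function isWorkflowCompleted(nodeIds: string[], edges: Edge[], status: Map<string, NodeStatus>): boolean {
  const anyRunning = nodeIds.some((id) => status.get(id) === 'running');
  const anyReady = nodeIds.some((id) => !status.has(id) && canExecuteNode(id, edges, status));
  return !anyRunning && !anyReady;
}

// Diamond-shaped workflow: trigger fans out to a and b, which join again.
const demoNodeIds = ['trigger', 'a', 'b', 'join'];
const demoEdges: Edge[] = [
  { source: 'trigger', target: 'a' },
  { source: 'trigger', target: 'b' },
  { source: 'a', target: 'join' },
  { source: 'b', target: 'join' },
];
const demoStatus = new Map<string, NodeStatus>([
  ['trigger', 'completed'],
  ['a', 'completed'],
  ['b', 'running'],
]);

console.log(canExecuteNode('join', demoEdges, demoStatus)); // false: b is still running
demoStatus.set('b', 'completed');
console.log(canExecuteNode('join', demoEdges, demoStatus)); // true: both branches are done
```

Because each parallel branch is processed as its own queue job, a join check of this kind is what keeps the node after the branches from running before all of them have finished.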

(3) Node Registry and Executor Implementation

Node registry

// server/src/services/NodeRegistry.ts
import { ExecutionContext } from './WorkflowEngine';

export interface NodeExecutor {
  execute(config: any, inputData: any, context: ExecutionContext): Promise<{
    success: boolean;
    data?: any;
    error?: string;
  }>;
}

export class NodeRegistry {
  private executors: Map<string, NodeExecutor> = new Map();

  // Register a node executor
  register(nodeType: string, executor: NodeExecutor) {
    this.executors.set(nodeType, executor);
    console.log(`✅ Node type registered: ${nodeType}`);
  }

  // Look up a node executor
  getExecutor(nodeType: string): NodeExecutor | undefined {
    return this.executors.get(nodeType);
  }

  // Register the built-in nodes
  registerBuiltinNodes() {
    // HTTP request node
    this.register('http_request', new HttpRequestExecutor());

    // Email node
    this.register('email', new EmailExecutor());

    // Condition node
    this.register('condition', new ConditionExecutor());

    // Data transform node
    this.register('transform', new TransformExecutor());

    // Slack message node
    this.register('slack_message', new SlackMessageExecutor());

    // GitHub action node
    this.register('github_action', new GitHubActionExecutor());
  }
}

// HTTP request executor
class HttpRequestExecutor implements NodeExecutor {
  async execute(config: any, inputData: any, context: ExecutionContext) {
    try {
      const axios = require('axios');

      // Typed as `any` so that `auth` can be attached below. The headers are
      // copied so the stored node config is not mutated.
      const requestConfig: any = {
        url: this.replaceVariables(config.url || inputData.url, inputData),
        method: config.method || 'GET',
        headers: { ...(config.headers || {}) },
        data: config.body || inputData.body,
        timeout: config.timeout || 30000
      };

      // Authentication
      if (config.auth) {
        if (config.auth.type === 'bearer') {
          requestConfig.headers.Authorization = `Bearer ${config.auth.token}`;
        } else if (config.auth.type === 'basic') {
          requestConfig.auth = {
            username: config.auth.username,
            password: config.auth.password
          };
        }
      }

      const response = await axios(requestConfig);

      return {
        success: true,
        data: {
          status: response.status,
          statusText: response.statusText,
          headers: response.headers,
          body: response.data,
          url: response.config.url
        }
      };
    } catch (error: any) {
      return {
        success: false,
        error: `HTTP request failed: ${error.response?.data?.message || error.message}`
      };
    }
  }

  // Replace {{path.to.value}} placeholders with values from `variables`
  private replaceVariables(template: string, variables: any): string {
    if (!template) return '';

    return template.replace(/\{\{([^}]+)\}\}/g, (match, key) => {
      const keys = key.trim().split('.');
      let value = variables;

      for (const k of keys) {
        if (value && typeof value === 'object' && k in value) {
          value = value[k];
        } else {
          return match; // leave the placeholder as-is if the variable is missing
        }
      }

      return String(value);
    });
  }
}

// Condition executor
class ConditionExecutor implements NodeExecutor {
  async execute(config: any, inputData: any, context: ExecutionContext) {
    try {
      const { field, operator, value } = config;
      const actualValue = this.getNestedValue(inputData, field);

      let result = false;

      switch (operator) {
        case 'equals':
          result = actualValue === value;
          break;
        case 'not_equals':
          result = actualValue !== value;
          break;
        case 'greater_than':
          result = Number(actualValue) > Number(value);
          break;
        case 'less_than':
          result = Number(actualValue) < Number(value);
          break;
        case 'greater_equal':
          result = Number(actualValue) >= Number(value);
          break;
        case 'less_equal':
          result = Number(actualValue) <= Number(value);
          break;
        case 'contains':
          result = String(actualValue).includes(String(value));
          break;
        case 'not_contains':
          result = !String(actualValue).includes(String(value));
          break;
        case 'starts_with':
          result = String(actualValue).startsWith(String(value));
          break;
        case 'ends_with':
          result = String(actualValue).endsWith(String(value));
          break;
        case 'regex':
          result = new RegExp(value).test(String(actualValue));
          break;
        case 'is_empty':
          // Any falsy value (undefined, null, '', 0, false, NaN) counts as empty
          result = !actualValue;
          break;
        case 'is_not_empty':
          result = !!actualValue;
          break;
        default:
          throw new Error(`Unsupported operator: ${operator}`);
      }

      return {
        success: true,
        data: {
          result,
          actualValue,
          expectedValue: value,
          operator,
          field
        }
      };
    } catch (error: any) {
      return {
        success: false,
        error: `Condition evaluation failed: ${error.message}`
      };
    }
  }

  // Read a dotted path such as "user.name" from a nested object
  private getNestedValue(obj: any, path: string): any {
    return path.split('.').reduce((current, key) => {
      return current && current[key] !== undefined ? current[key] : undefined;
    }, obj);
  }
}

// Slack message executor
class SlackMessageExecutor implements NodeExecutor {
  async execute(config: any, inputData: any, context: ExecutionContext) {
    try {
      const { WebClient } = require('@slack/web-api');

      // Fetch the Slack token from the connector manager
      const connectorManager = new (require('./ConnectorManager').ConnectorManager)();
      const slackConnector = await connectorManager.getConnector(
        context.userId,
        'slack',
        config.connectorId
      );

      if (!slackConnector) {
        throw new Error('Slack connector is not configured or is no longer valid');
      }

      const slack = new WebClient(slackConnector.config.accessToken);

      const message = this.replaceVariables(config.message, inputData);
      const channel = config.channel || inputData.channel;

      if (!channel) {
        throw new Error('No Slack channel specified');
      }

      const messageOptions: any = {
        channel,
        text: message
      };

      // Rich-text blocks
      if (config.blocks && Array.isArray(config.blocks)) {
        messageOptions.blocks = config.blocks.map((block: any) =>
          this.processSlackBlock(block, inputData)
        );
      }

      // Attachments
      if (config.attachments && Array.isArray(config.attachments)) {
        messageOptions.attachments = config.attachments;
      }

      // Thread replies
      if (config.threadTs || inputData.threadTs) {
        messageOptions.thread_ts = config.threadTs || inputData.threadTs;
      }

      const result = await slack.chat.postMessage(messageOptions);

      return {
        success: true,
        data: {
          messageId: result.ts,
          channel: result.channel,
          permalink: await this.getPermalink(slack, result.channel, result.ts)
        }
      };
    } catch (error: any) {
      return {
        success: false,
        error: `Failed to send Slack message: ${error.message}`
      };
    }
  }

  private replaceVariables(template: string, variables: any): string {
    if (!template) return '';

    return template.replace(/\{\{([^}]+)\}\}/g, (match, key) => {
      const keys = key.trim().split('.');
      let value = variables;

      for (const k of keys) {
        if (value && typeof value === 'object' && k in value) {
          value = value[k];
        } else {
          return match;
        }
      }

      return String(value);
    });
  }

  private processSlackBlock(block: any, variables: any): any {
    // Recursively substitute variables inside a Slack block
    if (typeof block === 'string') {
      return this.replaceVariables(block, variables);
    } else if (Array.isArray(block)) {
      return block.map(item => this.processSlackBlock(item, variables));
    } else if (typeof block === 'object' && block !== null) {
      const processedBlock: Record<string, any> = {};
      for (const [key, value] of Object.entries(block)) {
        processedBlock[key] = this.processSlackBlock(value, variables);
      }
      return processedBlock;
    }
    return block;
  }

  private async getPermalink(slack: any, channel: string, ts: string): Promise<string> {
    try {
      const result = await slack.chat.getPermalink({ channel, message_ts: ts });
      return result.permalink;
    } catch (error) {
      // The permalink is optional, so fall back to an empty string
      return '';
    }
  }
}

// Email executor
class EmailExecutor implements NodeExecutor {
  async execute(config: any, inputData: any, context: ExecutionContext) {
    try {
      const nodemailer = require('nodemailer');

      // Fetch the mail configuration from the connector manager
      const connectorManager = new (require('./ConnectorManager').ConnectorManager)();
      const emailConnector = await connectorManager.getConnector(
        context.userId,
        'email',
        config.connectorId
      );

      if (!emailConnector) {
        throw new Error('Email connector is not configured or is no longer valid');
      }

      // Nodemailer's factory function is createTransport (not createTransporter)
      const transporter = nodemailer.createTransport({
        host: emailConnector.config.host,
        port: emailConnector.config.port,
        secure: emailConnector.config.secure,
        auth: {
          user: emailConnector.config.username,
          pass: emailConnector.config.password
        }
      });

      const mailOptions = {
        from: config.from || emailConnector.config.from,
        to: this.replaceVariables(config.to, inputData),
        cc: config.cc ? this.replaceVariables(config.cc, inputData) : undefined,
        bcc: config.bcc ? this.replaceVariables(config.bcc, inputData) : undefined,
        subject: this.replaceVariables(config.subject, inputData),
        text: config.textContent ? this.replaceVariables(config.textContent, inputData) : undefined,
        html: config.htmlContent ? this.replaceVariables(config.htmlContent, inputData) : undefined,
        attachments: config.attachments || undefined
      };

      const result = await transporter.sendMail(mailOptions);

      return {
        success: true,
        data: {
          messageId: result.messageId,
          accepted: result.accepted,
          rejected: result.rejected
        }
      };
    } catch (error: any) {
      return {
        success: false,
        error: `Failed to send email: ${error.message}`
      };
    }
  }

  private replaceVariables(template: string, variables: any): string {
    if (!template) return '';

    return template.replace(/\{\{([^}]+)\}\}/g, (match, key) => {
      const keys = key.trim().split('.');
      let value = variables;

      for (const k of keys) {
        if (value && typeof value === 'object' && k in value) {
          value = value[k];
        } else {
          return match;
        }
      }

      return String(value);
    });
  }
}

// Data transform executor
class TransformExecutor implements NodeExecutor {
  async execute(config: any, inputData: any, context: ExecutionContext) {
    try {
      const { script, language = 'javascript' } = config;

      if (language === 'javascript') {
        return this.executeJavaScript(script, inputData, context);
      } else {
        throw new Error(`Unsupported script language: ${language}`);
      }
    } catch (error: any) {
      return {
        success: false,
        error: `Data transform failed: ${error.message}`
      };
    }
  }

  private async executeJavaScript(script: string, inputData: any, context: ExecutionContext) {
    try {
      // Run the script in a separate V8 context with a restricted set of globals.
      // Note: Node's vm module is not a security mechanism, so untrusted scripts
      // need stronger isolation (for example a separate process).
      const vm = require('vm');
      const sandbox = {
        input: inputData,
        output: {},
        console: {
          log: (...args: unknown[]) => console.log(`[Transform ${context.instanceId}]`, ...args)
        },
        // A few utility globals
        JSON,
        Math,
        Date,
        String,
        Number,
        Array,
        Object
      };

      // Execute the script
      const vmContext = vm.createContext(sandbox);
      vm.runInContext(script, vmContext, {
        timeout: 30000, // 30-second timeout
        displayErrors: true
      });

      return {
        success: true,
        data: sandbox.output
      };
    } catch (error: any) {
      return {
        success: false,
        error: `JavaScript execution failed: ${error.message}`
      };
    }
  }
}

// GitHub action executor
class GitHubActionExecutor implements NodeExecutor {
  async execute(config: any, inputData: any, context: ExecutionContext) {
    try {
      const { Octokit } = require('@octokit/rest');

      // Fetch the GitHub token from the connector manager
      const connectorManager = new (require('./ConnectorManager').ConnectorManager)();
      const githubConnector = await connectorManager.getConnector(
        context.userId,
        'github',
        config.connectorId
      );

      if (!githubConnector) {
        throw new Error('GitHub connector is not configured or is no longer valid');
      }

      const octokit = new Octokit({
        auth: githubConnector.config.accessToken
      });

      // owner and repo are read from config inside each helper below
      const { action } = config;

      switch (action) {
        case 'create_issue':
          return await this.createIssue(octokit, config, inputData);
        case 'create_pr':
          return await this.createPullRequest(octokit, config, inputData);
        case 'add_comment':
          return await this.addComment(octokit, config, inputData);
        case 'create_release':
          return await this.createRelease(octokit, config, inputData);
        default:
          throw new Error(`Unsupported GitHub action: ${action}`);
      }
    } catch (error: any) {
      return {
        success: false,
        error: `GitHub action failed: ${error.message}`
      };
    }
  }

  private async createIssue(octokit: any, config: any, inputData: any) {
    const result = await octokit.issues.create({
      owner: config.owner,
      repo: config.repo,
      title: this.replaceVariables(config.title, inputData),
      body: this.replaceVariables(config.body, inputData),
      labels: config.labels || [],
      assignees: config.assignees || []
    });

    return {
      success: true,
      data: {
        issueNumber: result.data.number,
        issueUrl: result.data.html_url,
        issueId: result.data.id
      }
    };
  }

  private async createPullRequest(octokit: any, config: any, inputData: any) {
    const result = await octokit.pulls.create({
      owner: config.owner,
      repo: config.repo,
      title: this.replaceVariables(config.title, inputData),
      body: this.replaceVariables(config.body, inputData),
      head: config.head,
      base: config.base || 'main'
    });

    return {
      success: true,
      data: {
        prNumber: result.data.number,
        prUrl: result.data.html_url,
        prId: result.data.id
      }
    };
  }

  private async addComment(octokit: any, config: any, inputData: any) {
    const result = await octokit.issues.createComment({
      owner: config.owner,
      repo: config.repo,
      issue_number: config.issueNumber || inputData.issueNumber,
      body: this.replaceVariables(config.body, inputData)
    });

    return {
      success: true,
      data: {
        commentId: result.data.id,
        commentUrl: result.data.html_url
      }
    };
  }

  private async createRelease(octokit: any, config: any, inputData: any) {
    const result = await octokit.repos.createRelease({
      owner: config.owner,
      repo: config.repo,
      tag_name: this.replaceVariables(config.tagName, inputData),
      name: this.replaceVariables(config.name, inputData),
      body: this.replaceVariables(config.body, inputData),
      draft: config.draft || false,
      prerelease: config.prerelease || false
    });

    return {
      success: true,
      data: {
        releaseId: result.data.id,
        releaseUrl: result.data.html_url,
        tagName: result.data.tag_name
      }
    };
  }

  private replaceVariables(template: string, variables: any): string {
    if (!template) return '';

    return template.replace(/\{\{([^}]+)\}\}/g, (match, key) => {
      const keys = key.trim().split('.');
      let value = variables;

      for (const k of keys) {
        if (value && typeof value === 'object' && k in value) {
          value = value[k];
        } else {
          return match;
        }
      }

      return String(value);
    });
  }
}
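
The HTTP, Slack, email and GitHub executors each carry an identical private replaceVariables method. One possible way to remove the duplication is a shared helper, sketched below. The names `renderTemplate` and `resolvePath` are hypothetical and not part of the project. The behaviour is intended to match the original for missing variables (the placeholder is left untouched), with one deliberate difference that should be treated as an assumption: a key that exists but holds `undefined` also leaves the placeholder in place instead of producing the string "undefined".

```typescript
// Hypothetical shared helper: resolve a dotted path such as "user.name".
// Returns undefined when any segment of the path is missing.
function resolvePath(variables: unknown, path: string): unknown {
  let value: unknown = variables;
  for (const key of path.trim().split('.')) {
    if (value !== null && typeof value === 'object' && key in (value as object)) {
      value = (value as Record<string, unknown>)[key];
    } else {
      return undefined;
    }
  }
  return value;
}

// Replace every {{path}} placeholder with the matching value from `variables`.
// Unresolved placeholders are left as they are, so they stay visible in the output.
function renderTemplate(template: string, variables: unknown): string {
  if (!template) return '';
  return template.replace(/\{\{([^}]+)\}\}/g, (match: string, path: string) => {
    const value = resolvePath(variables, path);
    return value === undefined ? match : String(value);
  });
}

const greeting = renderTemplate(
  'Hello {{ user.name }}, order {{orderId}} is {{order.state}}',
  { user: { name: 'Ada' }, orderId: 42 }
);
console.log(greeting); // Hello Ada, order 42 is {{order.state}}
```

Each executor could then call `renderTemplate(config.message, inputData)` in place of its private copy, which keeps the placeholder syntax consistent across node types.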

(4) Connector Management System

Connector manager implementation

// server/src/services/ConnectorManager.ts
import { ConnectorModel } from '../models';
import { encrypt, decrypt } from '../utils/crypto';
import { Logger } from '../utils/logger';

export interface Connector {
  id: string;
  type: string;
  name: string;
  config: any;
  isActive: boolean;
}

export class ConnectorManager {
  private logger: Logger;

  constructor() {
    this.logger = new Logger('ConnectorManager');
  }

  // Create a connector
  async createConnector(
    userId: string,
    type: string,
    name: string,
    config: any
  ): Promise<string> {
    try {
      this.logger.info(`Creating connector: ${type} - ${name}`);

      // Validate the configuration
      const validation = await this.validateConfig(type, config);
      if (!validation.isValid) {
        throw new Error(`Configuration validation failed: ${validation.errors.join(', ')}`);
      }

      // Test the connection before persisting anything, so a failed test
      // never leaves a connector record behind
      const isValid = await this.testConnection(type, config);
      if (!isValid) {
        throw new Error('Connection test failed; please check the configuration');
      }

      // Encrypt sensitive configuration values
      const encryptedConfig = this.encryptConfig(config);

      const connector = new ConnectorModel({
        userId,
        type,
        name,
        config: encryptedConfig,
        isActive: true,
        createdAt: new Date(),
        updatedAt: new Date()
      });

      await connector.save();

      this.logger.info(`Connector created: ${connector._id}`);
      return connector._id.toString();
    } catch (error) {
      this.logger.error('Failed to create connector:', error);
      throw error;
    }
  }

  // Get a connector
  async getConnector(
    userId: string,
    type: string,
    connectorId?: string
  ): Promise<Connector | null> {
    try {
      const query: any = { userId, type, isActive: true };
      if (connectorId) {
        query._id = connectorId;
      }

      const connector = await ConnectorModel.findOne(query);
      if (!connector) {
        return null;
      }

      // Decrypt the configuration
      const decryptedConfig = this.decryptConfig(connector.config);

      return {
        id: connector._id.toString(),
        type: connector.type,
        name: connector.name,
        config: decryptedConfig,
        isActive: connector.isActive
      };
    } catch (error) {
      this.logger.error('Failed to get connector:', error);
      return null;
    }
  }

  // Update a connector
  async updateConnector(
    connectorId: string,
    userId: string,
    updates: Partial<{ name: string; config: any }>
  ): Promise<void> {
    try {
      const updateData: any = { updatedAt: new Date() };

      if (updates.name) {
        updateData.name = updates.name;
      }

      if (updates.config) {
        // Validate the new configuration
        const connector = await ConnectorModel.findOne({ _id: connectorId, userId });
        if (!connector) {
          throw new Error('Connector not found');
        }

        const validation = await this.validateConfig(connector.type, updates.config);
        if (!validation.isValid) {
          throw new Error(`Configuration validation failed: ${validation.errors.join(', ')}`);
        }

        // Test the new configuration
        const isValid = await this.testConnection(connector.type, updates.config);
        if (!isValid) {
          throw new Error('Connection test failed; please check the configuration');
        }

        updateData.config = this.encryptConfig(updates.config);
      }

      await ConnectorModel.findOneAndUpdate(
        { _id: connectorId, userId },
        updateData
      );

      this.logger.info(`Connector updated: ${connectorId}`);
    } catch (error) {
      this.logger.error('Failed to update connector:', error);
      throw error;
    }
  }

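  // Delete a connector (soft delete: the record is kept but marked inactive)
  async deleteConnector(connectorId: string, userId: string): Promise<void> {
    try {
      const deleted = await ConnectorModel.findOneAndUpdate(
        { _id: connectorId, userId },
        { isActive: false, updatedAt: new Date() }
      );
      // findOneAndUpdate resolves to null when nothing matched; do not report success then
      if (!deleted) {
        throw new Error('Connector not found');
      }

      this.logger.info(`Connector deleted: ${connectorId}`);
    } catch (error) {
      this.logger.error('Failed to delete connector:', error);
      throw error;
    }
  }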

  // Test a connection
  async testConnection(type: string, config: any): Promise<boolean> {
    try {
      switch (type) {
        case 'slack':
          return await this.testSlackConnection(config);
        case 'github':
          return await this.testGitHubConnection(config);
        case 'email':
          return await this.testEmailConnection(config);
        case 'webhook':
          return await this.testWebhookConnection(config);
        default:
          this.logger.warn(`Unknown connector type: ${type}`);
          return true; // No test is available for unknown types, so they pass by default
      }
    } catch (error) {
      this.logger.error(`Connection test failed for ${type}:`, error);
      return false;
    }
  }

  // Validate a configuration: check that the required fields for each type are present
  private async validateConfig(type: string, config: any): Promise<{ isValid: boolean; errors: string[] }> {
    const errors: string[] = [];

    switch (type) {
      case 'slack':
        if (!config.accessToken) errors.push('Missing Slack access token');
        break;
      case 'github':
        if (!config.accessToken) errors.push('Missing GitHub access token');
        break;
      case 'email':
        if (!config.host) errors.push('Missing SMTP host');
        if (!config.port) errors.push('Missing SMTP port');
        if (!config.username) errors.push('Missing username');
        if (!config.password) errors.push('Missing password');
        break;
      case 'webhook':
        if (!config.url) errors.push('Missing webhook URL');
        break;
    }

    return { isValid: errors.length === 0, errors };
  }

  // Slack: auth.test succeeds only with a valid token
  private async testSlackConnection(config: any): Promise<boolean> {
    const { WebClient } = require('@slack/web-api');
    const slack = new WebClient(config.accessToken);

    try {
      const result = await slack.auth.test();
      return result.ok === true;
    } catch (error) {
      return false;
    }
  }

  // GitHub: fetch the authenticated user to confirm the token works
  private async testGitHubConnection(config: any): Promise<boolean> {
    const { Octokit } = require('@octokit/rest');
    const octokit = new Octokit({ auth: config.accessToken });

    try {
      const result = await octokit.users.getAuthenticated();
      return !!result.data;
    } catch (error) {
      return false;
    }
  }

  // Email: verify the SMTP connection and credentials
  private async testEmailConnection(config: any): Promise<boolean> {
    const nodemailer = require('nodemailer');

    try {
      // The nodemailer factory is createTransport (there is no createTransporter)
      const transporter = nodemailer.createTransport({
        host: config.host,
        port: config.port,
        secure: config.secure,
        auth: {
          user: config.username,
          pass: config.password
        }
      });

      await transporter.verify();
      return true;
    } catch (error) {
      return false;
    }
  }

  // Webhook: a HEAD request with a 5-second timeout; any status below 400 counts as reachable
  private async testWebhookConnection(config: any): Promise<boolean> {
    const axios = require('axios');

    try {
      const response = await axios.head(config.url, { timeout: 5000 });
      return response.status < 400;
    } catch (error) {
      return false;
    }
  }

  private encryptConfig(config: any): any {
    // Encrypt sensitive fields (shallow copy; only top-level fields are handled)
    const sensitiveFields = ['accessToken', 'refreshToken', 'apiKey', 'password', 'secret'];
    const encrypted = { ...config };

    for (const field of sensitiveFields) {
      if (encrypted[field]) {
        encrypted[field] = encrypt(encrypted[field]);
      }
    }

    return encrypted;
  }

  private decryptConfig(config: any): any {
    // Decrypt sensitive fields
    const sensitiveFields = ['accessToken', 'refreshToken', 'apiKey', 'password', 'secret'];
    const decrypted = { ...config };

    for (const field of sensitiveFields) {
      if (decrypted[field]) {
        decrypted[field] = decrypt(decrypted[field]);
      }
    }

    return decrypted;
  }
}
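
getConnector above returns the decrypted configuration. Node executors need that, but it is probably not something to send to the browser or write to logs. The following is a minimal sketch of a redaction helper, assuming the same sensitive field names as encryptConfig. `redactConfig`, `ConnectorConfig`, and the mask string are hypothetical names that do not appear in the original code.

```typescript
// Hypothetical helper (not part of the original ConnectorManager).
// Field names mirror the sensitiveFields list used by encryptConfig/decryptConfig.
const SENSITIVE_FIELDS = ['accessToken', 'refreshToken', 'apiKey', 'password', 'secret'] as const;

type ConnectorConfig = Record<string, unknown>;

// Return a shallow copy in which every non-empty sensitive field is replaced by a mask.
function redactConfig(config: ConnectorConfig, mask: string = '********'): ConnectorConfig {
  const redacted: ConnectorConfig = { ...config };
  for (const field of SENSITIVE_FIELDS) {
    if (redacted[field]) {
      redacted[field] = mask;
    }
  }
  return redacted;
}

// The SMTP password is masked; host and port pass through unchanged.
console.log(redactConfig({ host: 'smtp.example.com', port: 587, password: 'hunter2' }));
```

An API route that lists connectors could apply such a helper to each `config` before responding, while the execution engine keeps using the unredacted value.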

IV. Deployment and Optimization

(I) Docker Containerized Deployment

Frontend Dockerfile

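# frontend/Dockerfile
FROM node:18-alpine AS builder

WORKDIR /app

# Copy the package manifests and install all dependencies
# (the build step may need devDependencies, so --only=production is not used here)
COPY package*.json ./
RUN npm ci

# Copy the source code and build
COPY . .
RUN npm run build

# Production image
FROM nginx:alpine

# Copy the build output
COPY --from=builder /app/build /usr/share/nginx/html

# Copy the nginx configuration
COPY nginx.conf /etc/nginx/nginx.conf

EXPOSE 80

CMD ["nginx", "-g", "daemon off;"]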

Backend Dockerfile

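# backend/Dockerfile
FROM node:18-alpine

WORKDIR /app

# Install all dependencies; the TypeScript build needs devDependencies
COPY package*.json ./
RUN npm ci

# Copy the source code
COPY . .

# Build TypeScript, then remove devDependencies from the image
RUN npm run build && npm prune --omit=dev

# Create a non-root user
RUN addgroup -g 1001 -S nodejs
RUN adduser -S nodejs -u 1001

USER nodejs

EXPOSE 3001

# Health check (uses BusyBox wget, because curl is not included in node:18-alpine)
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
  CMD wget -q -O /dev/null http://localhost:3001/health || exit 1

CMD ["node", "dist/index.js"]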

Docker Compose Configuration

# docker-compose.yml
version: '3.8'

services:
  # Frontend service
  frontend:
    build:
      context: ./frontend
      dockerfile: Dockerfile
    ports:
      - "3000:80"
    environment:
      # Create React App inlines REACT_APP_* values at build time, so this
      # runtime value has no effect on an already-built static bundle
      - REACT_APP_API_BASE_URL=http://localhost:3001/api
    depends_on:
      - backend
    networks:
      - workflow-network

  # Backend service
  backend:
    build:
      context: ./backend
      dockerfile: Dockerfile
    ports:
      - "3001:3001"
    environment:
      - NODE_ENV=production
      # Credentials must match the MongoDB root user and the Redis password set below
      - MONGODB_URI=mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@mongodb:27017/workflow?authSource=admin
      - REDIS_URL=redis://:${REDIS_PASSWORD}@redis:6379
      # CacheManager and the other backend services connect via REDIS_HOST / REDIS_PORT
      - REDIS_HOST=redis
      - REDIS_PORT=6379
      - REDIS_PASSWORD=${REDIS_PASSWORD}
      - JWT_SECRET=${JWT_SECRET}
      - ENCRYPTION_KEY=${ENCRYPTION_KEY}
    depends_on:
      - mongodb
      - redis
    networks:
      - workflow-network
    volumes:
      - ./logs:/app/logs

  # MongoDB database
  mongodb:
    image: mongo:6.0
    ports:
      - "27017:27017"
    environment:
      - MONGO_INITDB_ROOT_USERNAME=${MONGO_USERNAME}
      - MONGO_INITDB_ROOT_PASSWORD=${MONGO_PASSWORD}
    volumes:
      - mongodb_data:/data/db
      - ./mongodb/init:/docker-entrypoint-initdb.d
    networks:
      - workflow-network

  # Redis cache
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    command: redis-server --appendonly yes --requirepass ${REDIS_PASSWORD}
    volumes:
      - redis_data:/data
    networks:
      - workflow-network

  # Nginx load balancer
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf
      - ./nginx/ssl:/etc/nginx/ssl
    depends_on:
      - frontend
      - backend
    networks:
      - workflow-network

volumes:
  mongodb_data:
  redis_data:

networks:
  workflow-network:
    driver: bridge

(II) Performance Optimization Strategies

1. Database Optimization

// MongoDB index optimization
db.workflows.createIndex({ "userId": 1, "status": 1 });
db.workflows.createIndex({ "createdAt": -1 });
db.workflowInstances.createIndex({ "workflowId": 1, "status": 1 });
db.workflowInstances.createIndex({ "userId": 1, "startTime": -1 });
db.connectors.createIndex({ "userId": 1, "type": 1, "isActive": 1 });

// Compound index for queries that filter by user and status and sort by start time
db.workflowInstances.createIndex({
  "userId": 1,
  "status": 1,
  "startTime": -1
});

2. Caching Strategy

// server/src/services/CacheManager.ts
import Redis from 'ioredis';
import { Logger } from '../utils/logger';

export class CacheManager {
  private redis: Redis;
  private logger: Logger;

  constructor() {
    this.redis = new Redis({
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT || '6379', 10),
      password: process.env.REDIS_PASSWORD,
      // Reconnect with a capped, linearly growing delay
      // (retryDelayOnFailover is a Redis Cluster option and does not apply here)
      retryStrategy: (times) => Math.min(times * 100, 2000),
      maxRetriesPerRequest: 3
    });

    this.logger = new Logger('CacheManager');
  }

  // Cache a workflow definition (default TTL: 1 hour)
  async cacheWorkflow(workflowId: string, workflow: any, ttl: number = 3600): Promise<void> {
    try {
      const key = `workflow:${workflowId}`;
      await this.redis.setex(key, ttl, JSON.stringify(workflow));
      this.logger.debug(`Workflow cached: ${workflowId}`);
    } catch (error) {
      this.logger.error('Failed to cache workflow:', error);
    }
  }

  // Get a cached workflow; returns null on a miss or on error
  async getCachedWorkflow(workflowId: string): Promise<any | null> {
    try {
      const key = `workflow:${workflowId}`;
      const cached = await this.redis.get(key);
      return cached ? JSON.parse(cached) : null;
    } catch (error) {
      this.logger.error('Failed to get cached workflow:', error);
      return null;
    }
  }

  // Cache an execution status (default TTL: 30 minutes)
  async cacheExecutionStatus(instanceId: string, status: any, ttl: number = 1800): Promise<void> {
    try {
      const key = `execution:${instanceId}`;
      await this.redis.setex(key, ttl, JSON.stringify(status));
    } catch (error) {
      this.logger.error('Failed to cache execution status:', error);
    }
  }

  // Cache a connector configuration (default TTL: 2 hours)
  async cacheConnector(userId: string, connectorId: string, config: any, ttl: number = 7200): Promise<void> {
    try {
      const key = `connector:${userId}:${connectorId}`;
      await this.redis.setex(key, ttl, JSON.stringify(config));
    } catch (error) {
      this.logger.error('Failed to cache connector:', error);
    }
  }

  // Clear all cached keys that match a pattern
  async clearCache(pattern: string): Promise<void> {
    try {
      // Iterate with SCAN rather than KEYS so that a large keyspace does not block Redis
      const stream = this.redis.scanStream({ match: pattern, count: 100 });
      let removed = 0;
      for await (const keys of stream) {
        if (keys.length > 0) {
          removed += await this.redis.del(...(keys as string[]));
        }
      }
      if (removed > 0) {
        this.logger.info(`Cache cleared: ${removed} keys`);
      }
    } catch (error) {
      this.logger.error('Failed to clear cache:', error);
    }
  }
}
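
CacheManager provides the get and set primitives, but callers still have to decide when to consult the cache. A common way to combine them is the cache-aside pattern, sketched below. `CacheStore`, `MemoryStore`, and `getOrLoad` are hypothetical names introduced for illustration; in this article's setup the store would be backed by `getCachedWorkflow` and `cacheWorkflow`, and the loader would query MongoDB.

```typescript
// Hypothetical storage contract; the real implementation would wrap CacheManager.
interface CacheStore<T> {
  get(key: string): Promise<T | null>;
  set(key: string, value: T, ttlSeconds: number): Promise<void>;
}

// In-memory stand-in so the sketch runs without Redis (the TTL is ignored here).
class MemoryStore<T> implements CacheStore<T> {
  private data = new Map<string, T>();

  async get(key: string): Promise<T | null> {
    return this.data.has(key) ? (this.data.get(key) as T) : null;
  }

  async set(key: string, value: T, _ttlSeconds: number): Promise<void> {
    this.data.set(key, value);
  }
}

// Cache-aside read: return the cached value if present; otherwise load it from
// the source of truth and populate the cache for later reads.
async function getOrLoad<T>(
  store: CacheStore<T>,
  key: string,
  load: () => Promise<T | null>,
  ttlSeconds: number = 3600
): Promise<T | null> {
  const cached = await store.get(key);
  if (cached !== null) {
    return cached;
  }
  const fresh = await load();
  if (fresh !== null) {
    await store.set(key, fresh, ttlSeconds);
  }
  return fresh;
}
```

With this shape, updating a workflow only needs to delete or overwrite its `workflow:<id>` key so that the next read reloads the new definition.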

3. Task Queue Optimization

// server/src/services/QueueManager.ts
import Bull from 'bull';
import { Logger } from '../utils/logger';

export class QueueManager {
  private queues: Map<string, Bull.Queue> = new Map();
  private logger: Logger;

  constructor() {
    this.logger = new Logger('QueueManager');
    this.initializeQueues();
  }

  private initializeQueues(): void {
    // Redis connection settings shared by all three queues
    const redis = {
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT || '6379', 10),
      password: process.env.REDIS_PASSWORD,
    };

    // High-priority queue: user-triggered workflows
    const highPriorityQueue = new Bull('high-priority', {
      redis,
      defaultJobOptions: {
        removeOnComplete: 100,
        removeOnFail: 50,
        attempts: 3,
        backoff: {
          type: 'exponential',
          delay: 2000,
        },
      },
    });

    // Normal-priority queue: schedule-triggered workflows
    const normalPriorityQueue = new Bull('normal-priority', {
      redis,
      defaultJobOptions: {
        removeOnComplete: 50,
        removeOnFail: 25,
        attempts: 2,
        backoff: {
          type: 'fixed',
          delay: 5000,
        },
      },
    });

    // Low-priority queue: batch processing tasks
    const lowPriorityQueue = new Bull('low-priority', {
      redis,
      defaultJobOptions: {
        removeOnComplete: 25,
        removeOnFail: 10,
        attempts: 1,
      },
    });

    this.queues.set('high', highPriorityQueue);
    this.queues.set('normal', normalPriorityQueue);
    this.queues.set('low', lowPriorityQueue);

    // Register a processor with a per-queue concurrency. Bull's process() requires a
    // handler function, and '*' matches every named job added through addJob().
    // handleJob is a placeholder: connect it to the workflow execution engine.
    const handleJob = async (job: Bull.Job): Promise<void> => {
      this.logger.debug(`Processing job ${job.name}: ${job.id}`);
    };
    highPriorityQueue.process('*', 10, handleJob);
    normalPriorityQueue.process('*', 5, handleJob);
    lowPriorityQueue.process('*', 2, handleJob);

    this.setupQueueEvents();
  }

  private setupQueueEvents(): void {
    this.queues.forEach((queue, name) => {
      queue.on('completed', (job) => {
        this.logger.info(`Queue ${name} job completed: ${job.id}`);
      });

      queue.on('failed', (job, err) => {
        this.logger.error(`Queue ${name} job failed: ${job.id}`, err);
      });

      queue.on('stalled', (job) => {
        this.logger.warn(`Queue ${name} job stalled: ${job.id}`);
      });
    });
  }

  // Add a job to the queue for the given priority
  async addJob(priority: 'high' | 'normal' | 'low', jobType: string, data: any, options?: any): Promise<void> {
    const queue = this.queues.get(priority);
    if (!queue) {
      throw new Error(`Priority queue not found: ${priority}`);
    }

    await queue.add(jobType, data, options);
    this.logger.debug(`Job added to ${priority} priority queue: ${jobType}`);
  }

  // Get per-queue job counts
  async getQueueStats(): Promise<Record<string, Record<'waiting' | 'active' | 'completed' | 'failed', number>>> {
    const stats: Record<string, Record<'waiting' | 'active' | 'completed' | 'failed', number>> = {};

    for (const [name, queue] of this.queues) {
      // getJobCounts() returns the counts without loading the job lists themselves
      const counts = await queue.getJobCounts();

      stats[name] = {
        waiting: counts.waiting,
        active: counts.active,
        completed: counts.completed,
        failed: counts.failed
      };
    }

    return stats;
  }
}
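
The `attempts` and `backoff` options above determine how long a failed job waits before each retry. The sketch below computes an illustrative schedule under two assumptions: a fixed strategy waits `delay` milliseconds every time, and an exponential strategy doubles the wait on each retry. Bull's internal exponential formula may differ in detail, and `retryDelays` is a hypothetical helper, not part of Bull.

```typescript
type Backoff = { type: 'fixed' | 'exponential'; delay: number };

// Delays in milliseconds before each retry.
// With `attempts` total tries there are attempts - 1 retries.
function retryDelays(attempts: number, backoff: Backoff): number[] {
  const delays: number[] = [];
  for (let retry = 1; retry < attempts; retry++) {
    const wait =
      backoff.type === 'fixed'
        ? backoff.delay
        : backoff.delay * Math.pow(2, retry - 1); // assumed doubling schedule
    delays.push(wait);
  }
  return delays;
}

// High-priority settings from above: 3 attempts, exponential, 2000 ms base
console.log(retryDelays(3, { type: 'exponential', delay: 2000 })); // [ 2000, 4000 ]
// Normal-priority settings: 2 attempts, fixed 5000 ms
console.log(retryDelays(2, { type: 'fixed', delay: 5000 })); // [ 5000 ]
```

A schedule like this helps when choosing `attempts`: the sum of the delays is roughly the longest time a user-triggered workflow can sit in retry before it is reported as failed.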

(III) Monitoring and Alerting

1. Application Monitoring Middleware

// server/src/middleware/monitoring.ts
import { Request, Response, NextFunction } from 'express';
import { Counter, Gauge, Histogram, register } from 'prom-client';

// Metric definitions
const httpRequestsTotal = new Counter({
  name: 'http_requests_total',
  help: 'Total number of HTTP requests',
  labelNames: ['method', 'route', 'status_code']
});

const httpRequestDuration = new Histogram({
  name: 'http_request_duration_seconds',
  help: 'Duration of HTTP requests in seconds',
  labelNames: ['method', 'route'],
  buckets: [0.1, 0.5, 1, 2, 5, 10]
});

// Note: workflow_id is a high-cardinality label; each workflow creates its own time series
const workflowExecutionsTotal = new Counter({
  name: 'workflow_executions_total',
  help: 'Total number of workflow executions',
  labelNames: ['status', 'workflow_id']
});

const nodeExecutionsTotal = new Counter({
  name: 'node_executions_total',
  help: 'Total number of node executions',
  labelNames: ['node_type', 'status']
});

// A Gauge rather than a Counter: the number of active instances goes down as well as up,
// and a prom-client Counter rejects negative increments
const activeWorkflowInstances = new Gauge({
  name: 'active_workflow_instances',
  help: 'Number of currently active workflow instances'
});

// HTTP request monitoring middleware
export const httpMetricsMiddleware = (req: Request, res: Response, next: NextFunction) => {
  const start = Date.now();

  res.on('finish', () => {
    const duration = (Date.now() - start) / 1000;
    const route = req.route?.path || req.path;

    httpRequestsTotal
      .labels(req.method, route, res.statusCode.toString())
      .inc();

    httpRequestDuration
      .labels(req.method, route)
      .observe(duration);
  });

  next();
};

// Workflow monitoring
export class WorkflowMonitor {
  static recordExecution(workflowId: string, status: string): void {
    workflowExecutionsTotal
      .labels(status, workflowId)
      .inc();
  }

  static recordNodeExecution(nodeType: string, status: string): void {
    nodeExecutionsTotal
      .labels(nodeType, status)
      .inc();
  }

  static incrementActiveInstances(): void {
    activeWorkflowInstances.inc();
  }

  static decrementActiveInstances(): void {
    activeWorkflowInstances.dec();
  }
}

// Metrics export endpoint. register.metrics() returns a Promise in recent prom-client
// versions; awaiting it also works with older versions that return a string.
export const metricsHandler = async (req: Request, res: Response) => {
  res.set('Content-Type', register.contentType);
  res.end(await register.metrics());
};

2. Health Check System

// server/src/services/HealthChecker.ts
import mongoose from 'mongoose';
import Redis from 'ioredis';
import { Logger } from '../utils/logger';

interface HealthStatus {
  status: 'healthy' | 'unhealthy';
  timestamp: string;
  services: {
    database: ServiceStatus;
    redis: ServiceStatus;
    queue: ServiceStatus;
    external: ServiceStatus;
  };
  uptime: number;
  memory: {
    used: number;
    total: number;
    percentage: number;
  };
}

interface ServiceStatus {
  status: 'up' | 'down';
  responseTime?: number;
  error?: string;
}

// Extract a message from a caught value, which TypeScript types as unknown in strict mode
function errorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

export class HealthChecker {
  private redis: Redis;
  private logger: Logger;

  constructor() {
    this.redis = new Redis({
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT || '6379', 10),
      password: process.env.REDIS_PASSWORD,
    });
    this.logger = new Logger('HealthChecker');
  }

  async getHealthStatus(): Promise<HealthStatus> {
    // Run all checks in parallel
    const [database, redis, queue, external] = await Promise.all([
      this.checkDatabase(),
      this.checkRedis(),
      this.checkQueue(),
      this.checkExternalServices()
    ]);

    // Heap usage in MB
    const memory = process.memoryUsage();
    const memoryUsage = {
      used: Math.round(memory.heapUsed / 1024 / 1024),
      total: Math.round(memory.heapTotal / 1024 / 1024),
      percentage: Math.round((memory.heapUsed / memory.heapTotal) * 100)
    };

    const overallStatus = this.determineOverallStatus([database, redis, queue, external]);

    return {
      status: overallStatus,
      timestamp: new Date().toISOString(),
      services: {
        database,
        redis,
        queue,
        external
      },
      uptime: process.uptime(),
      memory: memoryUsage
    };
  }

  private async checkDatabase(): Promise<ServiceStatus> {
    try {
      const start = Date.now();
      await mongoose.connection.db.admin().ping();
      const responseTime = Date.now() - start;

      return {
        status: 'up',
        responseTime
      };
    } catch (error) {
      return {
        status: 'down',
        error: errorMessage(error)
      };
    }
  }

  private async checkRedis(): Promise<ServiceStatus> {
    try {
      const start = Date.now();
      await this.redis.ping();
      const responseTime = Date.now() - start;

      return {
        status: 'up',
        responseTime
      };
    } catch (error) {
      return {
        status: 'down',
        error: errorMessage(error)
      };
    }
  }

  private async checkQueue(): Promise<ServiceStatus> {
    try {
      // Check the connection to the Redis server that backs the queues
      const queueInfo = await this.redis.info('server');

      return {
        status: queueInfo ? 'up' : 'down'
      };
    } catch (error) {
      return {
        status: 'down',
        error: errorMessage(error)
      };
    }
  }

  private async checkExternalServices(): Promise<ServiceStatus> {
    try {
      // Placeholder: key external services can be checked here,
      // for example the Slack API or the GitHub API
      return {
        status: 'up'
      };
    } catch (error) {
      return {
        status: 'down',
        error: errorMessage(error)
      };
    }
  }

  // Only the first three services (database, redis, queue) are treated as critical
  private determineOverallStatus(services: ServiceStatus[]): 'healthy' | 'unhealthy' {
    const criticalServices = services.slice(0, 3);
    const allCriticalUp = criticalServices.every(service => service.status === 'up');

    return allCriticalUp ? 'healthy' : 'unhealthy';
  }
}
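
The Docker HEALTHCHECK shown earlier, like most load balancers, looks only at the HTTP status code of `/health`, so the route has to turn the HealthStatus into one. The sketch below assumes the usual convention of 200 for healthy and 503 for unhealthy; `HealthSummary` and `toHttpResponse` are hypothetical names.

```typescript
// Simplified shape of the HealthStatus returned by HealthChecker.getHealthStatus().
interface HealthSummary {
  status: 'healthy' | 'unhealthy';
  timestamp: string;
}

// Hypothetical helper: choose the status code and body that a /health route would send.
function toHttpResponse(health: HealthSummary): { statusCode: number; body: HealthSummary } {
  return {
    statusCode: health.status === 'healthy' ? 200 : 503,
    body: health,
  };
}

const response = toHttpResponse({ status: 'unhealthy', timestamp: new Date().toISOString() });
console.log(response.statusCode); // 503
```

In an Express handler the result would typically be sent with `res.status(response.statusCode).json(response.body)`.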

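V. Summary and Outlook

(I) Summary of Results

Over the course of this article we have built a complete workflow builder system aimed at enterprise use. The main results are:

1. Architecture

  • Frontend visual designer: a drag-and-drop flow design interface built on React Flow, supporting complex node connections and live preview
  • Backend execution engine: an asynchronous execution engine on Node.js, supporting parallel processing and error recovery
  • Pluggable node system: an extensible node registry that supports custom node types and third-party integrations
  • Unified connector framework: a standardized interface for third-party service integration, supporting multiple authentication methods

2. Core Features

  • Visual flow design: an intuitive drag-and-drop interface that supports complex branching and loop logic
  • Real-time execution monitoring: WebSocket communication gives immediate feedback on execution status
  • Error handling: error capture, retry, and recovery strategies
  • Performance optimization: multi-level caching, task queues, and database index tuning

3. Enterprise Features

  • Security: data encryption, authentication and authorization, and access control
  • Scalability: microservice architecture, containerized deployment, and horizontal scaling
  • Monitoring and alerting: application metrics and a health check system
  • High availability: failover, load balancing, and data backup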

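(II) Technical Challenges and Solutions

1. Executing complex flows

  • Problem: how to handle complex workflows that contain branches, loops, and parallel execution
  • Solution: an execution engine based on a directed acyclic graph (DAG), which uses topological sorting to determine the execution order and a task queue for asynchronous parallel processing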
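
The ordering step can be sketched with Kahn's algorithm, one standard way to compute a topological order; the article does not state which variant its engine uses. `Edge` and `topologicalOrder` are hypothetical names, and the node ids are made up for the demonstration. Because a true cycle makes the sort fail, loop constructs would presumably have to be modeled as a dedicated node type rather than as a back edge in the graph.

```typescript
interface Edge {
  source: string;
  target: string;
}

// Kahn's algorithm: returns node ids in an order where every node comes after
// all of its upstream nodes. Throws if the graph contains a cycle.
function topologicalOrder(nodeIds: string[], edges: Edge[]): string[] {
  const inDegree = new Map<string, number>();
  const downstream = new Map<string, string[]>();
  for (const id of nodeIds) {
    inDegree.set(id, 0);
    downstream.set(id, []);
  }
  for (const { source, target } of edges) {
    if (!inDegree.has(source) || !inDegree.has(target)) {
      throw new Error(`Edge references unknown node: ${source} -> ${target}`);
    }
    downstream.get(source)!.push(target);
    inDegree.set(target, inDegree.get(target)! + 1);
  }

  // Nodes with no unmet dependencies are ready to run.
  const ready = nodeIds.filter((id) => inDegree.get(id) === 0);
  const order: string[] = [];
  while (ready.length > 0) {
    const id = ready.shift()!;
    order.push(id);
    for (const next of downstream.get(id)!) {
      const remaining = inDegree.get(next)! - 1;
      inDegree.set(next, remaining);
      if (remaining === 0) {
        ready.push(next);
      }
    }
  }

  // Any node left unprocessed is part of a cycle.
  if (order.length !== nodeIds.length) {
    throw new Error('Workflow graph contains a cycle');
  }
  return order;
}

const order = topologicalOrder(
  ['trigger', 'fetch', 'notify', 'log'],
  [
    { source: 'trigger', target: 'fetch' },
    { source: 'fetch', target: 'notify' },
    { source: 'fetch', target: 'log' },
  ]
);
console.log(order); // [ 'trigger', 'fetch', 'notify', 'log' ]
```

Nodes that become ready in the same step (`notify` and `log` here) have no dependency on each other, which is what allows them to be dispatched to the task queue in parallel.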

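2. Complexity of third-party integration

  • Problem: API formats, authentication methods, and error handling differ widely between services
  • Solution: a unified connector interface, standardized data format conversion, and pluggable authentication strategies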
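
One way to read "pluggable authentication strategies" is that each connector receives its authentication as an object instead of hard-coding it. The sketch below illustrates that pattern only; every type and class name in it is hypothetical and does not come from the article's actual interfaces. The Slack endpoint shown is the public `chat.postMessage` Web API method.

```typescript
// Hypothetical strategy contract: produce the headers that authenticate a request.
interface AuthStrategy {
  headers(): Record<string, string>;
}

class BearerTokenAuth implements AuthStrategy {
  private token: string;
  constructor(token: string) {
    this.token = token;
  }
  headers(): Record<string, string> {
    return { Authorization: `Bearer ${this.token}` };
  }
}

class ApiKeyAuth implements AuthStrategy {
  private headerName: string;
  private apiKey: string;
  constructor(headerName: string, apiKey: string) {
    this.headerName = headerName;
    this.apiKey = apiKey;
  }
  headers(): Record<string, string> {
    return { [this.headerName]: this.apiKey };
  }
}

// Service-independent request description that every connector produces.
interface ConnectorRequest {
  method: 'GET' | 'POST';
  url: string;
  headers: Record<string, string>;
  body?: unknown;
}

interface ServiceConnector {
  readonly type: string;
  buildRequest(action: string, params: Record<string, unknown>): ConnectorRequest;
}

class SlackConnector implements ServiceConnector {
  readonly type = 'slack';
  private auth: AuthStrategy;
  constructor(auth: AuthStrategy) {
    this.auth = auth;
  }
  buildRequest(action: string, params: Record<string, unknown>): ConnectorRequest {
    if (action !== 'postMessage') {
      throw new Error(`Unsupported action: ${action}`);
    }
    return {
      method: 'POST',
      url: 'https://slack.com/api/chat.postMessage',
      headers: { 'Content-Type': 'application/json', ...this.auth.headers() },
      body: { channel: params.channel, text: params.text },
    };
  }
}

const slack = new SlackConnector(new BearerTokenAuth('xoxb-example-token'));
console.log(slack.buildRequest('postMessage', { channel: '#ops', text: 'Deploy finished' }));
```

Because connectors return a plain request description, a single HTTP client can send requests for every service, and retries or logging only need to be implemented once.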

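3. Real-time status synchronization

  • Problem: how to synchronize workflow execution status in real time in a distributed environment
  • Solution: real-time communication over Socket.io combined with Redis publish/subscribe, so that status updates are delivered promptly and consistently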
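
The flow behind this solution is as follows: the process that executes a node publishes a status update, every web server process subscribes to the same channel, and each one relays the update to its own connected clients. The sketch below shows only that flow, with an in-memory bus standing in for Redis and a callback standing in for the Socket.io emit. `PubSub`, `InMemoryPubSub`, `publishStatus`, `relayStatus`, and the channel and room names are all hypothetical.

```typescript
interface StatusUpdate {
  instanceId: string;
  nodeId: string;
  status: 'running' | 'completed' | 'failed';
}

// Minimal pub/sub contract. In production this role would be played by Redis
// PUBLISH/SUBSCRIBE so that every server process receives every update.
interface PubSub {
  publish(channel: string, message: string): void;
  subscribe(channel: string, handler: (message: string) => void): void;
}

// In-process stand-in for Redis so that the sketch runs on its own.
class InMemoryPubSub implements PubSub {
  private handlers = new Map<string, Array<(message: string) => void>>();

  publish(channel: string, message: string): void {
    for (const handler of this.handlers.get(channel) ?? []) {
      handler(message);
    }
  }

  subscribe(channel: string, handler: (message: string) => void): void {
    const list = this.handlers.get(channel) ?? [];
    list.push(handler);
    this.handlers.set(channel, list);
  }
}

const STATUS_CHANNEL = 'workflow:status';

// Called by the execution engine on whichever process runs the node.
function publishStatus(bus: PubSub, update: StatusUpdate): void {
  bus.publish(STATUS_CHANNEL, JSON.stringify(update));
}

// Called once per web server process. `emit` stands in for pushing the update to
// the WebSocket clients that are watching the given workflow instance.
function relayStatus(bus: PubSub, emit: (room: string, update: StatusUpdate) => void): void {
  bus.subscribe(STATUS_CHANNEL, (message) => {
    const update = JSON.parse(message) as StatusUpdate;
    emit(`instance:${update.instanceId}`, update);
  });
}

const bus = new InMemoryPubSub();
relayStatus(bus, (room, update) => console.log(room, update.nodeId, update.status));
publishStatus(bus, { instanceId: '42', nodeId: 'send-email', status: 'completed' });
```

Grouping clients into one room per workflow instance means a client receives only the updates for executions it is watching.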

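(III) Future Directions

1. AI-assisted features

  • Intelligent flow recommendation: analyze user behavior with machine learning to recommend suitable flow design patterns
  • Automatic error diagnosis: an AI-driven system for error analysis and fix suggestions
  • Natural language processing: convert natural-language descriptions into visual workflows

2. Enterprise feature extensions

  • Fine-grained permission control: role-based access control and approval flows
  • Version management: version control, branching, and rollback for workflows
  • Compliance support: audit logs and data protection that meet regulations such as GDPR and SOX

3. Ecosystem

  • Open API platform: a complete REST API and SDK for third-party development
  • Plugin marketplace: an open plugin ecosystem for community-contributed and commercial plugins
  • Template library: industry templates and a platform for sharing best practices

4. Technology evolution

  • Edge computing support: run lightweight workflows on edge devices
  • Blockchain integration: use blockchain technology to make records of workflow execution tamper-resistant
  • Quantum computing readiness: design an architecture that is compatible with future quantum computing environments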

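(IV) Best Practice Recommendations

1. Design principles

  • Modular design: keep components independent and reusable
  • Errors first: always consider error cases and exception handling
  • Performance awareness: consider performance optimization from the design stage

2. Development recommendations

  • Test-driven: write comprehensive unit and integration tests
  • Thorough documentation: maintain detailed API documentation and user guides
  • Code quality: use linting tools and a continuous integration pipeline

3. Operations recommendations

  • Monitoring: build a comprehensive monitoring and alerting setup
  • Backup strategy: define a data backup and recovery plan
  • Security updates: update dependencies and apply security patches regularly

As an important tool in enterprise digital transformation, the workflow builder will play an increasingly important role in automation. Through this hands-on project we have covered the core implementation techniques and, more importantly, the reasoning behind the system design, which provides a foundation for building smarter and more efficient automation platforms.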


References