前言

企业微信作为腾讯推出的企业级即时通讯工具,其群聊功能中的”已读未读”状态管理是一个看似简单但技术实现极其复杂的功能。想象一下,在一个拥有500人的企业群中,每发送一条消息,系统需要实时跟踪500个用户的阅读状态,并且要保证数据的一致性、实时性和高可用性。这就像在一个巨大的图书馆中,实时记录每个读者对每本书的阅读进度,并且要确保所有管理员都能实时看到最新的统计数据。本文将深入分析企业微信群聊已读未读功能的技术实现原理,包括数据库设计、分布式架构、实时同步机制等核心技术。

一、功能需求与技术挑战

(一)功能需求分析

企业微信群聊已读未读的核心需求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
已读未读功能需求:

基础功能:
├── 消息发送:用户发送消息到群聊
├── 状态跟踪:实时跟踪每个成员的阅读状态
├── 状态展示:显示"已读X人,未读Y人"
├── 详细列表:查看具体哪些人已读/未读
└── 实时更新:状态变化时实时推送更新

高级功能:
├── 批量标记:支持批量标记为已读
├── 历史查询:查询历史消息的阅读状态
├── 统计分析:群消息阅读率统计
├── 权限控制:不同角色的查看权限
└── 离线处理:离线用户上线后的状态同步

性能要求:
├── 高并发:支持大群(500+人)的实时状态更新
├── 低延迟:消息状态变化<100ms内推送
├── 高可用:99.9%的服务可用性
├── 数据一致性:确保状态数据的强一致性
└── 存储优化:海量状态数据的高效存储

技术挑战分析:

  1. 数据量爆炸:500人群×每天1000条消息=50万条状态记录/天
  2. 实时性要求:状态变化需要毫秒级推送给所有在线用户
  3. 一致性保证:分布式环境下的数据一致性维护
  4. 存储优化:海量状态数据的高效存储和查询
  5. 网络开销:减少不必要的网络传输和推送

(二)系统架构设计

整体架构概览:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
// 企业微信群聊已读未读系统架构
class MessageReadStatusSystem {
constructor() {
// 核心组件
this.messageService = new MessageService(); // 消息服务
this.readStatusService = new ReadStatusService(); // 已读状态服务
this.realtimeService = new RealtimeService(); // 实时推送服务
this.cacheService = new CacheService(); // 缓存服务
this.databaseService = new DatabaseService(); // 数据库服务

// 分布式组件
this.messageQueue = new MessageQueue(); // 消息队列
this.distributedLock = new DistributedLock(); // 分布式锁
this.eventBus = new EventBus(); // 事件总线

this.initializeSystem();
}

// 系统初始化
initializeSystem() {
this.setupEventHandlers(); // 设置事件处理器
this.initializeCache(); // 初始化缓存
this.startBackgroundTasks(); // 启动后台任务
}

// 核心业务流程:发送消息
async sendMessage(groupId, senderId, messageContent) {
try {
// 1. 创建消息记录
const message = await this.messageService.createMessage({
groupId: groupId,
senderId: senderId,
content: messageContent,
timestamp: Date.now()
});

// 2. 获取群成员列表
const groupMembers = await this.getGroupMembers(groupId);

// 3. 初始化已读状态(发送者默认已读)
await this.readStatusService.initializeReadStatus(
message.messageId,
groupMembers,
senderId
);

// 4. 实时推送消息给群成员
await this.realtimeService.broadcastMessage(groupId, message);

// 5. 推送初始已读状态
await this.pushReadStatusUpdate(groupId, message.messageId);

return message;

} catch (error) {
console.error('发送消息失败:', error);
throw error;
}
}

// 核心业务流程:标记已读
async markAsRead(messageId, userId) {
try {
// 1. 使用分布式锁防止并发问题
const lockKey = `read_status:${messageId}:${userId}`;
const lock = await this.distributedLock.acquire(lockKey, 5000);

try {
// 2. 检查是否已经标记为已读
const currentStatus = await this.readStatusService.getReadStatus(messageId, userId);
if (currentStatus && currentStatus.isRead) {
return; // 已经是已读状态,无需重复处理
}

// 3. 更新已读状态
await this.readStatusService.markAsRead(messageId, userId);

// 4. 更新缓存
await this.cacheService.updateReadStatusCache(messageId, userId, true);

// 5. 获取消息所属群组
const message = await this.messageService.getMessage(messageId);

// 6. 实时推送状态更新
await this.pushReadStatusUpdate(message.groupId, messageId);

// 7. 发布事件供其他服务处理
this.eventBus.publish('message.read', {
messageId: messageId,
userId: userId,
groupId: message.groupId,
timestamp: Date.now()
});

} finally {
// 释放分布式锁
await this.distributedLock.release(lock);
}

} catch (error) {
console.error('标记已读失败:', error);
throw error;
}
}

// 推送已读状态更新
async pushReadStatusUpdate(groupId, messageId) {
try {
// 1. 获取最新的已读状态统计
const statusSummary = await this.readStatusService.getReadStatusSummary(messageId);

// 2. 构建推送数据
const updateData = {
messageId: messageId,
readCount: statusSummary.readCount,
unreadCount: statusSummary.unreadCount,
totalCount: statusSummary.totalCount,
readUsers: statusSummary.readUsers,
timestamp: Date.now()
};

// 3. 推送给群内所有在线用户
await this.realtimeService.pushToGroup(groupId, 'read_status_update', updateData);

} catch (error) {
console.error('推送已读状态更新失败:', error);
}
}

// 获取群成员列表
async getGroupMembers(groupId) {
// 优先从缓存获取
let members = await this.cacheService.getGroupMembers(groupId);

if (!members) {
// 缓存未命中,从数据库获取
members = await this.databaseService.getGroupMembers(groupId);

// 更新缓存
await this.cacheService.setGroupMembers(groupId, members, 3600); // 缓存1小时
}

return members;
}

// 批量标记已读
async batchMarkAsRead(messageIds, userId) {
try {
// 1. 批量获取消息信息
const messages = await this.messageService.getMessages(messageIds);

// 2. 按群组分组处理
const messagesByGroup = this.groupMessagesByGroup(messages);

// 3. 并行处理各个群组
const promises = Object.entries(messagesByGroup).map(async ([groupId, groupMessages]) => {
// 使用事务确保数据一致性
return await this.databaseService.transaction(async (trx) => {
// 批量更新已读状态
await this.readStatusService.batchMarkAsRead(
groupMessages.map(m => m.messageId),
userId,
trx
);

// 批量更新缓存
await this.cacheService.batchUpdateReadStatusCache(
groupMessages.map(m => m.messageId),
userId,
true
);

// 推送状态更新
for (const message of groupMessages) {
await this.pushReadStatusUpdate(groupId, message.messageId);
}
});
});

await Promise.all(promises);

} catch (error) {
console.error('批量标记已读失败:', error);
throw error;
}
}

// 按群组分组消息
groupMessagesByGroup(messages) {
return messages.reduce((groups, message) => {
if (!groups[message.groupId]) {
groups[message.groupId] = [];
}
groups[message.groupId].push(message);
return groups;
}, {});
}

// 设置事件处理器
setupEventHandlers() {
// 用户上线事件
this.eventBus.subscribe('user.online', async (event) => {
await this.handleUserOnline(event.userId);
});

// 用户离线事件
this.eventBus.subscribe('user.offline', async (event) => {
await this.handleUserOffline(event.userId);
});

// 群成员变更事件
this.eventBus.subscribe('group.member.changed', async (event) => {
await this.handleGroupMemberChanged(event.groupId, event.changes);
});
}

// 处理用户上线
async handleUserOnline(userId) {
try {
// 1. 获取用户的未读消息
const unreadMessages = await this.readStatusService.getUserUnreadMessages(userId);

// 2. 推送未读消息状态
if (unreadMessages.length > 0) {
await this.realtimeService.pushToUser(userId, 'unread_messages', {
messages: unreadMessages,
count: unreadMessages.length
});
}

} catch (error) {
console.error('处理用户上线失败:', error);
}
}

// 处理用户离线
async handleUserOffline(userId) {
// 清理用户相关的实时连接缓存
await this.realtimeService.cleanupUserConnection(userId);
}

// 处理群成员变更
async handleGroupMemberChanged(groupId, changes) {
try {
// 清理群成员缓存
await this.cacheService.deleteGroupMembers(groupId);

// 如果有新成员加入,需要为历史消息初始化已读状态
if (changes.added && changes.added.length > 0) {
await this.initializeReadStatusForNewMembers(groupId, changes.added);
}

} catch (error) {
console.error('处理群成员变更失败:', error);
}
}

// 为新成员初始化历史消息的已读状态
async initializeReadStatusForNewMembers(groupId, newMembers) {
try {
// 获取最近的消息(比如最近100条)
const recentMessages = await this.messageService.getRecentMessages(groupId, 100);

// 为新成员初始化这些消息的未读状态
const initPromises = newMembers.map(async (memberId) => {
return await this.readStatusService.initializeReadStatusForUser(
recentMessages.map(m => m.messageId),
memberId,
false // 新成员对历史消息默认为未读
);
});

await Promise.all(initPromises);

} catch (error) {
console.error('为新成员初始化已读状态失败:', error);
}
}

// 启动后台任务
startBackgroundTasks() {
// 定期清理过期的已读状态数据
setInterval(async () => {
await this.cleanupExpiredReadStatus();
}, 24 * 60 * 60 * 1000); // 每天执行一次

// 定期同步缓存和数据库
setInterval(async () => {
await this.syncCacheWithDatabase();
}, 5 * 60 * 1000); // 每5分钟执行一次
}

// 清理过期的已读状态数据
async cleanupExpiredReadStatus() {
try {
const expiredThreshold = Date.now() - (30 * 24 * 60 * 60 * 1000); // 30天前
await this.readStatusService.cleanupExpiredStatus(expiredThreshold);
console.log('已清理过期的已读状态数据');
} catch (error) {
console.error('清理过期数据失败:', error);
}
}

// 同步缓存和数据库
async syncCacheWithDatabase() {
try {
// 获取缓存中的脏数据
const dirtyData = await this.cacheService.getDirtyReadStatusData();

if (dirtyData.length > 0) {
// 批量同步到数据库
await this.readStatusService.batchSyncReadStatus(dirtyData);

// 清理缓存中的脏标记
await this.cacheService.clearDirtyFlags(dirtyData.map(d => d.key));

console.log(`已同步${dirtyData.length}条已读状态数据`);
}

} catch (error) {
console.error('同步缓存和数据库失败:', error);
}
}
}

二、数据库设计与存储优化

(一)核心数据表设计

消息表和已读状态表设计:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
-- 消息表:存储群聊消息的基本信息
CREATE TABLE messages (
message_id BIGINT PRIMARY KEY, -- 消息唯一ID
group_id BIGINT NOT NULL, -- 群组ID
sender_id BIGINT NOT NULL, -- 发送者用户ID
message_type TINYINT DEFAULT 1, -- 消息类型:1文本/2图片/3文件等
content TEXT, -- 消息内容
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 创建时间
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
is_deleted BOOLEAN DEFAULT FALSE, -- 是否已删除

-- 索引优化:群组消息查询是最频繁的操作
INDEX idx_group_created (group_id, created_at DESC),
INDEX idx_sender_created (sender_id, created_at DESC),

-- 分区策略:按月分区,提高查询性能
PARTITION BY RANGE (YEAR(created_at) * 100 + MONTH(created_at)) (
PARTITION p202501 VALUES LESS THAN (202502),
PARTITION p202502 VALUES LESS THAN (202503),
-- ... 更多分区
PARTITION p_future VALUES LESS THAN MAXVALUE
)
) ENGINE=InnoDB;

-- 已读状态表:存储每个用户对每条消息的阅读状态
-- 这是系统的核心表,数据量最大,需要特别优化
CREATE TABLE message_read_status (
id BIGINT PRIMARY KEY AUTO_INCREMENT, -- 自增主键
message_id BIGINT NOT NULL, -- 消息ID
user_id BIGINT NOT NULL, -- 用户ID
is_read BOOLEAN DEFAULT FALSE, -- 是否已读
read_at TIMESTAMP NULL, -- 阅读时间
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

-- 唯一约束:每个用户对每条消息只能有一条状态记录
UNIQUE KEY uk_message_user (message_id, user_id),

-- 核心查询索引
INDEX idx_message_read (message_id, is_read), -- 按消息查询已读状态
INDEX idx_user_unread (user_id, is_read, created_at), -- 查询用户未读消息
INDEX idx_read_at (read_at), -- 按阅读时间查询

-- 分区策略:按消息ID范围分区,便于数据清理
PARTITION BY RANGE (message_id DIV 1000000) (
PARTITION p0 VALUES LESS THAN (1000),
PARTITION p1 VALUES LESS THAN (2000),
-- ... 更多分区
PARTITION p_future VALUES LESS THAN MAXVALUE
)
) ENGINE=InnoDB;

-- 群组成员表:存储群组成员关系
CREATE TABLE group_members (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
group_id BIGINT NOT NULL, -- 群组ID
user_id BIGINT NOT NULL, -- 用户ID
role TINYINT DEFAULT 3, -- 角色:1群主/2管理员/3普通成员
joined_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- 加入时间
is_active BOOLEAN DEFAULT TRUE, -- 是否活跃成员

UNIQUE KEY uk_group_user (group_id, user_id),
INDEX idx_group_active (group_id, is_active),
INDEX idx_user_groups (user_id, is_active)
) ENGINE=InnoDB;

-- 已读状态汇总表:缓存每条消息的已读统计,减少实时计算
CREATE TABLE message_read_summary (
message_id BIGINT PRIMARY KEY, -- 消息ID
total_count INT NOT NULL DEFAULT 0, -- 总人数
read_count INT NOT NULL DEFAULT 0, -- 已读人数
unread_count INT NOT NULL DEFAULT 0, -- 未读人数
last_read_at TIMESTAMP NULL, -- 最后阅读时间
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

INDEX idx_updated_at (updated_at) -- 用于增量同步
) ENGINE=InnoDB;

存储优化策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
-- 1. 读写分离优化:创建只读副本表用于统计查询
CREATE TABLE message_read_status_readonly LIKE message_read_status;

-- 2. 历史数据归档表:将旧数据迁移到归档表
CREATE TABLE message_read_status_archive (
LIKE message_read_status INCLUDING ALL
) PARTITION BY RANGE (YEAR(created_at)) (
PARTITION p2023 VALUES LESS THAN (2024),
PARTITION p2024 VALUES LESS THAN (2025),
PARTITION p_current VALUES LESS THAN MAXVALUE
);

-- 3. 创建物化视图用于复杂统计查询
CREATE VIEW user_read_statistics AS
SELECT
user_id,
COUNT(*) as total_messages,
SUM(CASE WHEN is_read = 1 THEN 1 ELSE 0 END) as read_messages,
SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END) as unread_messages,
AVG(CASE WHEN is_read = 1 AND read_at IS NOT NULL
THEN TIMESTAMPDIFF(SECOND, created_at, read_at)
ELSE NULL END) as avg_read_delay_seconds
FROM message_read_status
WHERE created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)
GROUP BY user_id;

(二)已读状态服务实现

ReadStatusService核心实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
// 已读状态服务:处理消息阅读状态的核心业务逻辑
class ReadStatusService {
constructor(databaseService, cacheService, messageQueue) {
this.db = databaseService;
this.cache = cacheService;
this.mq = messageQueue;

// 批处理配置
this.batchSize = 1000;
this.batchTimeout = 5000; // 5秒
this.pendingBatch = new Map();

this.initializeBatchProcessor();
}

// 初始化消息的已读状态
async initializeReadStatus(messageId, groupMembers, senderId) {
try {
// 1. 准备批量插入数据
const statusRecords = groupMembers.map(member => ({
message_id: messageId,
user_id: member.userId,
is_read: member.userId === senderId ? 1 : 0, // 发送者默认已读
read_at: member.userId === senderId ? new Date() : null,
created_at: new Date()
}));

// 2. 批量插入已读状态记录
await this.db.batchInsert('message_read_status', statusRecords);

// 3. 初始化汇总统计
const readCount = senderId ? 1 : 0;
const totalCount = groupMembers.length;

await this.db.insert('message_read_summary', {
message_id: messageId,
total_count: totalCount,
read_count: readCount,
unread_count: totalCount - readCount,
last_read_at: senderId ? new Date() : null
});

// 4. 初始化缓存
await this.initializeReadStatusCache(messageId, statusRecords);

console.log(`已初始化消息${messageId}的已读状态,共${totalCount}人`);

} catch (error) {
console.error('初始化已读状态失败:', error);
throw error;
}
}

// 标记消息为已读
async markAsRead(messageId, userId) {
try {
// 1. 检查当前状态
const currentStatus = await this.getReadStatus(messageId, userId);
if (currentStatus && currentStatus.is_read) {
return; // 已经是已读状态
}

const readTime = new Date();

// 2. 更新数据库状态
const updateResult = await this.db.update(
'message_read_status',
{
is_read: 1,
read_at: readTime,
updated_at: readTime
},
{
message_id: messageId,
user_id: userId,
is_read: 0 // 只更新未读的记录
}
);

if (updateResult.affectedRows > 0) {
// 3. 更新汇总统计
await this.updateReadSummary(messageId, 1); // 已读数+1

// 4. 更新缓存
await this.updateReadStatusCache(messageId, userId, true, readTime);

// 5. 异步发送统计事件
this.mq.publish('read_status.updated', {
messageId: messageId,
userId: userId,
isRead: true,
readAt: readTime
});

return true;
}

return false;

} catch (error) {
console.error('标记已读失败:', error);
throw error;
}
}

// 批量标记已读
async batchMarkAsRead(messageIds, userId, transaction = null) {
const trx = transaction || this.db;

try {
const readTime = new Date();

// 1. 批量更新数据库
const updateSql = `
UPDATE message_read_status
SET is_read = 1, read_at = ?, updated_at = ?
WHERE message_id IN (${messageIds.map(() => '?').join(',')})
AND user_id = ? AND is_read = 0
`;

const updateResult = await trx.query(updateSql, [
readTime, readTime, ...messageIds, userId
]);

if (updateResult.affectedRows > 0) {
// 2. 批量更新汇总统计
const summaryUpdatePromises = messageIds.map(messageId =>
this.updateReadSummary(messageId, 1, trx)
);
await Promise.all(summaryUpdatePromises);

// 3. 批量更新缓存
const cacheUpdatePromises = messageIds.map(messageId =>
this.updateReadStatusCache(messageId, userId, true, readTime)
);
await Promise.all(cacheUpdatePromises);

// 4. 发送批量更新事件
this.mq.publish('read_status.batch_updated', {
messageIds: messageIds,
userId: userId,
readAt: readTime,
count: updateResult.affectedRows
});
}

return updateResult.affectedRows;

} catch (error) {
console.error('批量标记已读失败:', error);
throw error;
}
}

// 获取消息的已读状态汇总
async getReadStatusSummary(messageId) {
try {
// 1. 优先从缓存获取
const cacheKey = `read_summary:${messageId}`;
let summary = await this.cache.get(cacheKey);

if (!summary) {
// 2. 缓存未命中,从数据库获取
summary = await this.db.selectOne(
'message_read_summary',
{ message_id: messageId }
);

if (!summary) {
// 3. 汇总表没有数据,实时计算
summary = await this.calculateReadSummary(messageId);
}

// 4. 更新缓存
await this.cache.set(cacheKey, summary, 300); // 缓存5分钟
}

return {
messageId: messageId,
totalCount: summary.total_count,
readCount: summary.read_count,
unreadCount: summary.unread_count,
lastReadAt: summary.last_read_at,
readUsers: await this.getReadUsers(messageId) // 获取已读用户列表
};

} catch (error) {
console.error('获取已读状态汇总失败:', error);
throw error;
}
}

// 实时计算已读状态汇总
async calculateReadSummary(messageId) {
try {
const sql = `
SELECT
COUNT(*) as total_count,
SUM(CASE WHEN is_read = 1 THEN 1 ELSE 0 END) as read_count,
SUM(CASE WHEN is_read = 0 THEN 1 ELSE 0 END) as unread_count,
MAX(read_at) as last_read_at
FROM message_read_status
WHERE message_id = ?
`;

const result = await this.db.query(sql, [messageId]);
const summary = result[0];

// 更新或插入汇总表
await this.db.insertOrUpdate('message_read_summary', {
message_id: messageId,
total_count: summary.total_count,
read_count: summary.read_count,
unread_count: summary.unread_count,
last_read_at: summary.last_read_at
});

return summary;

} catch (error) {
console.error('计算已读状态汇总失败:', error);
throw error;
}
}

// 获取已读用户列表
async getReadUsers(messageId, limit = 100) {
try {
const cacheKey = `read_users:${messageId}`;
let readUsers = await this.cache.get(cacheKey);

if (!readUsers) {
const sql = `
SELECT mrs.user_id, mrs.read_at, u.username, u.avatar_url
FROM message_read_status mrs
LEFT JOIN users u ON mrs.user_id = u.user_id
WHERE mrs.message_id = ? AND mrs.is_read = 1
ORDER BY mrs.read_at ASC
LIMIT ?
`;

readUsers = await this.db.query(sql, [messageId, limit]);

// 缓存已读用户列表
await this.cache.set(cacheKey, readUsers, 60); // 缓存1分钟
}

return readUsers;

} catch (error) {
console.error('获取已读用户列表失败:', error);
return [];
}
}

// 获取用户的未读消息
async getUserUnreadMessages(userId, limit = 100) {
try {
const sql = `
SELECT m.message_id, m.group_id, m.sender_id, m.content,
m.created_at, mrs.created_at as status_created_at
FROM message_read_status mrs
JOIN messages m ON mrs.message_id = m.message_id
WHERE mrs.user_id = ? AND mrs.is_read = 0 AND m.is_deleted = 0
ORDER BY m.created_at DESC
LIMIT ?
`;

return await this.db.query(sql, [userId, limit]);

} catch (error) {
console.error('获取用户未读消息失败:', error);
return [];
}
}

// 更新已读状态汇总
async updateReadSummary(messageId, readCountDelta, transaction = null) {
const trx = transaction || this.db;

try {
const sql = `
UPDATE message_read_summary
SET read_count = read_count + ?,
unread_count = unread_count - ?,
last_read_at = CASE WHEN ? > 0 THEN NOW() ELSE last_read_at END,
updated_at = NOW()
WHERE message_id = ?
`;

await trx.query(sql, [readCountDelta, readCountDelta, readCountDelta, messageId]);

// 清除相关缓存
await this.cache.delete(`read_summary:${messageId}`);
await this.cache.delete(`read_users:${messageId}`);

} catch (error) {
console.error('更新已读状态汇总失败:', error);
throw error;
}
}

// 初始化已读状态缓存
async initializeReadStatusCache(messageId, statusRecords) {
try {
const pipeline = this.cache.pipeline();

// 缓存每个用户的状态
statusRecords.forEach(record => {
const userStatusKey = `read_status:${messageId}:${record.user_id}`;
pipeline.setex(userStatusKey, 3600, JSON.stringify({
isRead: record.is_read === 1,
readAt: record.read_at
}));
});

// 缓存汇总信息
const readCount = statusRecords.filter(r => r.is_read === 1).length;
const summaryKey = `read_summary:${messageId}`;
pipeline.setex(summaryKey, 300, JSON.stringify({
total_count: statusRecords.length,
read_count: readCount,
unread_count: statusRecords.length - readCount,
last_read_at: statusRecords.find(r => r.is_read === 1)?.read_at || null
}));

await pipeline.exec();

} catch (error) {
console.error('初始化已读状态缓存失败:', error);
}
}

// 更新已读状态缓存
async updateReadStatusCache(messageId, userId, isRead, readAt = null) {
try {
const userStatusKey = `read_status:${messageId}:${userId}`;

await this.cache.setex(userStatusKey, 3600, JSON.stringify({
isRead: isRead,
readAt: readAt
}));

// 清除汇总缓存,强制重新计算
await this.cache.delete(`read_summary:${messageId}`);
await this.cache.delete(`read_users:${messageId}`);

} catch (error) {
console.error('更新已读状态缓存失败:', error);
}
}

// 获取用户对消息的已读状态
async getReadStatus(messageId, userId) {
try {
// 优先从缓存获取
const cacheKey = `read_status:${messageId}:${userId}`;
let status = await this.cache.get(cacheKey);

if (status) {
return JSON.parse(status);
}

// 缓存未命中,从数据库获取
const dbStatus = await this.db.selectOne(
'message_read_status',
{ message_id: messageId, user_id: userId }
);

if (dbStatus) {
const statusData = {
isRead: dbStatus.is_read === 1,
readAt: dbStatus.read_at
};

// 更新缓存
await this.cache.setex(cacheKey, 3600, JSON.stringify(statusData));

return statusData;
}

return null;

} catch (error) {
console.error('获取已读状态失败:', error);
return null;
}
}

// 初始化批处理器
initializeBatchProcessor() {
// 定期处理批量操作
setInterval(() => {
this.processPendingBatches();
}, this.batchTimeout);
}

// 处理待处理的批量操作
async processPendingBatches() {
for (const [batchKey, batch] of this.pendingBatch.entries()) {
if (batch.items.length >= this.batchSize ||
Date.now() - batch.createdAt >= this.batchTimeout) {

try {
await this.executeBatch(batch);
this.pendingBatch.delete(batchKey);
} catch (error) {
console.error('执行批处理失败:', error);
}
}
}
}

// 执行批处理
async executeBatch(batch) {
switch (batch.type) {
case 'mark_read':
await this.executeBatchMarkRead(batch.items);
break;
case 'update_summary':
await this.executeBatchUpdateSummary(batch.items);
break;
default:
console.warn('未知的批处理类型:', batch.type);
}
}

// 执行批量标记已读
async executeBatchMarkRead(items) {
const groupedByMessage = items.reduce((groups, item) => {
if (!groups[item.messageId]) {
groups[item.messageId] = [];
}
groups[item.messageId].push(item.userId);
return groups;
}, {});

for (const [messageId, userIds] of Object.entries(groupedByMessage)) {
await this.batchMarkAsRead([messageId], userIds);
}
}

// 清理过期的已读状态数据
async cleanupExpiredStatus(expiredThreshold) {
try {
// 1. 删除过期的状态记录
const deleteStatusSql = `
DELETE mrs FROM message_read_status mrs
JOIN messages m ON mrs.message_id = m.message_id
WHERE m.created_at < ?
`;

const statusResult = await this.db.query(deleteStatusSql, [new Date(expiredThreshold)]);

// 2. 删除过期的汇总记录
const deleteSummarySql = `
DELETE mrs FROM message_read_summary mrs
JOIN messages m ON mrs.message_id = m.message_id
WHERE m.created_at < ?
`;

const summaryResult = await this.db.query(deleteSummarySql, [new Date(expiredThreshold)]);

console.log(`已清理${statusResult.affectedRows}条状态记录,${summaryResult.affectedRows}条汇总记录`);

} catch (error) {
console.error('清理过期数据失败:', error);
throw error;
}
}
}

三、实时推送与缓存优化

(一)实时推送服务实现

RealtimeService核心实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
// 实时推送服务:处理WebSocket连接和消息推送
class RealtimeService {
constructor() {
this.connections = new Map(); // 用户连接映射
this.groupConnections = new Map(); // 群组连接映射
this.connectionPool = new Map(); // 连接池
this.pushQueue = []; // 推送队列
this.rateLimiter = new RateLimiter(); // 限流器

this.initializeWebSocketServer();
this.startPushProcessor();
}

// 初始化WebSocket服务器
initializeWebSocketServer() {
this.wss = new WebSocketServer({
port: 8080,
perMessageDeflate: true, // 启用消息压缩
maxPayload: 16 * 1024, // 最大消息大小16KB
clientTracking: true // 跟踪客户端连接
});

this.wss.on('connection', (ws, request) => {
this.handleNewConnection(ws, request);
});

// 定期清理无效连接
setInterval(() => {
this.cleanupInactiveConnections();
}, 30000); // 30秒清理一次
}

// 处理新连接
async handleNewConnection(ws, request) {
try {
// 1. 验证用户身份
const token = this.extractTokenFromRequest(request);
const userInfo = await this.validateToken(token);

if (!userInfo) {
ws.close(1008, 'Invalid token');
return;
}

// 2. 创建连接对象
const connection = {
ws: ws,
userId: userInfo.userId,
userInfo: userInfo,
connectedAt: Date.now(),
lastPingAt: Date.now(),
isAlive: true,
subscriptions: new Set() // 订阅的群组
};

// 3. 注册连接
this.registerConnection(connection);

// 4. 设置连接事件处理
this.setupConnectionHandlers(connection);

// 5. 发送连接确认
this.sendToConnection(connection, {
type: 'connection_established',
userId: userInfo.userId,
timestamp: Date.now()
});

console.log(`用户${userInfo.userId}建立WebSocket连接`);

} catch (error) {
console.error('处理新连接失败:', error);
ws.close(1011, 'Server error');
}
}

// 注册连接
registerConnection(connection) {
const userId = connection.userId;

// 如果用户已有连接,关闭旧连接(单点登录)
if (this.connections.has(userId)) {
const oldConnection = this.connections.get(userId);
oldConnection.ws.close(1000, 'New connection established');
}

// 注册新连接
this.connections.set(userId, connection);

// 更新连接池统计
this.updateConnectionPoolStats();
}

// 设置连接事件处理
setupConnectionHandlers(connection) {
const ws = connection.ws;

// 消息处理
ws.on('message', (data) => {
this.handleMessage(connection, data);
});

// 连接关闭处理
ws.on('close', (code, reason) => {
this.handleConnectionClose(connection, code, reason);
});

// 错误处理
ws.on('error', (error) => {
console.error(`WebSocket错误 (用户${connection.userId}):`, error);
});

// 心跳检测
ws.on('pong', () => {
connection.lastPingAt = Date.now();
connection.isAlive = true;
});

// 启动心跳
this.startHeartbeat(connection);
}

// 处理客户端消息
async handleMessage(connection, data) {
try {
const message = JSON.parse(data.toString());

// 限流检查
if (!this.rateLimiter.checkLimit(connection.userId, 'message', 100, 60)) {
this.sendToConnection(connection, {
type: 'error',
message: 'Rate limit exceeded'
});
return;
}

switch (message.type) {
case 'subscribe_group':
await this.handleGroupSubscription(connection, message.groupId);
break;

case 'unsubscribe_group':
await this.handleGroupUnsubscription(connection, message.groupId);
break;

case 'mark_read':
await this.handleMarkRead(connection, message);
break;

case 'ping':
this.sendToConnection(connection, { type: 'pong' });
break;

default:
console.warn('未知消息类型:', message.type);
}

} catch (error) {
console.error('处理客户端消息失败:', error);
this.sendToConnection(connection, {
type: 'error',
message: 'Invalid message format'
});
}
}

// 处理群组订阅
async handleGroupSubscription(connection, groupId) {
try {
// 验证用户是否有权限访问该群组
const hasPermission = await this.checkGroupPermission(connection.userId, groupId);
if (!hasPermission) {
this.sendToConnection(connection, {
type: 'subscription_error',
groupId: groupId,
message: 'Permission denied'
});
return;
}

// 添加订阅
connection.subscriptions.add(groupId);

// 更新群组连接映射
if (!this.groupConnections.has(groupId)) {
this.groupConnections.set(groupId, new Set());
}
this.groupConnections.get(groupId).add(connection.userId);

// 发送订阅确认
this.sendToConnection(connection, {
type: 'subscription_confirmed',
groupId: groupId,
timestamp: Date.now()
});

console.log(`用户${connection.userId}订阅群组${groupId}`);

} catch (error) {
console.error('处理群组订阅失败:', error);
}
}

// 处理群组取消订阅
async handleGroupUnsubscription(connection, groupId) {
connection.subscriptions.delete(groupId);

const groupConnSet = this.groupConnections.get(groupId);
if (groupConnSet) {
groupConnSet.delete(connection.userId);
if (groupConnSet.size === 0) {
this.groupConnections.delete(groupId);
}
}

this.sendToConnection(connection, {
type: 'unsubscription_confirmed',
groupId: groupId
});
}

// 处理标记已读
async handleMarkRead(connection, message) {
try {
// 这里可以直接调用已读状态服务
// 实际实现中会通过消息队列异步处理
this.sendToConnection(connection, {
type: 'mark_read_confirmed',
messageId: message.messageId,
timestamp: Date.now()
});

} catch (error) {
console.error('处理标记已读失败:', error);
}
}

// 广播消息到群组
async broadcastMessage(groupId, message) {
const pushData = {
type: 'new_message',
groupId: groupId,
message: message,
timestamp: Date.now()
};

await this.pushToGroup(groupId, 'new_message', pushData);
}

// 推送消息到群组
async pushToGroup(groupId, eventType, data) {
const groupConnections = this.groupConnections.get(groupId);
if (!groupConnections || groupConnections.size === 0) {
return; // 群组没有在线用户
}

// 构建推送任务
const pushTasks = Array.from(groupConnections).map(userId => ({
userId: userId,
eventType: eventType,
data: data,
priority: this.getPushPriority(eventType),
createdAt: Date.now()
}));

// 添加到推送队列
this.pushQueue.push(...pushTasks);

// 如果队列过长,立即处理
if (this.pushQueue.length > 1000) {
await this.processPushQueue();
}
}

// 推送消息到单个用户
async pushToUser(userId, eventType, data) {
const connection = this.connections.get(userId);
if (!connection || !this.isConnectionAlive(connection)) {
return false;
}

const pushData = {
type: eventType,
data: data,
timestamp: Date.now()
};

return this.sendToConnection(connection, pushData);
}

// 发送消息到连接
sendToConnection(connection, data) {
try {
if (!this.isConnectionAlive(connection)) {
return false;
}

const message = JSON.stringify(data);
connection.ws.send(message);
return true;

} catch (error) {
console.error('发送消息失败:', error);
this.handleConnectionError(connection);
return false;
}
}

// 启动推送处理器
startPushProcessor() {
// 定期处理推送队列
setInterval(async () => {
if (this.pushQueue.length > 0) {
await this.processPushQueue();
}
}, 100); // 100ms处理一次
}

// 处理推送队列
async processPushQueue() {
if (this.pushQueue.length === 0) return;

// 按优先级排序
this.pushQueue.sort((a, b) => b.priority - a.priority);

// 批量处理
const batchSize = 500;
const batch = this.pushQueue.splice(0, batchSize);

// 并行推送
const pushPromises = batch.map(task => this.executePushTask(task));
await Promise.allSettled(pushPromises);
}

// 执行推送任务
async executePushTask(task) {
try {
const connection = this.connections.get(task.userId);
if (!connection || !this.isConnectionAlive(connection)) {
return;
}

// 检查推送限流
if (!this.rateLimiter.checkLimit(task.userId, 'push', 200, 60)) {
console.warn(`用户${task.userId}推送限流`);
return;
}

const pushData = {
type: task.eventType,
data: task.data,
timestamp: Date.now()
};

this.sendToConnection(connection, pushData);

} catch (error) {
console.error('执行推送任务失败:', error);
}
}

// 获取推送优先级
getPushPriority(eventType) {
const priorities = {
'new_message': 10,
'read_status_update': 8,
'typing_indicator': 5,
'user_status_change': 3,
'system_notification': 1
};

return priorities[eventType] || 1;
}

// 启动心跳检测
startHeartbeat(connection) {
const heartbeatInterval = setInterval(() => {
if (!connection.isAlive) {
console.log(`用户${connection.userId}心跳超时,关闭连接`);
clearInterval(heartbeatInterval);
connection.ws.terminate();
return;
}

connection.isAlive = false;
connection.ws.ping();

}, 30000); // 30秒心跳间隔

connection.heartbeatInterval = heartbeatInterval;
}

// 检查连接是否存活
isConnectionAlive(connection) {
return connection &&
connection.ws &&
connection.ws.readyState === WebSocket.OPEN &&
connection.isAlive;
}

// 处理连接关闭
handleConnectionClose(connection, code, reason) {
console.log(`用户${connection.userId}断开连接: ${code} ${reason}`);

// 清理连接
this.cleanupConnection(connection);
}

// 处理连接错误
handleConnectionError(connection) {
console.error(`连接错误,用户${connection.userId}`);
this.cleanupConnection(connection);
}

// 清理连接
cleanupConnection(connection) {
const userId = connection.userId;

// 从连接映射中移除
this.connections.delete(userId);

// 从群组连接中移除
connection.subscriptions.forEach(groupId => {
const groupConnSet = this.groupConnections.get(groupId);
if (groupConnSet) {
groupConnSet.delete(userId);
if (groupConnSet.size === 0) {
this.groupConnections.delete(groupId);
}
}
});

// 清理心跳定时器
if (connection.heartbeatInterval) {
clearInterval(connection.heartbeatInterval);
}

// 更新连接池统计
this.updateConnectionPoolStats();
}

// 清理无效连接
cleanupInactiveConnections() {
const now = Date.now();
const timeout = 5 * 60 * 1000; // 5分钟超时

for (const [userId, connection] of this.connections.entries()) {
if (now - connection.lastPingAt > timeout || !this.isConnectionAlive(connection)) {
console.log(`清理无效连接: 用户${userId}`);
this.cleanupConnection(connection);
}
}
}

// 更新连接池统计
updateConnectionPoolStats() {
const stats = {
totalConnections: this.connections.size,
totalGroups: this.groupConnections.size,
timestamp: Date.now()
};

// 可以发送到监控系统
console.log('连接池统计:', stats);
}

// 验证Token
async validateToken(token) {
try {
// 这里应该调用认证服务验证token
// 返回用户信息
return {
userId: 12345,
username: 'test_user'
};
} catch (error) {
console.error('Token验证失败:', error);
return null;
}
}

// 从请求中提取Token
extractTokenFromRequest(request) {
const url = new URL(request.url, 'http://localhost');
return url.searchParams.get('token');
}

// 检查群组权限
async checkGroupPermission(userId, groupId) {
// 这里应该调用权限服务检查用户是否有权限访问群组
return true; // 简化实现
}

// 清理用户连接
async cleanupUserConnection(userId) {
const connection = this.connections.get(userId);
if (connection) {
this.cleanupConnection(connection);
}
}

// 获取连接统计
getConnectionStats() {
return {
totalConnections: this.connections.size,
totalGroups: this.groupConnections.size,
queueLength: this.pushQueue.length,
timestamp: Date.now()
};
}
}

(二)缓存服务与性能优化

CacheService核心实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
// 缓存服务:Redis集群缓存管理
class CacheService {
constructor() {
this.redis = this.initializeRedisCluster();
this.localCache = new Map(); // 本地缓存
this.cacheStats = {
hits: 0,
misses: 0,
sets: 0,
deletes: 0
};

this.initializeCacheOptimizations();
}

// 初始化Redis集群
initializeRedisCluster() {
const Redis = require('ioredis');

// Redis集群配置
const cluster = new Redis.Cluster([
{ host: 'redis-node-1', port: 6379 },
{ host: 'redis-node-2', port: 6379 },
{ host: 'redis-node-3', port: 6379 }
], {
redisOptions: {
password: process.env.REDIS_PASSWORD,
db: 0
},
enableOfflineQueue: false,
retryDelayOnFailover: 100,
maxRetriesPerRequest: 3,
lazyConnect: true
});

// 错误处理
cluster.on('error', (error) => {
console.error('Redis集群错误:', error);
});

cluster.on('connect', () => {
console.log('Redis集群连接成功');
});

return cluster;
}

// 获取缓存(支持多级缓存)
async get(key) {
try {
// 1. 先检查本地缓存
if (this.localCache.has(key)) {
const localData = this.localCache.get(key);
if (localData.expireAt > Date.now()) {
this.cacheStats.hits++;
return localData.value;
} else {
this.localCache.delete(key);
}
}

// 2. 检查Redis缓存
const redisValue = await this.redis.get(key);
if (redisValue !== null) {
this.cacheStats.hits++;

// 解析JSON数据
let parsedValue;
try {
parsedValue = JSON.parse(redisValue);
} catch (e) {
parsedValue = redisValue;
}

// 更新本地缓存(热点数据)
this.setLocalCache(key, parsedValue, 60000); // 本地缓存1分钟

return parsedValue;
}

this.cacheStats.misses++;
return null;

} catch (error) {
console.error('获取缓存失败:', error);
this.cacheStats.misses++;
return null;
}
}

// 设置缓存
async set(key, value, ttl = 3600) {
try {
const serializedValue = typeof value === 'string' ? value : JSON.stringify(value);

if (ttl > 0) {
await this.redis.setex(key, ttl, serializedValue);
} else {
await this.redis.set(key, serializedValue);
}

// 同时更新本地缓存(对于热点数据)
if (this.isHotKey(key)) {
this.setLocalCache(key, value, Math.min(ttl * 1000, 300000)); // 最多本地缓存5分钟
}

this.cacheStats.sets++;
return true;

} catch (error) {
console.error('设置缓存失败:', error);
return false;
}
}

// 设置缓存(带过期时间)
async setex(key, ttl, value) {
return await this.set(key, value, ttl);
}

// 删除缓存
async delete(key) {
try {
// 删除Redis缓存
await this.redis.del(key);

// 删除本地缓存
this.localCache.delete(key);

this.cacheStats.deletes++;
return true;

} catch (error) {
console.error('删除缓存失败:', error);
return false;
}
}

// 批量获取
async mget(keys) {
try {
const results = await this.redis.mget(keys);
const parsedResults = results.map(result => {
if (result === null) return null;
try {
return JSON.parse(result);
} catch (e) {
return result;
}
});

this.cacheStats.hits += parsedResults.filter(r => r !== null).length;
this.cacheStats.misses += parsedResults.filter(r => r === null).length;

return parsedResults;

} catch (error) {
console.error('批量获取缓存失败:', error);
return keys.map(() => null);
}
}

// 批量设置
async mset(keyValuePairs, ttl = 3600) {
try {
const pipeline = this.redis.pipeline();

for (const [key, value] of keyValuePairs) {
const serializedValue = typeof value === 'string' ? value : JSON.stringify(value);
if (ttl > 0) {
pipeline.setex(key, ttl, serializedValue);
} else {
pipeline.set(key, serializedValue);
}
}

await pipeline.exec();
this.cacheStats.sets += keyValuePairs.length;
return true;

} catch (error) {
console.error('批量设置缓存失败:', error);
return false;
}
}

// 原子递增
async incr(key, delta = 1) {
try {
if (delta === 1) {
return await this.redis.incr(key);
} else {
return await this.redis.incrby(key, delta);
}
} catch (error) {
console.error('递增操作失败:', error);
return null;
}
}

// 原子递减
async decr(key, delta = 1) {
try {
if (delta === 1) {
return await this.redis.decr(key);
} else {
return await this.redis.decrby(key, delta);
}
} catch (error) {
console.error('递减操作失败:', error);
return null;
}
}

// 设置本地缓存
setLocalCache(key, value, ttl) {
const expireAt = Date.now() + ttl;
this.localCache.set(key, { value, expireAt });

// 限制本地缓存大小
if (this.localCache.size > 10000) {
this.cleanupLocalCache();
}
}

// 清理本地缓存
cleanupLocalCache() {
const now = Date.now();
const keysToDelete = [];

for (const [key, data] of this.localCache.entries()) {
if (data.expireAt <= now) {
keysToDelete.push(key);
}
}

keysToDelete.forEach(key => this.localCache.delete(key));

// 如果还是太大,删除最老的数据
if (this.localCache.size > 8000) {
const entries = Array.from(this.localCache.entries());
entries.sort((a, b) => a[1].expireAt - b[1].expireAt);

const toDelete = entries.slice(0, 2000);
toDelete.forEach(([key]) => this.localCache.delete(key));
}
}

// 判断是否为热点key
isHotKey(key) {
// 已读状态相关的key通常是热点数据
return key.includes('read_status') ||
key.includes('read_summary') ||
key.includes('group_members');
}

// 获取群组成员(带缓存优化)
async getGroupMembers(groupId) {
const key = `group_members:${groupId}`;
return await this.get(key);
}

// 设置群组成员
async setGroupMembers(groupId, members, ttl = 3600) {
const key = `group_members:${groupId}`;
return await this.set(key, members, ttl);
}

// 删除群组成员缓存
async deleteGroupMembers(groupId) {
const key = `group_members:${groupId}`;
return await this.delete(key);
}

// 批量更新已读状态缓存
async batchUpdateReadStatusCache(messageIds, userId, isRead) {
try {
const pipeline = this.redis.pipeline();

messageIds.forEach(messageId => {
const key = `read_status:${messageId}:${userId}`;
const value = JSON.stringify({
isRead: isRead,
readAt: isRead ? new Date() : null
});
pipeline.setex(key, 3600, value);
});

await pipeline.exec();
return true;

} catch (error) {
console.error('批量更新已读状态缓存失败:', error);
return false;
}
}

// 获取脏数据(需要同步到数据库的数据)
async getDirtyReadStatusData() {
try {
const dirtyKeys = await this.redis.smembers('dirty_read_status_keys');
if (dirtyKeys.length === 0) return [];

const dirtyData = [];
const pipeline = this.redis.pipeline();

dirtyKeys.forEach(key => {
pipeline.get(key);
});

const results = await pipeline.exec();

results.forEach((result, index) => {
if (result[1]) {
try {
const data = JSON.parse(result[1]);
dirtyData.push({
key: dirtyKeys[index],
data: data
});
} catch (e) {
console.error('解析脏数据失败:', e);
}
}
});

return dirtyData;

} catch (error) {
console.error('获取脏数据失败:', error);
return [];
}
}

// 清理脏标记
async clearDirtyFlags(keys) {
try {
if (keys.length === 0) return;

await this.redis.srem('dirty_read_status_keys', ...keys);

} catch (error) {
console.error('清理脏标记失败:', error);
}
}

// 标记数据为脏
async markAsDirty(key) {
try {
await this.redis.sadd('dirty_read_status_keys', key);
} catch (error) {
console.error('标记脏数据失败:', error);
}
}

// 初始化缓存优化
initializeCacheOptimizations() {
// 定期清理本地缓存
setInterval(() => {
this.cleanupLocalCache();
}, 60000); // 每分钟清理一次

// 定期输出缓存统计
setInterval(() => {
this.logCacheStats();
}, 300000); // 每5分钟输出一次统计

// 预热热点数据
this.warmupHotData();
}

// 预热热点数据
async warmupHotData() {
try {
// 预热活跃群组的成员信息
const activeGroups = await this.getActiveGroups();

const warmupPromises = activeGroups.map(async (groupId) => {
const key = `group_members:${groupId}`;
const cached = await this.get(key);
if (!cached) {
// 从数据库加载并缓存
// const members = await this.databaseService.getGroupMembers(groupId);
// await this.setGroupMembers(groupId, members);
}
});

await Promise.all(warmupPromises);
console.log(`已预热${activeGroups.length}个活跃群组的数据`);

} catch (error) {
console.error('预热热点数据失败:', error);
}
}

// 获取活跃群组列表
async getActiveGroups() {
// 这里应该从数据库或其他服务获取活跃群组列表
// 简化实现,返回固定列表
return [1001, 1002, 1003, 1004, 1005];
}

// 输出缓存统计
logCacheStats() {
const hitRate = this.cacheStats.hits / (this.cacheStats.hits + this.cacheStats.misses) * 100;

console.log('缓存统计:', {
hits: this.cacheStats.hits,
misses: this.cacheStats.misses,
hitRate: hitRate.toFixed(2) + '%',
sets: this.cacheStats.sets,
deletes: this.cacheStats.deletes,
localCacheSize: this.localCache.size
});
}

// 获取缓存统计
getCacheStats() {
const hitRate = this.cacheStats.hits / (this.cacheStats.hits + this.cacheStats.misses) * 100;

return {
hits: this.cacheStats.hits,
misses: this.cacheStats.misses,
hitRate: isNaN(hitRate) ? 0 : hitRate,
sets: this.cacheStats.sets,
deletes: this.cacheStats.deletes,
localCacheSize: this.localCache.size,
timestamp: Date.now()
};
}

// 清空所有缓存
async flushAll() {
try {
await this.redis.flushall();
this.localCache.clear();

// 重置统计
this.cacheStats = {
hits: 0,
misses: 0,
sets: 0,
deletes: 0
};

return true;

} catch (error) {
console.error('清空缓存失败:', error);
return false;
}
}

// 创建管道操作
pipeline() {
return this.redis.pipeline();
}

// 关闭连接
async close() {
try {
await this.redis.quit();
this.localCache.clear();
} catch (error) {
console.error('关闭Redis连接失败:', error);
}
}
}

// 限流器实现
class RateLimiter {
constructor() {
this.windows = new Map(); // 滑动窗口
}

// 检查限流
checkLimit(key, action, limit, windowSeconds) {
const windowKey = `${key}:${action}`;
const now = Date.now();
const windowStart = now - (windowSeconds * 1000);

if (!this.windows.has(windowKey)) {
this.windows.set(windowKey, []);
}

const window = this.windows.get(windowKey);

// 清理过期记录
while (window.length > 0 && window[0] < windowStart) {
window.shift();
}

// 检查是否超过限制
if (window.length >= limit) {
return false;
}

// 记录当前请求
window.push(now);

return true;
}

// 清理过期窗口
cleanup() {
const now = Date.now();
const expiredThreshold = now - (3600 * 1000); // 1小时前

for (const [key, window] of this.windows.entries()) {
if (window.length === 0 || window[window.length - 1] < expiredThreshold) {
this.windows.delete(key);
}
}
}
}

四、技术总结与架构优化

(一)系统架构总结

通过深入分析企业微信群聊已读未读功能的实现原理,我们可以看到这是一个典型的分布式系统架构:

核心技术架构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
企业微信已读未读系统技术栈:

数据存储层:
├── MySQL集群:消息和状态数据的持久化存储
├── Redis集群:高性能缓存和实时数据
├── 分库分表:海量数据的水平扩展
└── 读写分离:查询性能优化

业务逻辑层:
├── 消息服务:处理消息的发送和管理
├── 状态服务:管理已读未读状态的核心逻辑
├── 推送服务:实时消息推送和状态同步
└── 权限服务:用户权限验证和群组管理

通信协议层:
├── WebSocket:实时双向通信
├── HTTP/REST:标准API接口
├── 消息队列:异步任务处理
└── RPC调用:服务间通信

基础设施层:
├── 负载均衡:流量分发和故障转移
├── 服务发现:动态服务注册和发现
├── 监控告警:系统健康状态监控
└── 容器化:Docker + Kubernetes部署

关键技术特点:

  1. 分布式架构:微服务化设计,各组件独立部署和扩展
  2. 数据一致性:通过分布式锁和事务保证数据一致性
  3. 高性能缓存:多级缓存架构,支持热点数据快速访问
  4. 实时通信:WebSocket长连接,毫秒级状态推送
  5. 弹性扩展:支持水平扩展,应对用户增长

(二)性能优化策略

数据库优化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
-- 1. 索引优化策略
-- 复合索引:优化多条件查询
CREATE INDEX idx_message_user_read ON message_read_status (message_id, user_id, is_read);

-- 覆盖索引:避免回表查询
CREATE INDEX idx_read_summary_covering ON message_read_status (message_id, is_read)
INCLUDE (user_id, read_at);

-- 2. 分区策略优化
-- 按时间分区:便于历史数据清理
ALTER TABLE message_read_status
PARTITION BY RANGE (UNIX_TIMESTAMP(created_at)) (
PARTITION p202501 VALUES LESS THAN (UNIX_TIMESTAMP('2025-02-01')),
PARTITION p202502 VALUES LESS THAN (UNIX_TIMESTAMP('2025-03-01')),
-- 自动分区管理
);

-- 3. 查询优化
-- 使用EXISTS替代IN,提高大数据量查询性能
SELECT m.message_id, m.content
FROM messages m
WHERE EXISTS (
SELECT 1 FROM message_read_status mrs
WHERE mrs.message_id = m.message_id
AND mrs.user_id = ? AND mrs.is_read = 0
) LIMIT 50;

缓存优化策略:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 缓存分层策略
class OptimizedCacheStrategy {
constructor() {
// L1缓存:本地内存缓存(最热数据)
this.l1Cache = new LRUCache({ max: 10000, ttl: 60000 });

// L2缓存:Redis缓存(热数据)
this.l2Cache = new RedisCache();

// L3缓存:数据库(冷数据)
this.l3Cache = new DatabaseCache();
}

// 智能缓存获取
async smartGet(key) {
// L1缓存命中
let value = this.l1Cache.get(key);
if (value !== undefined) {
return value;
}

// L2缓存命中
value = await this.l2Cache.get(key);
if (value !== null) {
// 提升到L1缓存
this.l1Cache.set(key, value);
return value;
}

// L3数据库查询
value = await this.l3Cache.get(key);
if (value !== null) {
// 同时缓存到L1和L2
this.l1Cache.set(key, value);
await this.l2Cache.set(key, value, 3600);
return value;
}

return null;
}

// 预测性缓存预热
async predictivePreheat(userId) {
// 基于用户行为预测可能访问的数据
const userGroups = await this.getUserActiveGroups(userId);
const recentMessages = await this.getRecentMessages(userGroups, 100);

// 预热相关的已读状态数据
const preheatPromises = recentMessages.map(async (message) => {
const statusKey = `read_status:${message.id}:${userId}`;
await this.smartGet(statusKey);
});

await Promise.all(preheatPromises);
}
}

实时推送优化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 智能推送策略
class SmartPushStrategy {
constructor() {
this.userActivityTracker = new UserActivityTracker();
this.pushPriorityCalculator = new PushPriorityCalculator();
}

// 智能推送决策
async shouldPush(userId, eventType, data) {
// 1. 检查用户活跃状态
const userActivity = await this.userActivityTracker.getUserActivity(userId);
if (!userActivity.isActive) {
return false; // 用户不活跃,不推送
}

// 2. 计算推送优先级
const priority = this.pushPriorityCalculator.calculate(eventType, data, userActivity);
if (priority < 5) {
return false; // 优先级太低,不推送
}

// 3. 检查推送频率限制
const canPush = await this.checkRateLimit(userId, eventType);
if (!canPush) {
return false; // 超过频率限制
}

return true;
}

// 批量推送优化
async batchPush(pushTasks) {
// 按用户分组
const userGroups = this.groupByUser(pushTasks);

// 并行推送,但限制并发数
const concurrency = 100;
const chunks = this.chunkArray(Object.entries(userGroups), concurrency);

for (const chunk of chunks) {
const promises = chunk.map(([userId, tasks]) =>
this.pushToUser(userId, tasks)
);
await Promise.all(promises);
}
}
}

(三)监控与运维

系统监控指标:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// 监控指标收集
class SystemMonitor {
constructor() {
this.metrics = {
// 业务指标
messagesSent: 0,
readStatusUpdates: 0,
activeConnections: 0,

// 性能指标
avgResponseTime: 0,
cacheHitRate: 0,
dbQueryTime: 0,

// 错误指标
errorRate: 0,
timeoutRate: 0,
connectionDrops: 0
};

this.startMetricsCollection();
}

// 关键性能指标监控
collectKPIs() {
return {
// 消息处理性能
messageProcessingRate: this.calculateMessageRate(),
avgMessageLatency: this.calculateAvgLatency(),

// 已读状态性能
readStatusUpdateRate: this.calculateReadStatusRate(),
readStatusAccuracy: this.calculateAccuracy(),

// 系统资源使用
cpuUsage: this.getCPUUsage(),
memoryUsage: this.getMemoryUsage(),
diskIOPS: this.getDiskIOPS(),

// 网络性能
networkThroughput: this.getNetworkThroughput(),
connectionCount: this.getActiveConnections(),

// 数据库性能
dbConnectionPool: this.getDBPoolStatus(),
slowQueryCount: this.getSlowQueryCount(),

// 缓存性能
cacheHitRate: this.getCacheHitRate(),
cacheMemoryUsage: this.getCacheMemoryUsage()
};
}

// 告警规则配置
setupAlerts() {
const alertRules = [
{
name: 'high_error_rate',
condition: 'error_rate > 0.01', // 错误率超过1%
severity: 'critical',
action: 'immediate_notification'
},
{
name: 'low_cache_hit_rate',
condition: 'cache_hit_rate < 0.8', // 缓存命中率低于80%
severity: 'warning',
action: 'cache_optimization_suggestion'
},
{
name: 'high_response_time',
condition: 'avg_response_time > 1000', // 平均响应时间超过1秒
severity: 'warning',
action: 'performance_investigation'
},
{
name: 'connection_drops',
condition: 'connection_drops > 100', // 连接断开数过多
severity: 'critical',
action: 'network_investigation'
}
];

return alertRules;
}
}

(四)技术展望与优化方向

未来优化方向:

  1. AI智能优化:使用机器学习优化推送策略和缓存预热
  2. 边缘计算:在边缘节点部署缓存,减少延迟
  3. 数据压缩:优化数据传输和存储的压缩算法
  4. 自适应架构:根据负载自动调整系统配置
  5. 区块链技术:探索去中心化的消息状态同步

技术创新点:

  • 状态压缩算法:使用位图等数据结构压缩存储空间
  • 智能预测推送:基于用户行为预测推送需求
  • 动态分片策略:根据数据热度动态调整分片策略
  • 多活架构:跨地域的多活部署,提升可用性

企业微信的群聊已读未读功能展示了现代分布式系统在处理海量实时数据时的技术复杂性和创新性。通过合理的架构设计、性能优化和运维监控,可以在保证功能完整性的同时,提供优秀的用户体验。这种技术实现不仅适用于即时通讯场景,也为其他需要实时状态同步的应用提供了宝贵的参考。

参考资料