一、游戏抽奖系统概述

(一)抽奖系统的核心作用

  • 商业价值

    • 收入驱动:抽奖是手游的主要收入来源之一
    • 用户留存:通过奖励机制提高用户粘性
    • 付费转化:免费用户向付费用户的转化工具
    • 生命周期延长:持续的抽奖活动延长游戏生命周期
  • 游戏设计价值

    • 资源分发:控制游戏内稀有资源的分发速度
    • 平衡调节:通过概率调整维持游戏平衡
    • 社交互动:抽奖结果分享增加社交传播
    • 成就感营造:稀有物品获得带来的满足感
  • 技术挑战

    • 公平性保证:确保抽奖结果的真实随机性
    • 防作弊:防止外挂、内部作弊等行为
    • 高并发:支持大量用户同时抽奖
    • 数据一致性:确保奖品库存和用户数据一致

(二)抽奖系统分类

  • 按触发方式分类

    • 付费抽奖:钻石、代币等虚拟货币抽奖
    • 免费抽奖:每日免费次数、活动赠送
    • 任务抽奖:完成特定任务后的奖励抽奖
    • 时间抽奖:定时刷新的限时抽奖
  • 按奖品类型分类

    • 装备抽奖:武器、防具、饰品等装备
    • 角色抽奖:英雄、宠物、卡牌等角色
    • 资源抽奖:金币、经验、材料等资源
    • 道具抽奖:消耗品、强化材料等道具
  • 按概率机制分类

    • 固定概率:每次抽奖概率固定不变
    • 保底机制:一定次数内必出指定品质
    • 概率UP:特定时间内某些奖品概率提升
    • 累积概率:连续未中奖时概率逐渐提升

二、核心概率算法实现

(一)基础随机数生成

  • 伪随机数生成器
    • 线性同余生成器(LCG):简单快速,适合基础应用
    • 梅森旋转算法(MT19937):周期长,分布均匀
    • 密码学安全随机数:防止预测,适合高安全要求
    • 硬件随机数:基于物理噪声,真正随机
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import random
import hashlib
import time
from typing import List, Dict, Tuple
from dataclasses import dataclass
from enum import Enum

class RarityLevel(Enum):
"""稀有度等级"""
COMMON = 1 # 普通 - 白色
UNCOMMON = 2 # 不常见 - 绿色
RARE = 3 # 稀有 - 蓝色
EPIC = 4 # 史诗 - 紫色
LEGENDARY = 5 # 传说 - 橙色
MYTHIC = 6 # 神话 - 红色

@dataclass
class LootItem:
"""抽奖物品"""
id: int
name: str
rarity: RarityLevel
base_probability: float # 基础概率
current_probability: float # 当前概率
max_count: int = -1 # 最大数量,-1表示无限
current_count: int = 0 # 当前已发放数量

class SecureRandomGenerator:
"""安全随机数生成器"""

def __init__(self, seed: str = None):
if seed is None:
# 使用时间戳和系统随机数作为种子
seed = str(time.time()) + str(random.random())

self.seed = seed
self.counter = 0

def next_random(self) -> float:
"""生成下一个随机数 [0, 1)"""
# 使用SHA256哈希生成伪随机数
self.counter += 1
hash_input = f"{self.seed}_{self.counter}".encode()
hash_result = hashlib.sha256(hash_input).hexdigest()

# 取哈希值的前16位转换为浮点数
hex_value = int(hash_result[:16], 16)
max_value = 16 ** 16

return hex_value / max_value

def random_int(self, min_val: int, max_val: int) -> int:
"""生成指定范围的随机整数"""
range_size = max_val - min_val + 1
return min_val + int(self.next_random() * range_size)

class LootBoxSystem:
"""抽奖箱系统"""

def __init__(self, box_id: str, items: List[LootItem]):
self.box_id = box_id
self.items = items
self.total_probability = sum(item.current_probability for item in items)
self.random_generator = SecureRandomGenerator()

# 保底机制配置
self.pity_config = {
RarityLevel.EPIC: 50, # 50抽必出紫色
RarityLevel.LEGENDARY: 90, # 90抽必出橙色
RarityLevel.MYTHIC: 300 # 300抽必出红色
}

# 用户抽奖历史(实际应用中应存储在数据库)
self.user_pity_counters = {}

def draw_single(self, user_id: str) -> LootItem:
"""单次抽奖"""
# 检查保底机制
guaranteed_item = self._check_pity_system(user_id)
if guaranteed_item:
self._reset_pity_counter(user_id, guaranteed_item.rarity)
return guaranteed_item

# 正常概率抽奖
random_value = self.random_generator.next_random()
cumulative_probability = 0.0

# 按概率选择物品
for item in self.items:
cumulative_probability += item.current_probability / self.total_probability

if random_value <= cumulative_probability:
# 检查物品数量限制
if item.max_count > 0 and item.current_count >= item.max_count:
continue # 跳过已达上限的物品

# 更新抽奖计数器
self._update_pity_counters(user_id)
self._reset_pity_counter(user_id, item.rarity)

# 更新物品数量
item.current_count += 1

return item

# 兜底:返回最后一个物品(通常是最低级别)
return self.items[-1]

def draw_multiple(self, user_id: str, count: int) -> List[LootItem]:
"""多次抽奖"""
results = []
for _ in range(count):
item = self.draw_single(user_id)
results.append(item)
return results

def _check_pity_system(self, user_id: str) -> LootItem:
"""检查保底机制"""
if user_id not in self.user_pity_counters:
self.user_pity_counters[user_id] = {rarity: 0 for rarity in RarityLevel}

counters = self.user_pity_counters[user_id]

# 按稀有度从高到低检查保底
for rarity in [RarityLevel.MYTHIC, RarityLevel.LEGENDARY, RarityLevel.EPIC]:
if rarity in self.pity_config:
if counters[rarity] >= self.pity_config[rarity]:
# 触发保底,返回该稀有度的物品
eligible_items = [item for item in self.items if item.rarity == rarity]
if eligible_items:
return random.choice(eligible_items)

return None

def _update_pity_counters(self, user_id: str):
"""更新保底计数器"""
if user_id not in self.user_pity_counters:
self.user_pity_counters[user_id] = {rarity: 0 for rarity in RarityLevel}

# 所有稀有度计数器+1
for rarity in RarityLevel:
self.user_pity_counters[user_id][rarity] += 1

def _reset_pity_counter(self, user_id: str, obtained_rarity: RarityLevel):
"""重置保底计数器"""
if user_id not in self.user_pity_counters:
return

# 重置获得稀有度及以下等级的计数器
for rarity in RarityLevel:
if rarity.value <= obtained_rarity.value:
self.user_pity_counters[user_id][rarity] = 0

def get_probability_display(self, user_id: str) -> Dict:
"""获取概率显示信息(用于前端展示)"""
if user_id not in self.user_pity_counters:
self.user_pity_counters[user_id] = {rarity: 0 for rarity in RarityLevel}

counters = self.user_pity_counters[user_id]
display_info = {}

for rarity in RarityLevel:
items_of_rarity = [item for item in self.items if item.rarity == rarity]
if items_of_rarity:
total_prob = sum(item.current_probability for item in items_of_rarity)

display_info[rarity.name] = {
'probability': f"{total_prob:.3f}%",
'pity_count': counters.get(rarity, 0),
'pity_threshold': self.pity_config.get(rarity, 0),
'items': [{'name': item.name, 'probability': f"{item.current_probability:.3f}%"}
for item in items_of_rarity]
}

return display_info

# 使用示例
def create_sample_loot_box():
"""创建示例抽奖箱"""
items = [
# 神话级 - 0.1%
LootItem(1, "神话武器", RarityLevel.MYTHIC, 0.1, 0.1),

# 传说级 - 1.5%
LootItem(2, "传说剑", RarityLevel.LEGENDARY, 0.8, 0.8),
LootItem(3, "传说法杖", RarityLevel.LEGENDARY, 0.7, 0.7),

# 史诗级 - 8%
LootItem(4, "史诗护甲", RarityLevel.EPIC, 3.0, 3.0),
LootItem(5, "史诗饰品", RarityLevel.EPIC, 5.0, 5.0),

# 稀有级 - 20%
LootItem(6, "稀有材料", RarityLevel.RARE, 20.0, 20.0),

# 不常见 - 30%
LootItem(7, "强化石", RarityLevel.UNCOMMON, 30.0, 30.0),

# 普通 - 40.4%
LootItem(8, "金币", RarityLevel.COMMON, 40.4, 40.4),
]

return LootBoxSystem("premium_box", items)

# 测试抽奖系统
if __name__ == "__main__":
loot_box = create_sample_loot_box()
user_id = "test_user_001"

# 模拟100次抽奖
results = loot_box.draw_multiple(user_id, 100)

# 统计结果
rarity_counts = {}
for item in results:
rarity = item.rarity.name
rarity_counts[rarity] = rarity_counts.get(rarity, 0) + 1

print("100次抽奖结果统计:")
for rarity, count in rarity_counts.items():
print(f"{rarity}: {count}次 ({count}%)")

# 显示当前概率信息
print("\n当前概率信息:")
prob_info = loot_box.get_probability_display(user_id)
for rarity, info in prob_info.items():
print(f"{rarity}: {info['probability']}, 保底计数: {info['pity_count']}/{info['pity_threshold']}")

(二)高级概率机制

  • 动态概率调整
    • 时间衰减:长时间未抽到高级物品时提升概率
    • 用户行为分析:根据用户付费意愿调整概率
    • 库存控制:根据物品库存动态调整概率
    • 活动加成:特定时间段内概率提升
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
class DynamicProbabilitySystem:
"""动态概率系统"""

def __init__(self, base_loot_box: LootBoxSystem):
self.base_loot_box = base_loot_box
self.user_profiles = {} # 用户画像数据

def calculate_dynamic_probability(self, user_id: str, item: LootItem) -> float:
"""计算动态概率"""
base_prob = item.base_probability

# 获取用户画像
profile = self._get_user_profile(user_id)

# 1. 时间衰减加成
time_multiplier = self._calculate_time_multiplier(user_id, item.rarity)

# 2. 付费意愿加成
payment_multiplier = self._calculate_payment_multiplier(profile)

# 3. 库存控制
inventory_multiplier = self._calculate_inventory_multiplier(item)

# 4. 活动加成
event_multiplier = self._calculate_event_multiplier(item)

# 综合计算最终概率
final_probability = base_prob * time_multiplier * payment_multiplier * inventory_multiplier * event_multiplier

# 确保概率在合理范围内
return min(final_probability, base_prob * 5.0) # 最多5倍加成

def _get_user_profile(self, user_id: str) -> Dict:
"""获取用户画像"""
if user_id not in self.user_profiles:
self.user_profiles[user_id] = {
'total_spent': 0.0, # 总消费金额
'last_payment_time': 0, # 最后付费时间
'draw_count': 0, # 总抽奖次数
'last_rare_time': 0, # 最后获得稀有物品时间
'session_count': 0, # 会话次数
'avg_session_duration': 0 # 平均会话时长
}
return self.user_profiles[user_id]

def _calculate_time_multiplier(self, user_id: str, rarity: RarityLevel) -> float:
"""计算时间衰减加成"""
profile = self._get_user_profile(user_id)
current_time = time.time()

# 距离上次获得该稀有度物品的时间
time_since_last = current_time - profile.get('last_rare_time', current_time - 86400)

# 根据稀有度设置不同的时间阈值
time_thresholds = {
RarityLevel.EPIC: 3600, # 1小时
RarityLevel.LEGENDARY: 7200, # 2小时
RarityLevel.MYTHIC: 14400 # 4小时
}

threshold = time_thresholds.get(rarity, 3600)

if time_since_last > threshold:
# 超过阈值时间,开始增加概率
multiplier = 1.0 + min((time_since_last - threshold) / threshold * 0.5, 2.0)
return multiplier

return 1.0

def _calculate_payment_multiplier(self, profile: Dict) -> float:
"""计算付费意愿加成"""
total_spent = profile.get('total_spent', 0)
last_payment_time = profile.get('last_payment_time', 0)
current_time = time.time()

# 高付费用户给予一定加成
if total_spent > 1000: # 高付费用户
base_multiplier = 1.2
elif total_spent > 100: # 中付费用户
base_multiplier = 1.1
else: # 低付费用户
base_multiplier = 1.0

# 长时间未付费的用户给予诱导性加成
days_since_payment = (current_time - last_payment_time) / 86400
if days_since_payment > 7: # 7天未付费
base_multiplier *= 1.3

return base_multiplier

def _calculate_inventory_multiplier(self, item: LootItem) -> float:
"""计算库存控制加成"""
if item.max_count <= 0: # 无限制物品
return 1.0

# 计算库存剩余比例
remaining_ratio = (item.max_count - item.current_count) / item.max_count

if remaining_ratio < 0.1: # 库存不足10%
return 0.1 # 大幅降低概率
elif remaining_ratio < 0.3: # 库存不足30%
return 0.5 # 适度降低概率

return 1.0

def _calculate_event_multiplier(self, item: LootItem) -> float:
"""计算活动加成"""
# 这里可以根据当前活动配置返回不同的加成
# 示例:周末双倍概率
import datetime
current_time = datetime.datetime.now()

if current_time.weekday() >= 5: # 周六日
return 2.0

return 1.0

(三)保底机制实现

  • 硬保底:固定次数内必定获得指定品质物品
  • 软保底:接近保底次数时概率逐渐提升
  • 分层保底:不同稀有度有独立的保底计数
  • 重置机制:获得高品质物品后重置相应保底计数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class AdvancedPitySystem:
"""高级保底系统"""

def __init__(self):
# 保底配置
self.pity_configs = {
RarityLevel.EPIC: {
'hard_pity': 50, # 硬保底50抽
'soft_pity_start': 40, # 软保底从40抽开始
'soft_pity_increment': 0.06 # 每抽增加6%概率
},
RarityLevel.LEGENDARY: {
'hard_pity': 90,
'soft_pity_start': 75,
'soft_pity_increment': 0.06
},
RarityLevel.MYTHIC: {
'hard_pity': 300,
'soft_pity_start': 250,
'soft_pity_increment': 0.02
}
}

def calculate_pity_probability(self, user_id: str, rarity: RarityLevel,
base_probability: float, current_count: int) -> float:
"""计算保底概率"""
if rarity not in self.pity_configs:
return base_probability

config = self.pity_configs[rarity]

# 硬保底检查
if current_count >= config['hard_pity']:
return 1.0 # 100%概率

# 软保底检查
if current_count >= config['soft_pity_start']:
# 软保底期间,每抽增加一定概率
additional_draws = current_count - config['soft_pity_start']
additional_probability = additional_draws * config['soft_pity_increment']
return min(base_probability + additional_probability, 0.5) # 最高50%

return base_probability

def should_trigger_pity(self, user_id: str, rarity: RarityLevel, current_count: int) -> bool:
"""判断是否触发保底"""
if rarity not in self.pity_configs:
return False

return current_count >= self.pity_configs[rarity]['hard_pity']

三、防作弊与安全机制

(一)服务端验证

  • 双重验证:客户端请求+服务端验证
  • 时间戳验证:防止重放攻击
  • 签名验证:请求参数数字签名
  • 频率限制:防止异常高频抽奖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
import hmac
import hashlib
import json
from datetime import datetime, timedelta
from typing import Optional

class AntiCheatSystem:
"""防作弊系统"""

def __init__(self, secret_key: str):
self.secret_key = secret_key
self.user_request_history = {} # 用户请求历史
self.rate_limits = {
'draw_single': {'count': 100, 'window': 3600}, # 1小时100次单抽
'draw_multiple': {'count': 20, 'window': 3600}, # 1小时20次十连抽
}

def validate_draw_request(self, user_id: str, request_data: Dict) -> Tuple[bool, str]:
"""验证抽奖请求"""
# 1. 验证请求签名
if not self._verify_signature(request_data):
return False, "请求签名验证失败"

# 2. 验证时间戳
if not self._verify_timestamp(request_data.get('timestamp')):
return False, "请求时间戳无效"

# 3. 验证用户权限
if not self._verify_user_permission(user_id, request_data):
return False, "用户权限不足"

# 4. 验证频率限制
action = request_data.get('action', 'draw_single')
if not self._check_rate_limit(user_id, action):
return False, "请求频率过高"

# 5. 验证游戏状态
if not self._verify_game_state(user_id, request_data):
return False, "游戏状态异常"

return True, "验证通过"

def _verify_signature(self, request_data: Dict) -> bool:
"""验证请求签名"""
received_signature = request_data.pop('signature', '')

# 按键名排序并拼接参数
sorted_params = sorted(request_data.items())
param_string = '&'.join([f"{k}={v}" for k, v in sorted_params])

# 计算HMAC签名
expected_signature = hmac.new(
self.secret_key.encode(),
param_string.encode(),
hashlib.sha256
).hexdigest()

return hmac.compare_digest(received_signature, expected_signature)

def _verify_timestamp(self, timestamp: Optional[int]) -> bool:
"""验证时间戳"""
if not timestamp:
return False

current_time = int(datetime.now().timestamp())
time_diff = abs(current_time - timestamp)

# 允许5分钟的时间偏差
return time_diff <= 300

def _verify_user_permission(self, user_id: str, request_data: Dict) -> bool:
"""验证用户权限"""
# 检查用户是否被封禁
if self._is_user_banned(user_id):
return False

# 检查用户资源是否足够
required_currency = request_data.get('cost', 0)
if not self._check_user_currency(user_id, required_currency):
return False

return True

def _check_rate_limit(self, user_id: str, action: str) -> bool:
"""检查频率限制"""
if action not in self.rate_limits:
return True

limit_config = self.rate_limits[action]
current_time = datetime.now()

# 初始化用户请求历史
if user_id not in self.user_request_history:
self.user_request_history[user_id] = {}

if action not in self.user_request_history[user_id]:
self.user_request_history[user_id][action] = []

request_history = self.user_request_history[user_id][action]

# 清理过期的请求记录
window_start = current_time - timedelta(seconds=limit_config['window'])
request_history[:] = [req_time for req_time in request_history if req_time > window_start]

# 检查是否超过限制
if len(request_history) >= limit_config['count']:
return False

# 记录当前请求
request_history.append(current_time)
return True

def _verify_game_state(self, user_id: str, request_data: Dict) -> bool:
"""验证游戏状态"""
# 检查用户是否在线
if not self._is_user_online(user_id):
return False

# 检查客户端版本
client_version = request_data.get('client_version')
if not self._is_version_supported(client_version):
return False

# 检查设备信息一致性
device_id = request_data.get('device_id')
if not self._verify_device_consistency(user_id, device_id):
return False

return True

def _is_user_banned(self, user_id: str) -> bool:
"""检查用户是否被封禁"""
# 实际实现中应查询数据库
return False

def _check_user_currency(self, user_id: str, required_amount: int) -> bool:
"""检查用户货币是否足够"""
# 实际实现中应查询用户资产
return True

def _is_user_online(self, user_id: str) -> bool:
"""检查用户是否在线"""
# 实际实现中应检查用户会话状态
return True

def _is_version_supported(self, version: str) -> bool:
"""检查客户端版本是否支持"""
# 实际实现中应检查版本兼容性
return True

def _verify_device_consistency(self, user_id: str, device_id: str) -> bool:
"""验证设备信息一致性"""
# 实际实现中应检查设备绑定信息
return True

class ServerSideLootSystem:
"""服务端抽奖系统"""

def __init__(self, anti_cheat: AntiCheatSystem):
self.anti_cheat = anti_cheat
self.loot_boxes = {} # 抽奖箱配置
self.user_inventories = {} # 用户背包

def process_draw_request(self, user_id: str, request_data: Dict) -> Dict:
"""处理抽奖请求"""
# 1. 防作弊验证
is_valid, error_msg = self.anti_cheat.validate_draw_request(user_id, request_data)
if not is_valid:
return {
'success': False,
'error': error_msg,
'error_code': 'VALIDATION_FAILED'
}

try:
# 2. 扣除用户资源
cost = request_data.get('cost', 0)
if not self._deduct_user_currency(user_id, cost):
return {
'success': False,
'error': '资源不足',
'error_code': 'INSUFFICIENT_CURRENCY'
}

# 3. 执行抽奖
box_id = request_data.get('box_id')
draw_count = request_data.get('count', 1)

loot_box = self.loot_boxes.get(box_id)
if not loot_box:
return {
'success': False,
'error': '抽奖箱不存在',
'error_code': 'INVALID_LOOT_BOX'
}

# 执行抽奖逻辑
results = loot_box.draw_multiple(user_id, draw_count)

# 4. 发放奖励到用户背包
self._add_items_to_inventory(user_id, results)

# 5. 记录抽奖日志
self._log_draw_result(user_id, box_id, results, cost)

# 6. 返回结果
return {
'success': True,
'results': [
{
'item_id': item.id,
'name': item.name,
'rarity': item.rarity.name,
'count': 1
} for item in results
],
'remaining_currency': self._get_user_currency(user_id)
}

except Exception as e:
# 异常处理:回滚用户资源
self._refund_user_currency(user_id, cost)

return {
'success': False,
'error': '系统错误,请重试',
'error_code': 'SYSTEM_ERROR'
}

def _deduct_user_currency(self, user_id: str, amount: int) -> bool:
"""扣除用户货币"""
# 实际实现中应操作数据库
return True

def _add_items_to_inventory(self, user_id: str, items: List[LootItem]):
"""添加物品到用户背包"""
if user_id not in self.user_inventories:
self.user_inventories[user_id] = {}

inventory = self.user_inventories[user_id]
for item in items:
if item.id in inventory:
inventory[item.id]['count'] += 1
else:
inventory[item.id] = {
'name': item.name,
'rarity': item.rarity.name,
'count': 1
}

def _log_draw_result(self, user_id: str, box_id: str, results: List[LootItem], cost: int):
"""记录抽奖日志"""
log_entry = {
'user_id': user_id,
'box_id': box_id,
'timestamp': datetime.now().isoformat(),
'cost': cost,
'results': [{'item_id': item.id, 'rarity': item.rarity.name} for item in results]
}

# 实际实现中应写入日志系统
print(f"抽奖日志: {json.dumps(log_entry, ensure_ascii=False)}")

def _get_user_currency(self, user_id: str) -> int:
"""获取用户货币余额"""
# 实际实现中应查询数据库
return 1000

def _refund_user_currency(self, user_id: str, amount: int):
"""退还用户货币"""
# 实际实现中应操作数据库
pass

(二)数据完整性保护

  • 数据库事务:确保抽奖操作的原子性
  • 日志审计:完整记录所有抽奖操作
  • 数据备份:定期备份关键数据
  • 异常监控:监控异常的抽奖行为
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import sqlite3
import json
from contextlib import contextmanager
from datetime import datetime

class LootSystemDatabase:
"""抽奖系统数据库"""

def __init__(self, db_path: str):
self.db_path = db_path
self._init_database()

def _init_database(self):
"""初始化数据库表"""
with self.get_connection() as conn:
# 用户资产表
conn.execute('''
CREATE TABLE IF NOT EXISTS user_assets (
user_id TEXT PRIMARY KEY,
currency INTEGER DEFAULT 0,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')

# 用户背包表
conn.execute('''
CREATE TABLE IF NOT EXISTS user_inventory (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT,
item_id INTEGER,
item_name TEXT,
rarity TEXT,
count INTEGER DEFAULT 1,
obtained_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')

# 抽奖记录表
conn.execute('''
CREATE TABLE IF NOT EXISTS draw_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT,
box_id TEXT,
cost INTEGER,
results TEXT, -- JSON格式的抽奖结果
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ip_address TEXT,
device_id TEXT
)
''')

# 保底计数表
conn.execute('''
CREATE TABLE IF NOT EXISTS pity_counters (
user_id TEXT,
rarity TEXT,
count INTEGER DEFAULT 0,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, rarity)
)
''')

conn.commit()

@contextmanager
def get_connection(self):
"""获取数据库连接(上下文管理器)"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row # 使结果可以按列名访问
try:
yield conn
finally:
conn.close()

def execute_draw_transaction(self, user_id: str, box_id: str, cost: int,
results: List[LootItem]) -> bool:
"""执行抽奖事务"""
try:
with self.get_connection() as conn:
# 开始事务
conn.execute('BEGIN TRANSACTION')

# 1. 检查并扣除用户货币
cursor = conn.execute(
'SELECT currency FROM user_assets WHERE user_id = ?',
(user_id,)
)
row = cursor.fetchone()

if not row or row['currency'] < cost:
conn.execute('ROLLBACK')
return False

# 扣除货币
conn.execute(
'UPDATE user_assets SET currency = currency - ?, last_updated = ? WHERE user_id = ?',
(cost, datetime.now(), user_id)
)

# 2. 添加物品到背包
for item in results:
conn.execute('''
INSERT INTO user_inventory (user_id, item_id, item_name, rarity, count)
VALUES (?, ?, ?, ?, 1)
''', (user_id, item.id, item.name, item.rarity.name))

# 3. 记录抽奖日志
results_json = json.dumps([
{'item_id': item.id, 'name': item.name, 'rarity': item.rarity.name}
for item in results
])

conn.execute('''
INSERT INTO draw_logs (user_id, box_id, cost, results)
VALUES (?, ?, ?, ?)
''', (user_id, box_id, cost, results_json))

# 4. 更新保底计数
for rarity in RarityLevel:
# 增加计数
conn.execute('''
INSERT OR REPLACE INTO pity_counters (user_id, rarity, count, last_updated)
VALUES (?, ?, COALESCE((SELECT count FROM pity_counters WHERE user_id = ? AND rarity = ?), 0) + 1, ?)
''', (user_id, rarity.name, user_id, rarity.name, datetime.now()))

# 重置获得稀有度的计数
for item in results:
conn.execute('''
UPDATE pity_counters
SET count = 0, last_updated = ?
WHERE user_id = ? AND rarity = ?
''', (datetime.now(), user_id, item.rarity.name))

# 提交事务
conn.execute('COMMIT')
return True

except Exception as e:
print(f"抽奖事务执行失败: {e}")
return False

def get_user_pity_counters(self, user_id: str) -> Dict[str, int]:
"""获取用户保底计数"""
with self.get_connection() as conn:
cursor = conn.execute(
'SELECT rarity, count FROM pity_counters WHERE user_id = ?',
(user_id,)
)

counters = {}
for row in cursor.fetchall():
counters[row['rarity']] = row['count']

return counters

def get_draw_statistics(self, user_id: str = None, days: int = 30) -> Dict:
"""获取抽奖统计信息"""
with self.get_connection() as conn:
base_query = '''
SELECT
COUNT(*) as total_draws,
SUM(cost) as total_cost,
AVG(cost) as avg_cost
FROM draw_logs
WHERE timestamp >= datetime('now', '-{} days')
'''.format(days)

if user_id:
base_query += ' AND user_id = ?'
cursor = conn.execute(base_query, (user_id,))
else:
cursor = conn.execute(base_query)

stats = cursor.fetchone()

return {
'total_draws': stats['total_draws'] or 0,
'total_cost': stats['total_cost'] or 0,
'avg_cost': stats['avg_cost'] or 0
}

四、用户体验与心理设计

(一)视觉效果设计

  • 动画效果:抽奖过程的视觉反馈
  • 音效设计:不同稀有度的音效区分
  • 特效展示:稀有物品的特殊展示效果
  • UI交互:流畅的操作体验
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
// 前端抽奖动画系统
class LootBoxAnimation {
constructor(container, config) {
this.container = container;
this.config = {
animationDuration: 3000, // 动画总时长
suspenseDuration: 2000, // 悬念阶段时长
revealDuration: 1000, // 揭晓阶段时长
...config
};

this.audioManager = new AudioManager();
this.particleSystem = new ParticleSystem(container);
}

async playDrawAnimation(results) {
// 1. 播放开箱音效
this.audioManager.play('box_opening');

// 2. 显示抽奖容器
const boxElement = this.createLootBox();
this.container.appendChild(boxElement);

// 3. 悬念阶段 - 盒子摇晃动画
await this.playSuspenseAnimation(boxElement);

// 4. 确定最高稀有度物品
const highestRarity = this.getHighestRarity(results);

// 5. 播放对应稀有度的特效
await this.playRarityEffect(highestRarity);

// 6. 揭晓结果
await this.revealResults(results);

// 7. 清理动画元素
this.cleanup();
}

async playSuspenseAnimation(boxElement) {
return new Promise(resolve => {
// 盒子摇晃动画
boxElement.style.animation = `shake ${this.config.suspenseDuration}ms ease-in-out`;

// 播放悬念音效
this.audioManager.play('suspense_loop');

setTimeout(resolve, this.config.suspenseDuration);
});
}

async playRarityEffect(rarity) {
const effects = {
'COMMON': () => this.playCommonEffect(),
'UNCOMMON': () => this.playUncommonEffect(),
'RARE': () => this.playRareEffect(),
'EPIC': () => this.playEpicEffect(),
'LEGENDARY': () => this.playLegendaryEffect(),
'MYTHIC': () => this.playMythicEffect()
};

const effectFunction = effects[rarity] || effects['COMMON'];
await effectFunction();
}

async playLegendaryEffect() {
// 传说级特效:金光闪闪
this.audioManager.play('legendary_reveal');

// 屏幕闪光效果
const flash = document.createElement('div');
flash.className = 'screen-flash golden';
this.container.appendChild(flash);

// 粒子特效
this.particleSystem.emit('golden_particles', {
count: 100,
duration: 2000,
color: '#FFD700'
});

// 震动效果(移动端)
if (navigator.vibrate) {
navigator.vibrate([200, 100, 200]);
}

return new Promise(resolve => {
setTimeout(() => {
flash.remove();
resolve();
}, 1000);
});
}

async revealResults(results) {
const resultsContainer = document.createElement('div');
resultsContainer.className = 'results-container';

for (let i = 0; i < results.length; i++) {
const item = results[i];
const itemElement = this.createItemElement(item);

// 延迟显示每个物品
setTimeout(() => {
resultsContainer.appendChild(itemElement);
this.playItemRevealAnimation(itemElement, item.rarity);
}, i * 200);
}

this.container.appendChild(resultsContainer);

// 等待所有物品显示完成
return new Promise(resolve => {
setTimeout(resolve, results.length * 200 + 500);
});
}

createItemElement(item) {
const element = document.createElement('div');
element.className = `item-card rarity-${item.rarity.toLowerCase()}`;
element.innerHTML = `
<div class="item-icon">
<img src="/images/items/${item.id}.png" alt="${item.name}">
</div>
<div class="item-name">${item.name}</div>
<div class="item-rarity">${item.rarity}</div>
`;
return element;
}

playItemRevealAnimation(element, rarity) {
// 根据稀有度播放不同的揭晓动画
const animations = {
'COMMON': 'fadeInUp',
'UNCOMMON': 'bounceIn',
'RARE': 'zoomIn',
'EPIC': 'rotateIn',
'LEGENDARY': 'jackInTheBox',
'MYTHIC': 'lightSpeedIn'
};

const animation = animations[rarity] || animations['COMMON'];
element.style.animation = `${animation} 0.8s ease-out`;

// 播放对应音效
this.audioManager.play(`item_reveal_${rarity.toLowerCase()}`);
}

getHighestRarity(results) {
const rarityOrder = ['COMMON', 'UNCOMMON', 'RARE', 'EPIC', 'LEGENDARY', 'MYTHIC'];
let highest = 'COMMON';

for (const item of results) {
if (rarityOrder.indexOf(item.rarity) > rarityOrder.indexOf(highest)) {
highest = item.rarity;
}
}

return highest;
}
}

// 音效管理器
class AudioManager {
constructor() {
this.sounds = {};
this.volume = 0.7;
this.loadSounds();
}

loadSounds() {
const soundFiles = {
'box_opening': '/audio/box_opening.mp3',
'suspense_loop': '/audio/suspense_loop.mp3',
'legendary_reveal': '/audio/legendary_reveal.mp3',
'item_reveal_common': '/audio/item_common.mp3',
'item_reveal_legendary': '/audio/item_legendary.mp3'
};

for (const [name, url] of Object.entries(soundFiles)) {
const audio = new Audio(url);
audio.volume = this.volume;
this.sounds[name] = audio;
}
}

play(soundName) {
const sound = this.sounds[soundName];
if (sound) {
sound.currentTime = 0; // 重置播放位置
sound.play().catch(e => console.log('音频播放失败:', e));
}
}
}

// 粒子系统
class ParticleSystem {
constructor(container) {
this.container = container;
this.particles = [];
}

emit(type, config) {
const emitters = {
'golden_particles': () => this.emitGoldenParticles(config),
'rainbow_particles': () => this.emitRainbowParticles(config),
'sparkles': () => this.emitSparkles(config)
};

const emitter = emitters[type];
if (emitter) {
emitter();
}
}

emitGoldenParticles(config) {
for (let i = 0; i < config.count; i++) {
const particle = document.createElement('div');
particle.className = 'particle golden';
particle.style.cssText = `
position: absolute;
width: 4px;
height: 4px;
background: ${config.color};
border-radius: 50%;
pointer-events: none;
left: ${Math.random() * 100}%;
top: ${Math.random() * 100}%;
animation: particleFloat ${config.duration}ms ease-out forwards;
`;

this.container.appendChild(particle);

// 清理粒子
setTimeout(() => {
if (particle.parentNode) {
particle.parentNode.removeChild(particle);
}
}, config.duration);
}
}
}

(二)心理学应用

  • 损失厌恶:保底机制减少用户损失感
  • 变比强化:不定期奖励增强用户行为
  • 锚定效应:通过对比突出稀有物品价值
  • 社会认同:展示其他玩家的抽奖结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class PsychologyOptimizer:
"""心理学优化器"""

def __init__(self):
self.user_behavior_data = {}

def optimize_draw_experience(self, user_id: str, draw_history: List) -> Dict:
"""优化抽奖体验"""
profile = self._analyze_user_psychology(user_id, draw_history)

recommendations = {
'show_pity_progress': self._should_show_pity_progress(profile),
'highlight_near_guarantee': self._should_highlight_guarantee(profile),
'show_social_proof': self._should_show_social_proof(profile),
'recommend_draw_count': self._recommend_draw_count(profile),
'show_value_comparison': self._should_show_value_comparison(profile)
}

return recommendations

def _analyze_user_psychology(self, user_id: str, draw_history: List) -> Dict:
"""分析用户心理特征"""
if not draw_history:
return {'type': 'new_user', 'risk_tolerance': 'medium'}

# 分析抽奖模式
recent_draws = draw_history[-10:] # 最近10次抽奖

# 计算抽奖频率
draw_frequency = len(recent_draws) / max(1, (time.time() - recent_draws[0]['timestamp']) / 86400)

# 分析消费模式
total_spent = sum(draw['cost'] for draw in recent_draws)
avg_spend = total_spent / len(recent_draws)

# 分析情绪状态
consecutive_failures = self._count_consecutive_failures(recent_draws)

# 用户类型分类
if draw_frequency > 5: # 高频抽奖
user_type = 'addictive'
elif avg_spend > 100: # 高消费
user_type = 'whale'
elif consecutive_failures > 20: # 连续失败
user_type = 'frustrated'
else:
user_type = 'casual'

return {
'type': user_type,
'draw_frequency': draw_frequency,
'avg_spend': avg_spend,
'consecutive_failures': consecutive_failures,
'risk_tolerance': self._assess_risk_tolerance(recent_draws)
}

def _should_show_pity_progress(self, profile: Dict) -> bool:
"""是否显示保底进度"""
# 沮丧用户和高频用户需要看到进度
return profile['type'] in ['frustrated', 'addictive'] or profile['consecutive_failures'] > 10

def _should_highlight_guarantee(self, profile: Dict) -> bool:
"""是否突出显示保底机制"""
# 接近保底时突出显示
return profile['consecutive_failures'] > 30

def _should_show_social_proof(self, profile: Dict) -> bool:
"""是否显示社会认同"""
# 新用户和犹豫用户需要社会认同
return profile['type'] in ['new_user', 'casual']

def _recommend_draw_count(self, profile: Dict) -> int:
"""推荐抽奖次数"""
if profile['type'] == 'whale':
return 10 # 推荐十连抽
elif profile['type'] == 'frustrated':
return 1 # 推荐单抽,降低损失
else:
return 5 # 推荐五连抽

def _should_show_value_comparison(self, profile: Dict) -> bool:
"""是否显示价值对比"""
# 理性用户需要看到价值对比
return profile['risk_tolerance'] == 'low'

总结:游戏抽奖系统是一个复杂的技术与心理学结合的系统工程。从技术角度,需要实现公平的随机算法、完善的防作弊机制、高性能的并发处理;从产品角度,需要平衡用户体验与商业收益,运用心理学原理提升用户参与度。

核心技术要点包括:安全的随机数生成、多层次的概率机制、完善的保底系统、严格的防作弊验证、流畅的视觉体验设计。同时,需要深入理解用户心理,通过数据分析优化抽奖体验,在保证公平性的前提下最大化用户满意度和商业价值。

现代游戏抽奖系统已经发展成为融合概率论、密码学、心理学、用户体验设计等多个领域的综合性系统,其设计思路和技术方案对其他需要随机奖励机制的应用都有很好的参考价值。