1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
import hashlib
import hmac
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
class AntiCheatSystem:
    """Anti-cheat validation for loot-box draw requests.

    A request is accepted only if it passes, in this order: HMAC signature
    verification, timestamp freshness, user permission, per-action rate
    limiting and game-state checks.
    """

    # Largest accepted gap, in seconds, between the request timestamp and
    # the server clock.
    MAX_TIMESTAMP_SKEW_SECONDS = 300

    def __init__(self, secret_key: str):
        """Store the HMAC signing key and initialise rate-limit state."""
        self.secret_key = secret_key
        # user_id -> action -> list of datetimes of accepted requests
        self.user_request_history = {}
        # action -> {'count': max accepted requests, 'window': seconds}
        self.rate_limits = {
            'draw_single': {'count': 100, 'window': 3600},
            'draw_multiple': {'count': 20, 'window': 3600},
        }

    def validate_draw_request(self, user_id: str, request_data: Dict) -> Tuple[bool, str]:
        """Validate a draw request.

        Args:
            user_id: Identifier of the requesting user.
            request_data: Request parameters, including 'signature' and
                'timestamp'. The dict is not modified.

        Returns:
            ``(True, message)`` when every check passes, otherwise
            ``(False, reason)`` for the first check that failed.
        """
        if not self._verify_signature(request_data):
            return False, "请求签名验证失败"

        if not self._verify_timestamp(request_data.get('timestamp')):
            return False, "请求时间戳无效"

        if not self._verify_user_permission(user_id, request_data):
            return False, "用户权限不足"

        action = request_data.get('action', 'draw_single')
        if not self._check_rate_limit(user_id, action):
            return False, "请求频率过高"

        if not self._verify_game_state(user_id, request_data):
            return False, "游戏状态异常"

        return True, "验证通过"

    def _verify_signature(self, request_data: Dict) -> bool:
        """Check the request's HMAC-SHA256 signature.

        The signed string is every parameter except 'signature', sorted by
        key and joined as ``k=v`` pairs with ``&``. The caller's dict is left
        untouched, and a malformed signature (wrong type or non-ASCII text)
        is rejected rather than raising.
        """
        received_signature = request_data.get('signature', '')
        if not isinstance(received_signature, str):
            return False
        try:
            # A hex digest is pure ASCII, so anything else cannot match.
            received_bytes = received_signature.encode('ascii')
        except UnicodeEncodeError:
            return False

        signed_params = sorted(
            (key, value)
            for key, value in request_data.items()
            if key != 'signature'
        )
        param_string = '&'.join(f"{key}={value}" for key, value in signed_params)

        expected_signature = hmac.new(
            self.secret_key.encode(),
            param_string.encode(),
            hashlib.sha256,
        ).hexdigest()

        # Constant-time comparison to avoid leaking the digest via timing.
        return hmac.compare_digest(received_bytes, expected_signature.encode('ascii'))

    def _verify_timestamp(self, timestamp: Optional[int]) -> bool:
        """Return True if *timestamp* is within the allowed skew of now.

        Missing, zero or non-numeric timestamps are rejected.
        """
        if not isinstance(timestamp, (int, float)) or not timestamp:
            return False

        current_time = int(datetime.now().timestamp())
        return abs(current_time - timestamp) <= self.MAX_TIMESTAMP_SKEW_SECONDS

    def _verify_user_permission(self, user_id: str, request_data: Dict) -> bool:
        """Return True if the user is not banned and can afford the draw."""
        if self._is_user_banned(user_id):
            return False

        required_currency = request_data.get('cost', 0)
        return bool(self._check_user_currency(user_id, required_currency))

    def _check_rate_limit(self, user_id: str, action: str) -> bool:
        """Apply the sliding-window rate limit for *action*.

        Actions without a configured limit are always allowed. An accepted
        request is recorded in the user's history; a rejected one is not.
        """
        limit_config = self.rate_limits.get(action)
        if limit_config is None:
            return True

        current_time = datetime.now()
        window_start = current_time - timedelta(seconds=limit_config['window'])

        request_history = (
            self.user_request_history
            .setdefault(user_id, {})
            .setdefault(action, [])
        )
        # Drop entries that fell out of the window (in place, so the stored
        # list object stays the same).
        request_history[:] = [
            req_time for req_time in request_history if req_time > window_start
        ]

        if len(request_history) >= limit_config['count']:
            return False

        request_history.append(current_time)
        return True

    def _verify_game_state(self, user_id: str, request_data: Dict) -> bool:
        """Return True if the user is online, on a supported client version
        and using a consistent device."""
        if not self._is_user_online(user_id):
            return False

        if not self._is_version_supported(request_data.get('client_version')):
            return False

        if not self._verify_device_consistency(user_id, request_data.get('device_id')):
            return False

        return True

    def _is_user_banned(self, user_id: str) -> bool:
        """Report whether the user is banned (placeholder: always False)."""
        return False

    def _check_user_currency(self, user_id: str, required_amount: int) -> bool:
        """Report whether the user has enough currency (placeholder: always True)."""
        return True

    def _is_user_online(self, user_id: str) -> bool:
        """Report whether the user is online (placeholder: always True)."""
        return True

    def _is_version_supported(self, version: str) -> bool:
        """Report whether the client version is supported (placeholder: always True)."""
        return True

    def _verify_device_consistency(self, user_id: str, device_id: str) -> bool:
        """Report whether the device matches the user (placeholder: always True)."""
        return True
class ServerSideLootSystem:
    """Server-side loot-box draw processing.

    Every request is validated by the anti-cheat system first. Currency is
    charged only once the loot box is known to exist, and it is refunded if
    the draw fails before the items are granted.
    """

    def __init__(self, anti_cheat: 'AntiCheatSystem'):
        """Keep the anti-cheat validator and start with empty registries."""
        self.anti_cheat = anti_cheat
        # box_id -> loot box object exposing draw_multiple(user_id, count)
        self.loot_boxes = {}
        # user_id -> item_id -> {'name': ..., 'rarity': ..., 'count': ...}
        self.user_inventories = {}

    def process_draw_request(self, user_id: str, request_data: Dict) -> Dict:
        """Validate, charge for and execute a draw.

        Args:
            user_id: Identifier of the requesting user.
            request_data: Request parameters; 'cost', 'box_id' and 'count'
                are read here.

        Returns:
            A response dict. On success it has 'success', 'results' and
            'remaining_currency'. On failure it has 'success', 'error' and
            'error_code' (VALIDATION_FAILED, INVALID_LOOT_BOX,
            INSUFFICIENT_CURRENCY or SYSTEM_ERROR).
        """
        is_valid, error_msg = self.anti_cheat.validate_draw_request(user_id, request_data)
        if not is_valid:
            return {
                'success': False,
                'error': error_msg,
                'error_code': 'VALIDATION_FAILED'
            }

        # NOTE(review): 'cost' and 'count' come straight from the client
        # request; consider deriving the price from server-side box config.
        cost = request_data.get('cost', 0)
        box_id = request_data.get('box_id')
        draw_count = request_data.get('count', 1)

        charged = False
        try:
            # Look the box up before charging so an unknown box never costs
            # the user anything.
            loot_box = self.loot_boxes.get(box_id)
            if not loot_box:
                return {
                    'success': False,
                    'error': '抽奖箱不存在',
                    'error_code': 'INVALID_LOOT_BOX'
                }

            if not self._deduct_user_currency(user_id, cost):
                return {
                    'success': False,
                    'error': '资源不足',
                    'error_code': 'INSUFFICIENT_CURRENCY'
                }
            charged = True

            results = loot_box.draw_multiple(user_id, draw_count)

            # Build the response before granting items. Granting is the last
            # step that can fail, so a refund can never coincide with items
            # already sitting in the inventory.
            response = {
                'success': True,
                'results': [
                    {
                        'item_id': item.id,
                        'name': item.name,
                        'rarity': item.rarity.name,
                        'count': 1
                    }
                    for item in results
                ],
                'remaining_currency': self._get_user_currency(user_id)
            }

            self._add_items_to_inventory(user_id, results)
        except Exception:
            logging.getLogger(__name__).exception(
                "Draw failed for user %r on box %r", user_id, box_id
            )
            if charged:
                self._refund_user_currency(user_id, cost)
            return {
                'success': False,
                'error': '系统错误,请重试',
                'error_code': 'SYSTEM_ERROR'
            }

        # The draw is committed at this point. A failure while writing the
        # audit log must neither refund the user nor fail the request.
        try:
            self._log_draw_result(user_id, box_id, results, cost)
        except Exception:
            logging.getLogger(__name__).exception(
                "Failed to record draw log for user %r on box %r", user_id, box_id
            )

        return response

    def _deduct_user_currency(self, user_id: str, amount: int) -> bool:
        """Deduct currency from the user (placeholder: always succeeds)."""
        return True

    def _add_items_to_inventory(self, user_id: str, items: List['LootItem']):
        """Add drawn items to the user's inventory, one count per item."""
        inventory = self.user_inventories.setdefault(user_id, {})

        for item in items:
            if item.id in inventory:
                inventory[item.id]['count'] += 1
            else:
                inventory[item.id] = {
                    'name': item.name,
                    'rarity': item.rarity.name,
                    'count': 1
                }

    def _log_draw_result(self, user_id: str, box_id: str, results: List['LootItem'], cost: int):
        """Print a JSON log entry describing the draw."""
        log_entry = {
            'user_id': user_id,
            'box_id': box_id,
            'timestamp': datetime.now().isoformat(),
            'cost': cost,
            'results': [
                {'item_id': item.id, 'rarity': item.rarity.name}
                for item in results
            ]
        }
        print(f"抽奖日志: {json.dumps(log_entry, ensure_ascii=False)}")

    def _get_user_currency(self, user_id: str) -> int:
        """Return the user's currency balance (placeholder: always 1000)."""
        return 1000

    def _refund_user_currency(self, user_id: str, amount: int):
        """Refund currency to the user (placeholder: does nothing)."""
        pass
|