#!/usr/bin/env python3
"""Health checks and failover for a distributed task scheduler."""
import threading import time import random import socket import json from datetime import datetime from typing import Dict, List, Optional import logging
# Configure the root logger once at import time with the INFO threshold.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; used by everything below.
logger = logging.getLogger(__name__)
class TaskSchedulerNode:
    """State record for one task-scheduler node."""

    def __init__(self, node_id: str, host: str, port: int):
        """Store the node's identity and initialise its runtime state.

        Args:
            node_id: Identifier of the node.
            host: Host name or address of the node.
            port: Port of the node.
        """
        # Identity and address, exactly as supplied by the caller.
        self.node_id, self.host, self.port = node_id, host, port
        # A new node starts as a healthy non-leader.
        self.is_leader = False
        self.is_healthy = True
        # Creation time counts as the first heartbeat.
        self.last_heartbeat = time.time()
        self.tasks = []

    def to_dict(self):
        """Return identity, address and the two status flags as a dict.

        ``last_heartbeat`` and ``tasks`` are not part of the result.
        """
        exported = ('node_id', 'host', 'port', 'is_leader', 'is_healthy')
        return {field: getattr(self, field) for field in exported}
class DistributedSchedulerMonitor:
    """Monitor scheduler nodes and fail the leader over when it is unhealthy.

    A background thread repeatedly health-checks every node, elects a new
    leader when the current one is unhealthy (or when there is none), and
    logs a status summary. Health checks and leader load are simulated with
    ``random``; this class sends no network traffic.

    Attributes:
        nodes: ``TaskSchedulerNode`` objects built from the constructor input.
        heartbeat_interval: Seconds the monitor loop waits between rounds.
        heartbeat_timeout: Stored value in seconds; no method of this class
            reads it.
        monitor_thread: Most recently started monitor thread, or ``None`` if
            monitoring was never started.
        running: ``True`` between ``start_monitoring`` and
            ``stop_monitoring``.
        stats: Counters ``health_checks``, ``failovers`` and the ISO
            timestamp ``last_failover``.

    NOTE(review): node flags and ``stats`` are written by the monitor and
    worker threads without a lock -- confirm this is acceptable for callers.
    """

    def __init__(self, nodes: List[Dict]):
        """Build the node list and make the first node the initial leader.

        Args:
            nodes: One dict per node with the keys ``'id'``, ``'host'`` and
                ``'port'``.

        Raises:
            KeyError: If a node dict lacks one of those keys.
        """
        self.nodes = [
            TaskSchedulerNode(
                node_id=node_info['id'],
                host=node_info['host'],
                port=node_info['port'],
            )
            for node_info in nodes
        ]

        # The first configured node starts out as leader.
        if self.nodes:
            self.nodes[0].is_leader = True

        self.heartbeat_interval = 5  # seconds
        self.heartbeat_timeout = 15  # seconds
        self.monitor_thread = None
        self.running = False
        # Set by stop_monitoring() so the loop's wait ends immediately
        # instead of sleeping out the rest of the interval.
        self._stop_event = threading.Event()

        # Statistics
        self.stats = {
            'health_checks': 0,
            'failovers': 0,
            'last_failover': None,
        }

    def start_monitoring(self):
        """Start the background monitor thread.

        If a monitor thread is still alive, a warning is logged and no second
        thread is started.
        """
        if self.monitor_thread is not None and self.monitor_thread.is_alive():
            logger.warning("监控线程已在运行,忽略重复启动")
            return

        self._stop_event.clear()
        self.running = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop, daemon=True
        )
        self.monitor_thread.start()
        logger.info("监控线程已启动")

    def stop_monitoring(self):
        """Ask the monitor loop to stop and wait up to 10 seconds for it."""
        self.running = False
        self._stop_event.set()
        thread = self.monitor_thread
        if thread is not None:
            thread.join(timeout=10)
            if thread.is_alive():
                # join() timed out: do not claim the thread has stopped.
                logger.warning("监控线程未能在10秒内停止")
                return
        logger.info("监控线程已停止")

    def _monitor_loop(self):
        """Run check / elect / log rounds until ``running`` becomes false."""
        while self.running:
            try:
                self._check_all_nodes()
                self._elect_leader_if_needed()
                self._log_status()
            except Exception as e:
                # Top-level boundary of the thread: keep the loop alive, but
                # record the traceback as well as the message.
                logger.exception(f"监控循环异常: {e}")
            # Interruptible pause: returns early once the stop event is set.
            self._stop_event.wait(self.heartbeat_interval)

    def _check_all_nodes(self):
        """Health-check every node in its own thread and wait for all."""
        workers = [
            threading.Thread(target=self._check_node_health, args=(node,))
            for node in self.nodes
        ]
        for worker in workers:
            worker.start()
        # Wait for every check to finish before counting the round.
        for worker in workers:
            worker.join()
        self.stats['health_checks'] += 1

    def _check_node_health(self, node: TaskSchedulerNode):
        """Run one simulated health check and update the node's state.

        On success the node's heartbeat time is refreshed and it is marked
        healthy; a leader additionally gets a load check. On an ``OSError``
        the node is marked unhealthy.
        """
        try:
            # Simulated check (a real one might be an HTTP request or a TCP
            # connection): fails with 5% probability.
            if random.random() < 0.05:
                raise ConnectionError("模拟网络故障")

            node.last_heartbeat = time.time()
            node.is_healthy = True

            if node.is_leader:
                self._check_leader_load(node)
        except OSError as e:
            # ConnectionError, socket.timeout and socket.error are all
            # OSError (sub)classes, so this catches the same failures.
            logger.warning(f"节点 {node.node_id} 健康检查失败: {e}")
            node.is_healthy = False

    def _check_leader_load(self, leader_node: TaskSchedulerNode):
        """Sample a simulated leader load and maybe trigger load balancing."""
        # Simulated load percentage.
        load = random.randint(0, 100)
        if load > 80:
            logger.warning(f"Leader节点 {leader_node.node_id} 负载过高: {load}%")
            # Simulation: load balancing is triggered 30% of the time.
            if random.random() < 0.3:
                self._trigger_load_balance()

    def _trigger_load_balance(self):
        """Pick a random healthy non-leader node and log the choice."""
        logger.info("触发负载均衡...")
        # Simplified implementation: the node is only chosen and logged.
        healthy_nodes = [
            n for n in self.nodes if n.is_healthy and not n.is_leader
        ]
        if healthy_nodes:
            selected = random.choice(healthy_nodes)
            logger.info(f"选择节点 {selected.node_id} 分担负载")

    def _elect_leader_if_needed(self):
        """Elect a leader when there is none or the current one is unhealthy.

        The leaderless case arises when an earlier failover demoted the
        leader but found no healthy candidate; without this branch the
        cluster would stay leaderless even after nodes recover.
        """
        current_leader = self._get_current_leader()

        if current_leader is None:
            candidates = [n for n in self.nodes if n.is_healthy]
            if candidates:
                logger.warning("当前没有leader,开始重新选举")
                new_leader = max(candidates, key=lambda n: n.last_heartbeat)
                self._promote_leader(new_leader)
                logger.info(f"重新选举完成: 新leader为 {new_leader.node_id}")
                self._notify_leader_change(new_leader)
            return

        if not current_leader.is_healthy:
            logger.warning(f"Leader节点 {current_leader.node_id} 不健康,开始故障转移")
            self._perform_failover(current_leader)

    def _get_current_leader(self) -> Optional[TaskSchedulerNode]:
        """Return the first node flagged as leader, or ``None``."""
        return next((node for node in self.nodes if node.is_leader), None)

    def _promote_leader(self, new_leader: TaskSchedulerNode):
        """Flag ``new_leader`` as leader and record the failover in stats."""
        new_leader.is_leader = True
        self.stats['failovers'] += 1
        self.stats['last_failover'] = datetime.now().isoformat()

    def _perform_failover(self, failed_leader: TaskSchedulerNode):
        """Demote ``failed_leader`` and promote a healthy replacement.

        The replacement is the healthy node with the latest heartbeat time.
        If no other node is healthy, an error is logged and there is no
        leader until a later round elects one.
        """
        failed_leader.is_leader = False

        candidates = [
            n for n in self.nodes if n.is_healthy and n is not failed_leader
        ]
        if not candidates:
            logger.error("没有可用的健康节点作为leader候选")
            return

        new_leader = max(candidates, key=lambda n: n.last_heartbeat)
        self._promote_leader(new_leader)

        logger.info(f"故障转移完成: {failed_leader.node_id} -> {new_leader.node_id}")

        self._notify_leader_change(new_leader)

    def _notify_leader_change(self, new_leader: TaskSchedulerNode):
        """Announce the new leader (simplified: the notice is only logged)."""
        notification = {
            'type': 'leader_change',
            'new_leader_id': new_leader.node_id,
            'timestamp': time.time(),
        }
        logger.info(f"通知leader变更: {json.dumps(notification)}")

    def _log_status(self):
        """Log a JSON status summary at DEBUG level."""
        # Skip building the summary when DEBUG output is disabled.
        if not logger.isEnabledFor(logging.DEBUG):
            return

        leader = self._get_current_leader()
        status = {
            'timestamp': datetime.now().isoformat(),
            'total_nodes': len(self.nodes),
            'healthy_nodes': sum(1 for n in self.nodes if n.is_healthy),
            'current_leader': leader.node_id if leader else None,
            'stats': self.stats,
        }
        logger.debug(f"系统状态: {json.dumps(status, indent=2)}")

    def add_task(self, task_data: Dict):
        """Append a task to the current leader's task list.

        Args:
            task_data: Task description; its ``'id'`` entry, if present, is
                used in the log message.

        Returns:
            ``True`` if the task was appended, ``False`` if there is no
            leader or the leader is unhealthy.
        """
        leader = self._get_current_leader()
        if not leader:
            logger.error("没有可用的leader节点")
            return False

        if not leader.is_healthy:
            logger.error("Leader节点不健康")
            return False

        # Simulated assignment: the task is only stored on the leader.
        leader.tasks.append(task_data)
        logger.info(f"任务已添加到leader节点 {leader.node_id}: {task_data.get('id', 'unknown')}")
        return True

    def get_status_report(self) -> Dict:
        """Return a snapshot of monitor state, node states and statistics.

        ``stats`` is a copy, so callers cannot alter the monitor's counters
        and later monitor updates do not change an already returned report.
        """
        return {
            'monitor_running': self.running,
            'nodes': [node.to_dict() for node in self.nodes],
            'stats': dict(self.stats),
            'timestamp': datetime.now().isoformat(),
        }
# Usage example
def main():
    """Run a short demo against three simulated nodes.

    Starts the monitor, then for ten 3-second steps randomly submits tasks
    and prints a status report on every third step. The monitor is always
    stopped on exit, including after Ctrl-C.
    """
    # Three simulated nodes: node-1..node-3 on 192.168.1.101..103, port 8080.
    cluster = [
        {'id': f'node-{k}', 'host': f'192.168.1.10{k}', 'port': 8080}
        for k in (1, 2, 3)
    ]

    monitor = DistributedSchedulerMonitor(cluster)

    try:
        monitor.start_monitoring()

        # Let the simulation run for a while.
        for step in range(10):
            time.sleep(3)

            # Submit a task on roughly half of the steps.
            if random.random() < 0.5:
                monitor.add_task(
                    {'id': f'task-{step}', 'data': f'payload-{step}'}
                )

            # Print a report only on steps 0, 3, 6 and 9.
            if step % 3:
                continue
            snapshot = json.dumps(monitor.get_status_report(), indent=2)
            print(f"\n状态报告: {snapshot}")

    except KeyboardInterrupt:
        print("\n收到中断信号")
    finally:
        monitor.stop_monitoring()
        print("监控已停止")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__": main()
|