Linux运维工程师笔试题第二十六套

作为SRE,肯定要认真的审视每一行代码,不能因为你的巡检工具把正常服务打挂了。不仅要问”这行代码能运行吗?”,更要问:

  1. “如果输入异常大,会怎样?”(容量规划)
  2. “如果中途需要停止,能优雅退出吗?”(可中断性)
  3. “如果运行在受限环境,会出问题吗?”(资源限制)
  4. “如果被恶意使用,会造成什么危害?”(安全性)

当面对面试官的时候,面试官想看到当面试人分析代码时:

  • 排查思路是否系统化?(是东一榔头西一棒子,还是有清晰的优先级)
  • 对风险的评估是否准确?(是过度紧张,还是能区分“严重漏洞”和“代码异味”)
  • 解决方案是否工程化?(是打补丁,还是能设计出健壮的、可复用的模式)
  1. 分析下面的代码有什么BUG或者隐患?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import os
import re
def analyze_error_logs(log_dir, pattern="ERROR"):
 """
 分析指定目录下所有日志文件中的错误行
 """
 error_count = 0
 for filename in os.listdir(log_dir):
  if filename.endswith('.log'):
  filepath = os.path.join(log_dir, filename)
  with open(filepath, 'r') as f:
  for line in f:
  if re.search(pattern, line):
  error_count += 1
  print(f"Found error in {filename}: {line.strip()}")
 print(f"Total errors found: {error_count}")
 return error_count
# 使用示例
if name == "main":
 analyze_error_logs("/var/log/myapp/")。

【提示】with用法是Python推荐的文件操作方式,比手动f.close()更安全,不会导致文件描述符泄漏。 其次,for line in f: 这段代码实际上是逐行读取的,不会一次性加载整个文件到内存,所以不会导致OOM。这是处理大日志文件的正确方式。这个比较简单,应该都能看出来。

  1. 分析下面的代码有什么BUG或者隐患?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import requests
import time
from datetime import datetime

class ServiceMonitor:
def __init__(self, endpoints):
self.endpoints = endpoints
self.results = []

def check_endpoint(self, url):
try:
start = time.time()
response = requests.get(url, timeout=5)
elapsed = time.time() - start

if response.status_code == 200:
return {
'url': url,
'status': 'UP',
'response_time': elapsed,
'timestamp': datetime.now()
}
else:
return {
'url': url,
'status': 'DOWN',
'response_time': elapsed,
'timestamp': datetime.now()
}
except Exception as e:
return {
'url': url,
'status': 'ERROR',
'error': str(e),
'timestamp': datetime.now()
}

def run_checks(self):
for endpoint in self.endpoints:
result = self.check_endpoint(endpoint)
self.results.append(result)
time.sleep(1) # 避免请求过于密集

# 统计并告警
down_count = sum(1 for r in self.results if r['status'] != 'UP')
if down_count > len(self.endpoints) * 0.3:
self.send_alert(f"{down_count} services down!")

def send_alert(self, message):
# 模拟发送告警
print(f"ALERT: {message}")

# 使用示例
if __name__ == "__main__":
monitor = ServiceMonitor([
"http://api.service1.com/health",
"http://api.service2.com/health",
"http://api.service3.com/health"
])
monitor.run_checks()

【提示】除了几个简单,还有这几个:result = self.check_endpoint(endpoint) 这个写法在高频率监控时产生大量TCP连接,消耗系统资源,使用requests.Session()复用连接,而且endpoints列表被恶意注入或配置错误,那么这个脚本就会持续攻击目标服务器,直接P4故障升级P0。而且这里没有重试机制,要是网络抖动一下就记成了ERROR,那就误报了。然后time.sleep(1)是同步进行了,如果endpoints是100个就要100秒,实效性太差了。

  1. 分析下面的代码有什么BUG或者隐患?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import yaml
import json

def load_and_validate_config(config_path):
"""
加载并验证配置文件
"""
with open(config_path, 'r') as f:
if config_path.endswith('.yaml') or config_path.endswith('.yml'):
config = yaml.safe_load(f)
elif config_path.endswith('.json'):
config = json.load(f)
else:
raise ValueError("Unsupported config format")

# 验证必需字段
required_fields = ['service_name', 'port', 'database']
for field in required_fields:
if field not in config:
raise KeyError(f"Missing required field: {field}")

# 验证端口范围
if config['port'] < 1 or config['port'] > 65535:
raise ValueError(f"Invalid port number: {config['port']}")

# 验证数据库连接信息
db_config = config['database']
if 'host' not in db_config or 'name' not in db_config:
raise ValueError("Invalid database configuration")

return config

# 使用示例
if __name__ == "__main__":
try:
config = load_and_validate_config("app_config.yaml")
print(f"Config loaded successfully: {config['service_name']}")
except Exception as e:
print(f"Failed to load config: {e}")

【提示】config = yaml.safe_load(f) 这个已经将整个文件加载到内存,由于配置文件都不会超级大,所以不会导致内存OOM。但是它没有进一步校验 “port应该是整数”,“host应该是有效主机名/IP”这些格式问题。

  1. 分析下面的代码有什么BUG或者隐患?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python3
"""
批量分析Nginx访问日志,统计状态码分布和响应时间
"""

import os
import sys
import glob
import gzip
from collections import defaultdict
import datetime

def analyze_nginx_logs(log_dir, output_file="report.txt"):
"""
分析指定目录下的Nginx日志文件
"""
# 查找所有日志文件
log_files = []
for pattern in ["*.log", "*.log.gz"]:
log_files.extend(glob.glob(os.path.join(log_dir, pattern)))

if not log_files:
print(f"No log files found in {log_dir}")
return

# 统计数据结构
status_codes = defaultdict(int)
response_times = []
total_requests = 0

# 处理每个日志文件
for log_file in log_files:
print(f"Processing {log_file}...")

# 判断文件类型并打开
if log_file.endswith('.gz'):
f = gzip.open(log_file, 'rt', encoding='utf-8')
else:
f = open(log_file, 'r', encoding='utf-8')

# 逐行分析
for line in f:
total_requests += 1

# 解析日志行 (简化版Nginx日志格式)
# 示例: 127.0.0.1 - - [01/Jan/2024:10:00:00 +0800] "GET /api/test HTTP/1.1" 200 1024 "-" "Mozilla/5.0" 0.123
try:
parts = line.split()
if len(parts) < 12:
continue

# 提取状态码
status_code = int(parts[8])
status_codes[status_code] += 1

# 提取响应时间
response_time = float(parts[11])
response_times.append(response_time)

except (ValueError, IndexError) as e:
# 跳过解析失败的行
continue

# 文件处理完成
f.close()

# 生成报告
with open(output_file, 'w') as report:
report.write(f"=== Nginx Log Analysis Report ===\n")
report.write(f"Generated at: {datetime.datetime.now()}\n")
report.write(f"Log directory: {log_dir}\n")
report.write(f"Total files processed: {len(log_files)}\n")
report.write(f"Total requests: {total_requests}\n\n")

report.write("Status Code Distribution:\n")
for code, count in sorted(status_codes.items()):
percentage = (count / total_requests) * 100
report.write(f" {code}: {count} ({percentage:.2f}%)\n")

report.write("\nResponse Time Statistics (seconds):\n")
if response_times:
avg_time = sum(response_times) / len(response_times)
p95_time = sorted(response_times)[int(len(response_times) * 0.95)]
max_time = max(response_times)

report.write(f" Average: {avg_time:.3f}s\n")
report.write(f" P95: {p95_time:.3f}s\n")
report.write(f" Max: {max_time:.3f}s\n")

# 识别慢请求
slow_threshold = 1.0 # 1秒
slow_requests = [t for t in response_times if t > slow_threshold]
report.write(f" Slow requests (>1s): {len(slow_requests)} ({len(slow_requests)/len(response_times)*100:.2f}%)\n")
else:
report.write(" No valid response time data\n")

print(f"Report saved to {output_file}")

def main():
"""主函数"""
if len(sys.argv) < 2:
print("Usage: python log_analyzer.py <log_directory> [output_file]")
sys.exit(1)

log_dir = sys.argv[1]
output_file = sys.argv[2] if len(sys.argv) > 2 else "report.txt"

# 检查目录是否存在
if not os.path.isdir(log_dir):
print(f"Error: {log_dir} is not a valid directory")
sys.exit(1)

# 执行分析
analyze_nginx_logs(log_dir, output_file)

if __name__ == "__main__":
main()

【提示】除了几个简单,还有这几个:response_times = [] # 所有响应时间都存入列表,这里如果文件很大,直接就把内存撑破了,OOM。p95_time = sorted(response_times)[int(len(response_times) * 0.95)]这里用到了 sort,如果是几百万的数据,排序是非常消耗CPU的。encoding='utf-8' # 假设所有日志都是UTF-8 这里指考虑了UTF-8,没考虑到其他的情况。

  1. 最后一个,上点强度,看出来这段代码的隐患和BUG:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
#!/usr/bin/env python3
"""
分布式任务调度器健康检查与故障转移
"""

import threading
import time
import random
import socket
import json
from datetime import datetime
from typing import Dict, List, Optional
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TaskSchedulerNode:
"""任务调度节点"""
def __init__(self, node_id: str, host: str, port: int):
self.node_id = node_id
self.host = host
self.port = port
self.is_leader = False
self.is_healthy = True
self.last_heartbeat = time.time()
self.tasks = []

def to_dict(self):
return {
'node_id': self.node_id,
'host': self.host,
'port': self.port,
'is_leader': self.is_leader,
'is_healthy': self.is_healthy
}

class DistributedSchedulerMonitor:
"""分布式调度器监控器"""

def __init__(self, nodes: List[Dict]):
self.nodes = []
for node_info in nodes:
node = TaskSchedulerNode(
node_id=node_info['id'],
host=node_info['host'],
port=node_info['port']
)
self.nodes.append(node)

# 选举第一个节点为leader
if self.nodes:
self.nodes[0].is_leader = True

self.heartbeat_interval = 5 # 秒
self.heartbeat_timeout = 15 # 秒
self.monitor_thread = None
self.running = False

# 统计信息
self.stats = {
'health_checks': 0,
'failovers': 0,
'last_failover': None
}

def start_monitoring(self):
"""启动监控线程"""
self.running = True
self.monitor_thread = threading.Thread(target=self._monitor_loop)
self.monitor_thread.daemon = True
self.monitor_thread.start()
logger.info("监控线程已启动")

def stop_monitoring(self):
"""停止监控"""
self.running = False
if self.monitor_thread:
self.monitor_thread.join(timeout=10)
logger.info("监控线程已停止")

def _monitor_loop(self):
"""监控主循环"""
while self.running:
try:
self._check_all_nodes()
self._elect_leader_if_needed()
self._log_status()
except Exception as e:
logger.error(f"监控循环异常: {e}")

time.sleep(self.heartbeat_interval)

def _check_all_nodes(self):
"""检查所有节点健康状态"""
threads = []
for node in self.nodes:
thread = threading.Thread(
target=self._check_node_health,
args=(node,)
)
thread.start()
threads.append(thread)

# 等待所有检查完成
for thread in threads:
thread.join()

self.stats['health_checks'] += 1

def _check_node_health(self, node: TaskSchedulerNode):
"""检查单个节点健康状态"""
try:
# 模拟健康检查(实际可能是HTTP请求或TCP连接)
if random.random() < 0.05: # 5%概率模拟检查失败
raise ConnectionError("模拟网络故障")

# 更新心跳时间
node.last_heartbeat = time.time()
node.is_healthy = True

# 如果是leader,额外检查负载
if node.is_leader:
self._check_leader_load(node)

except (ConnectionError, socket.timeout, socket.error) as e:
logger.warning(f"节点 {node.node_id} 健康检查失败: {e}")
node.is_healthy = False

def _check_leader_load(self, leader_node: TaskSchedulerNode):
"""检查leader节点负载"""
# 模拟获取leader负载
load = random.randint(0, 100)
if load > 80:
logger.warning(f"Leader节点 {leader_node.node_id} 负载过高: {load}%")
# 模拟触发负载均衡
if random.random() < 0.3:
self._trigger_load_balance()

def _trigger_load_balance(self):
"""触发负载均衡"""
logger.info("触发负载均衡...")
# 简化实现:随机选择一个健康节点分担任务
healthy_nodes = [n for n in self.nodes if n.is_healthy and not n.is_leader]
if healthy_nodes:
selected = random.choice(healthy_nodes)
logger.info(f"选择节点 {selected.node_id} 分担负载")

def _elect_leader_if_needed(self):
"""如果需要,选举新的leader"""
current_leader = self._get_current_leader()

# 检查leader是否健康
if current_leader and not current_leader.is_healthy:
logger.warning(f"Leader节点 {current_leader.node_id} 不健康,开始故障转移")
self._perform_failover(current_leader)

def _get_current_leader(self) -> Optional[TaskSchedulerNode]:
"""获取当前leader节点"""
for node in self.nodes:
if node.is_leader:
return node
return None

def _perform_failover(self, failed_leader: TaskSchedulerNode):
"""执行故障转移"""
# 标记原leader为非leader
failed_leader.is_leader = False

# 选择新的leader(选择最健康的节点)
candidates = [n for n in self.nodes if n.is_healthy and n != failed_leader]

if not candidates:
logger.error("没有可用的健康节点作为leader候选")
return

# 选择策略:选择最近心跳时间最新的节点
new_leader = max(candidates, key=lambda n: n.last_heartbeat)
new_leader.is_leader = True

# 更新统计
self.stats['failovers'] += 1
self.stats['last_failover'] = datetime.now().isoformat()

logger.info(f"故障转移完成: {failed_leader.node_id} -> {new_leader.node_id}")

# 通知所有节点leader变更
self._notify_leader_change(new_leader)

def _notify_leader_change(self, new_leader: TaskSchedulerNode):
"""通知所有节点leader变更"""
notification = {
'type': 'leader_change',
'new_leader_id': new_leader.node_id,
'timestamp': time.time()
}

# 简化实现:记录日志
logger.info(f"通知leader变更: {json.dumps(notification)}")

def _log_status(self):
"""记录状态日志"""
status = {
'timestamp': datetime.now().isoformat(),
'total_nodes': len(self.nodes),
'healthy_nodes': len([n for n in self.nodes if n.is_healthy]),
'current_leader': self._get_current_leader().node_id if self._get_current_leader() else None,
'stats': self.stats
}
logger.debug(f"系统状态: {json.dumps(status, indent=2)}")

def add_task(self, task_data: Dict):
"""添加任务到leader节点"""
leader = self._get_current_leader()
if not leader:
logger.error("没有可用的leader节点")
return False

if not leader.is_healthy:
logger.error("Leader节点不健康")
return False

# 模拟任务分配
leader.tasks.append(task_data)
logger.info(f"任务已添加到leader节点 {leader.node_id}: {task_data.get('id', 'unknown')}")
return True

def get_status_report(self) -> Dict:
"""获取状态报告"""
return {
'monitor_running': self.running,
'nodes': [node.to_dict() for node in self.nodes],
'stats': self.stats,
'timestamp': datetime.now().isoformat()
}

# 使用示例
def main():
"""主函数"""
# 模拟3个节点
nodes_config = [
{'id': 'node-1', 'host': '192.168.1.101', 'port': 8080},
{'id': 'node-2', 'host': '192.168.1.102', 'port': 8080},
{'id': 'node-3', 'host': '192.168.1.103', 'port': 8080}
]

monitor = DistributedSchedulerMonitor(nodes_config)

try:
# 启动监控
monitor.start_monitoring()

# 模拟运行一段时间
for i in range(10):
time.sleep(3)

# 随机添加任务
if random.random() < 0.5:
task = {'id': f'task-{i}', 'data': f'payload-{i}'}
monitor.add_task(task)

# 打印状态
if i % 3 == 0:
report = monitor.get_status_report()
print(f"\n状态报告: {json.dumps(report, indent=2)}")

except KeyboardInterrupt:
print("\n收到中断信号")
finally:
monitor.stop_monitoring()
print("监控已停止")

if __name__ == "__main__":
main()

【提示】

  1. 多个线程同时修改同一个node对象的is_healthylast_heartbeat属性,有数据不一致和脏读的问题。
  2. 然后new_leader = max(candidates, key=lambda n: n.last_heartbeat)这个选举只要时间最近的,但是不考虑节点的负载情况,可能会选出不合适的节点当leader。
  3. `# 标记原leader为非leader
    failed_leader.is_leader = False

    选择新的leader

    new_leader = max(candidates, key=lambda n: n.last_heartbeat)`这个地方没有考虑到网络抖动就直接给老leader干掉了,但是老leader本身不知道自己被干掉了。所以出现两个leader,导致脑裂,一波老节点还在连老leader,新的节点去链接新leader。
  4. leader.tasks.append(task_data) 把结果仅保存在内存中,没有持久化,任务无法重新分配。会产生数据不一致。
    当然还有额外几个,超出了我的能力了,能全看出来的,肯定是P9 P10级别的大神了。
感谢你请我喝咖啡~

Welcome to my other publishing channels