摘要:开发 MCP Server 及 Inspector 工具需要深入理解协议细节和网络通信机制。以下是分步骤详解,基于假设的MCP(自定义消息控制协议)实现,采用 TCP 协议,包含消息头、数据类型、负载和校验码。
开发 MCP Server 及 Inspector 工具需要深入理解协议细节和网络通信机制。以下是分步骤详解,基于假设的 MCP(自定义消息控制协议) 实现,采用 TCP 协议,包含消息头、数据类型、负载和校验码。
一、MCP 协议定义(假设样例)
Python
# 消息格式:[HEADER][TYPE][LENGTH][DATA][checksum]
# HEADER: 2字节魔术字 0x4D43 (MC)
# TYPE: 1字节消息类型 (0x01:请求, 0x02:响应)
# LENGTH: 2字节数据长度(大端)
# DATA: 变长负载数据
# CHECKSUM: 1字节异或校验(从HEADER到DATA所有字节异或)
二、MCP Server 开发(Python 实现)
1. 使用 socket 库处理 TCP 连接
python
import socket
import threading
class MCPServer:
def __init__(self, host='0.0.0.0', port=9999):
self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server.bind((host, port))
self.server.listen(5)
print(f"[*] MCP Server listening on {host}:{port}")
def handle_client(self, client_socket):
try:
while True:
data = client_socket.recv(1024)
if not data:
break
# 解析MCP消息
response = self.process_mcp_message(data)
if response:
client_socket.send(response)
except Exception as e:
print(f"Client error: {e}")
finally:
client_socket.close
def start(self):
while True:
client_sock, addr = self.server.accept
print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")
client_handler = threading.Thread(
target=self.handle_client, args=(client_sock,))
client_handler.start
def process_mcp_message(self, data):
# 解析和校验逻辑
if len(data)
return None
header = data[0:2]
if header != b'MC':
return None
msg_type = data[2]
LENGTH = int.from_bytes(data[3:5], byteorder='big')
if len(data)
return None
payload = data[5:5+length]
checksum = data[5+length]
# 计算校验
calculated_checksum = 0
for byte in data[:5+length]:
calculated_checksum ^= byte
if calculated_checksum != checksum:
return None
# 处理业务逻辑
print(f"Received MCP message: Type={msg_type}, Data={payload}")
# 构造响应(示例)
response = self.build_mcp_response(payload)
return response
def build_mcp_response(self, data):
# 构造响应消息
header = b'MC'
msg_type = 0x02
response_data = b'Server Processed: ' + data
length = len(response_data).to_bytes(2, 'big')
checksum = 0
message = header + bytes([msg_type]) + length + response_data
for byte in message:
checksum ^= byte
message += bytes([checksum])
return message
if __name__ == "__main__":
server = MCPServer
server.start
三、MCP Inspector 工具开发
1. 方案选择:TCP 代理 + 数据监控
Inspector 作为中间人,拦截并记录流量。
python
import socket
from threading import Thread
import argparse
class MCPInspector:
def __init__(self, listen_host, listen_port, remote_host, remote_port):
self.listen_host = listen_host
self.listen_port = listen_port
self.remote_host = remote_host
self.remote_port = remote_port
def proxy_handler(self, client_sock):
remote_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
remote_sock.connect((self.remote_host, self.remote_port))
# 双向流量转发
def forward(src, dst, label):
while True:
data = src.recv(4096)
if not data:
break
self.inspect_data(data, label)
dst.send(data)
# 启动线程监控双向数据
Thread(target=forward, args=(client_sock, remote_sock, '-> Server')).start
Thread(target=forward, args=(remote_sock, client_sock, '
def inspect_data(self, data, direction):
# 解析MCP消息并打印
print(f"\n{direction} MCP Message:")
try:
if data[:2] != b'MC':
print("Invalid MCP Header")
return
msg_type = data[2]
length = int.from_bytes(data[3:5], 'big')
payload = data[5:5+length]
checksum = data[5+length]
print(f"Type: {msg_type}, Length: {length}, Payload: {payload}, Checksum: {checksum}")
except Exception as e:
print(f"Parse error: {e}")
def start(self):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((self.listen_host, self.listen_port))
server.listen(5)
print(f"[*] Inspector listening on {self.listen_host}:{self.listen_port}")
while True:
client_sock, addr = server.accept
print(f"[*] Inspector accepted connection from {addr}")
handler = Thread(target=self.proxy_handler, args=(client_sock,))
handler.start
if __name__ == "__main__":
parser = argparse.ArgumentParser
parser.add_argument("--listen_host", default="0.0.0.0")
parser.add_argument("--listen_port", type=int, default=9998)
parser.add_argument("--remote_host", default="127.0.0.1")
parser.add_argument("--remote_port", type=int, default=9999)
args = parser.parse_args
inspector = MCPInspector(
args.listen_host, args.listen_port,
args.remote_host, args.remote_port
)
inspector.start
四、运行流程
启动 MCP Serverbash
python mcp_server.py
启动 Inspector(作为代理)bash
python mcp_inspector.py --listen_port 9998 --remote_port 9999
客户端连接 Inspector 而非直接连接 Serverpython
# 示例客户端代码
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(('127.0.0.1', 9998)) # 连接到Inspector
client.send(b'MC\x01\x00\x05Hello\x00') # 示例消息(校验和可能无效)
response = client.recv(1024)
print("Response:", response)
五、扩展优化方向
协议增强Ø 支持更复杂的数据类型(JSON、二进制结构)。
Ø 添加身份验证(消息头包含 Token)。
性能优化Ø 使用 asyncio 替代多线程。
Ø 消息处理异步化。
Inspector 功能增强Ø 图形界面(Tkinter/PyQt)。
Ø 数据包存储与回放。
Ø 流量统计(消息类型分布)。
错误处理Ø 完善校验失败的重传机制。
Ø 心跳包检测连接状态。
六、关键问题解决
TCP 粘包处理Ø 在 process_mcp_message 中根据 LENGTH 字段精确切割数据。
校验和验证Ø 遍历所有相关字节进行异或计算,确保数据完整性。
跨平台兼容性Ø 使用标准库实现,确保Windows/Linux/macOS兼容。
通过此方案,可实现一个基础的 MCP 通信系统及监控工具,适用于物联网、工业控制等场景的调试与开发。
来源:老客数据一点号