Matrix 的安装与配置

安装 Matrix 插件

1
2
# Install the OpenClaw Matrix plugin.
openclaw plugins install @openclaw/matrix
# Install the Matrix bot SDK globally.
# NOTE(review): without a leading "@", npm treats "vector-im/matrix-bot-sdk" as a
# GitHub "owner/repo" shorthand; the scoped registry package would be
# "@vector-im/matrix-bot-sdk" - confirm which one is intended.
npm install -g vector-im/matrix-bot-sdk

获取 accessToken

1
2
3
4
# Log in with the account password; the access token is read from the response.
# The host must be the same homeserver that appears in the user ID.
curl -XPOST \
  -H "Content-Type: application/json" \
  -d '{"type":"m.login.password", "user":"@openclaw:matrix.example.com", "password":"yourpassword"}' \
  "https://matrix.example.com/_matrix/client/r0/login"

修改 OpenClaw 配置

添加 channels

1
2
3
4
5
6
7
8
9
10
11
{
"channels": {
"matrix": {
"enabled": true,
"homeserver": "https://matrix.example.org",
"accessToken": "syt_***",
"encryption": true,
"dm": { "policy": "pairing" },
},
},
}

E2EE 配置方式

上面的步骤都很简单,但是到认证这一步遇到麻烦。虽然官网的文档中写到:

On first connection, OpenClaw requests device verification from your other sessions.

但是实际上并没有,根据 reddit 中的说法:

OpenClaw’s Matrix plugin creates an encrypted device, but it shows as “Unverified” in Element. The built-in requestOwnUserVerification() method doesn’t exist in the bot-sdk, so no client UI can verify it interactively.

也就是说,插件的设备验证流程实际上并未实现。按照帖子中的教程手动为设备签名,即可解决认证问题。

网页登录账号获取恢复密钥

登录 Element 或是任意你喜欢的 Matrix 客户端,获取恢复密钥。

通过恢复密钥认证

在配置好后运行一次 OpenClaw,会生成 ~/.openclaw/matrix/accounts/default/<homeserver>__<user>/<token-hash>/crypto/ 目录。

创建下面的 Python 脚本(依赖 aiohttp 和 cryptography,可通过 pip install aiohttp cryptography 安装),填好配置后运行即可完成认证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
#!/usr/bin/env python3
#~/.openclaw/verify_device.py
import asyncio
import base64
import json
import aiohttp
from cryptography.hazmat.primitives import hashes, hmac as crypto_hmac
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.backends import default_backend

# Configuration: replace every placeholder below with the values of your account.
HOMESERVER = "https://matrix.example.com"  # base URL prefixed to every request path
USER_ID = "@openclaw:matrix.example.com"  # Matrix user ID of the account
ACCESS_TOKEN = "syt_***"  # sent as a Bearer token on every request
RECOVERY_KEY = "**** **** ..."  # recovery key in its Base58 text form
DEVICE_ID = "**********"  # ID of the device that is to be signed

# HTTP headers shared by all requests to the homeserver.
HEADERS = {
    "Authorization": "Bearer " + ACCESS_TOKEN,
    "Content-Type": "application/json",
}


def decode_recovery_key(key: str) -> bytes:
    """Decode a Matrix recovery key into the raw 32-byte secret-storage key.

    The key is Base58 text that decodes to exactly 35 bytes: a 2-byte prefix
    (0x8B 0x01), the 32-byte key, and one parity byte equal to the XOR of all
    preceding bytes.

    Args:
        key: Recovery key as shown by the client. Any whitespace (spaces,
            tabs, newlines) is ignored.

    Returns:
        The 32-byte key.

    Raises:
        ValueError: If the key is empty, contains a character outside the
            Base58 alphabet, is too long to fit in 35 bytes, fails the parity
            check, or does not start with the 0x8B 0x01 prefix.
    """
    alphabet = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
    digit_of = {char: value for value, char in enumerate(alphabet)}

    # Keys are displayed in space-separated groups and are often pasted with
    # stray newlines, so drop every kind of whitespace, not just " ".
    compact = "".join(key.split())
    if not compact:
        raise ValueError("恢复密钥为空")

    num = 0
    for char in compact:
        digit = digit_of.get(char)
        if digit is None:
            raise ValueError(f"恢复密钥包含非法字符: {char!r}")
        num = num * 58 + digit

    try:
        raw = num.to_bytes(35, "big")
    except OverflowError:
        # Too many Base58 digits to fit into prefix + key + parity.
        raise ValueError("恢复密钥长度不正确") from None

    prefix, payload, parity = raw[:2], raw[2:34], raw[34]

    expected_parity = 0
    for b in prefix + payload:
        expected_parity ^= b
    if expected_parity != parity:
        raise ValueError(f"恢复密钥校验失败: expected {expected_parity:#04x}, got {parity:#04x}")

    # A truncated key such as "" or "1111" decodes to all-zero bytes, whose
    # parity is trivially valid; the fixed prefix is what rejects it.
    if prefix != b"\x8b\x01":
        raise ValueError(f"恢复密钥前缀不正确: {prefix.hex()}")

    return payload


def decrypt_ssss_secret(ssss_key: bytes, secret_name: str, enc: dict) -> bytes:
    """Decrypt a secret stored with ``m.secret_storage.v1.aes-hmac-sha2``.

    Args:
        ssss_key: Raw secret-storage key used as HKDF input keying material.
        secret_name: Name of the secret (for example
            ``m.cross_signing.self_signing``); used as the HKDF ``info``.
        enc: Encrypted payload with base64 ``iv``, ``ciphertext`` and ``mac``
            entries. Padded and unpadded base64 are both accepted.

    Returns:
        The decrypted secret. The plaintext is treated as a base64 string
        (padding optional) and returned in decoded form.

    Raises:
        ValueError: If the HMAC over the ciphertext does not match ``mac``.
    """
    # Local import: the module-level ``crypto_hmac`` is the cryptography HMAC
    # module; the standard-library one is only needed for the comparison.
    from hmac import compare_digest

    def _b64decode(text: str) -> bytes:
        # Restore any stripped "=" so padded and unpadded input both decode.
        return base64.b64decode(text + "=" * (-len(text) % 4))

    # HKDF-SHA256 with a 32-byte zero salt yields 64 bytes: the first 32 are
    # the AES key, the last 32 the HMAC key.
    derived = HKDF(
        algorithm=hashes.SHA256(),
        length=64,
        salt=b'\x00' * 32,
        info=secret_name.encode(),
        backend=default_backend()
    ).derive(ssss_key)
    aes_key, mac_key = derived[:32], derived[32:]

    iv_bytes = _b64decode(enc["iv"])
    ciphertext_bytes = _b64decode(enc["ciphertext"])
    expected_mac = _b64decode(enc["mac"])

    # Authenticate the ciphertext before decrypting it. compare_digest avoids
    # leaking, through timing, how many leading bytes of the MAC matched.
    h = crypto_hmac.HMAC(mac_key, hashes.SHA256(), backend=default_backend())
    h.update(ciphertext_bytes)
    computed_mac = h.finalize()
    if not compare_digest(computed_mac, expected_mac):
        raise ValueError("HMAC 验证失败,SSSS key 或加密数据有误")

    # AES-256-CTR decryption.
    decryptor = Cipher(
        algorithms.AES(aes_key), modes.CTR(iv_bytes), backend=default_backend()
    ).decryptor()
    plaintext = decryptor.update(ciphertext_bytes) + decryptor.finalize()

    # Per the original author's note, matrix-js-sdk stores the private key in
    # SSSS as an unpadded base64 string, so decode it back to raw bytes.
    return _b64decode(plaintext.decode('utf-8'))


async def get_account_data(session: aiohttp.ClientSession, event_type: str) -> dict:
    """Fetch one account-data event of USER_ID from the homeserver.

    Args:
        session: Open aiohttp session used to send the GET request.
        event_type: Account-data event type appended to the request path.

    Returns:
        The decoded JSON body of the response.

    Raises whatever ``raise_for_status()`` raises for an HTTP error status.
    """
    endpoint = f"{HOMESERVER}/_matrix/client/v3/user/{USER_ID}/account_data/{event_type}"
    async with session.get(endpoint, headers=HEADERS) as response:
        response.raise_for_status()
        payload = await response.json()
    return payload


async def verify_device():
    """Sign DEVICE_ID with the account's self-signing key and upload the signature.

    Steps: decode RECOVERY_KEY, fetch and decrypt the self-signing private key
    from account data, check it against the self-signing public key returned
    by the server, sign the device's key object, and POST the result to
    ``/keys/signatures/upload``. Progress is printed to stdout; secret key
    material is never printed.

    Raises:
        ValueError: If the recovery key is invalid, the stored secret fails
            its MAC check, or the decrypted key does not match the server's
            self-signing public key.
        KeyError: If an expected field is missing from a server response.
        Plus whatever ``raise_for_status()`` raises for an HTTP error status.
    """
    async with aiohttp.ClientSession() as session:
        # 1. Decode the recovery key into the raw secret-storage (SSSS) key.
        ssss_key = decode_recovery_key(RECOVERY_KEY)
        # Only report the length: stdout may end up in scrollback or logs.
        print(f"SSSS key decoded ({len(ssss_key)} bytes)")

        # 2. Look up the default SSSS key ID and the encrypted self-signing key.
        key_id = (await get_account_data(session, "m.secret_storage.default_key"))["key"]
        print(f"Default key ID: {key_id}")

        cross_signing_data = await get_account_data(session, "m.cross_signing.self_signing")
        enc = cross_signing_data["encrypted"][key_id]
        print(f"Encrypted self-signing key IV: {enc['iv']}")

        # 3. Decrypt the self-signing private key (an ed25519 seed).
        self_signing_seed = decrypt_ssss_secret(ssss_key, "m.cross_signing.self_signing", enc)
        print(f"Decrypted self-signing seed ({len(self_signing_seed)} bytes)")

        # 4. Query the device keys to get the exact object that must be signed.
        async with session.post(
            f"{HOMESERVER}/_matrix/client/v3/keys/query",
            headers=HEADERS,
            json={"device_keys": {USER_ID: [DEVICE_ID]}}
        ) as resp:
            resp.raise_for_status()
            device_keys_resp = await resp.json()

        device_data = device_keys_resp["device_keys"][USER_ID][DEVICE_ID]

        # The self-signing public key ID doubles as the signature's key name.
        self_signing_keys = device_keys_resp["self_signing_keys"][USER_ID]["keys"]
        self_signing_key_id = next(iter(self_signing_keys))  # e.g. "ed25519:YVNCFy..."
        print(f"Self-signing key ID: {self_signing_key_id}")

        # 5. Before signing anything, make sure the decrypted seed really is
        #    the private half of the self-signing key published on the server.
        private_key = Ed25519PrivateKey.from_private_bytes(self_signing_seed)
        pub_bytes = private_key.public_key().public_bytes_raw()
        derived_pub_b64 = base64.b64encode(pub_bytes).decode().rstrip('=')
        expected_pub = self_signing_keys[self_signing_key_id].rstrip('=')
        if derived_pub_b64 != expected_pub:
            raise ValueError(
                f"公钥不匹配!\n 派生: {derived_pub_b64}\n 服务器: {expected_pub}"
            )
        print("公钥验证通过 ✓")

        # 6. Build canonical JSON: drop "signatures" and "unsigned", sort keys,
        #    no insignificant whitespace, and keep non-ASCII characters as raw
        #    UTF-8 (ensure_ascii=False) instead of \uXXXX escapes.
        sign_data = {k: v for k, v in device_data.items() if k not in ("signatures", "unsigned")}
        canonical = json.dumps(
            sign_data, separators=(',', ':'), sort_keys=True, ensure_ascii=False
        )
        print(f"Canonical JSON: {canonical}")

        # 7. Sign the canonical JSON with ed25519. Matrix signatures are
        #    unpadded base64, so strip the trailing "=".
        signature_bytes = private_key.sign(canonical.encode('utf-8'))
        sig_b64 = base64.b64encode(signature_bytes).decode().rstrip('=')
        print(f"Signature: {sig_b64}")

        # 8. Upload the signed device object.
        upload_body = {
            USER_ID: {
                DEVICE_ID: {
                    **sign_data,
                    "signatures": {
                        USER_ID: {
                            self_signing_key_id: sig_b64
                        }
                    }
                }
            }
        }
        async with session.post(
            f"{HOMESERVER}/_matrix/client/v3/keys/signatures/upload",
            headers=HEADERS,
            json=upload_body
        ) as resp:
            resp.raise_for_status()
            result = await resp.json()

        # The endpoint can answer 200 and still reject individual signatures.
        failures = result.get("failures", {})
        if failures:
            print(f"上传失败: {json.dumps(failures, indent=2)}")
        else:
            print(f"设备 {DEVICE_ID} 验证成功!")


if __name__ == "__main__":
    # Run only when executed as a script, so that importing this module
    # (for example to reuse its helpers) does not trigger network requests.
    asyncio.run(verify_device())