1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
|
import asyncio import base64 import json import aiohttp from cryptography.hazmat.primitives import hashes, hmac as crypto_hmac from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from cryptography.hazmat.backends import default_backend
HOMESERVER = "https://matrix.example.com" USER_ID = "@openclaw:matrix.example.com" ACCESS_TOKEN = "syt_***" RECOVERY_KEY = "**** **** ..." DEVICE_ID = "**********"
HEADERS = {"Authorization": f"Bearer {ACCESS_TOKEN}", "Content-Type": "application/json"}
def decode_recovery_key(key: str) -> bytes: """Base58 解码恢复密钥(Matrix 格式:2字节前缀 + 32字节密钥 + 1字节校验)""" key = key.replace(" ", "") alphabet = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" num = 0 for char in key: num = num * 58 + alphabet.index(char) raw = num.to_bytes(35, 'big') prefix, payload, parity = raw[:2], raw[2:34], raw[34] expected_parity = 0 for b in prefix + payload: expected_parity ^= b if expected_parity != parity: raise ValueError(f"恢复密钥校验失败: expected {expected_parity:#04x}, got {parity:#04x}") return payload
def decrypt_ssss_secret(ssss_key: bytes, secret_name: str, enc: dict) -> bytes: """ 解密 m.secret_storage.v1.aes-hmac-sha2 格式的 SSSS 加密数据。 返回解密后的原始字节(若明文为 base64 字符串则已 decode)。 """ derived = HKDF( algorithm=hashes.SHA256(), length=64, salt=b'\x00' * 32, info=secret_name.encode(), backend=default_backend() ).derive(ssss_key) aes_key = derived[:32] mac_key = derived[32:]
iv_bytes = base64.b64decode(enc["iv"]) ciphertext_bytes = base64.b64decode(enc["ciphertext"]) expected_mac = base64.b64decode(enc["mac"])
h = crypto_hmac.HMAC(mac_key, hashes.SHA256(), backend=default_backend()) h.update(ciphertext_bytes) computed_mac = h.finalize() if computed_mac != expected_mac: raise ValueError("HMAC 验证失败,SSSS key 或加密数据有误")
decryptor = Cipher( algorithms.AES(aes_key), modes.CTR(iv_bytes), backend=default_backend() ).decryptor() plaintext = decryptor.update(ciphertext_bytes) + decryptor.finalize()
plaintext_str = plaintext.decode('utf-8') padding = '=' * (-len(plaintext_str) % 4) return base64.b64decode(plaintext_str + padding)
async def get_account_data(session: aiohttp.ClientSession, event_type: str) -> dict: url = f"{HOMESERVER}/_matrix/client/v3/user/{USER_ID}/account_data/{event_type}" async with session.get(url, headers=HEADERS) as resp: resp.raise_for_status() return await resp.json()
async def verify_device(): async with aiohttp.ClientSession() as session: ssss_key = decode_recovery_key(RECOVERY_KEY) print(f"SSSS key: {ssss_key.hex()}")
key_id = (await get_account_data(session, "m.secret_storage.default_key"))["key"] print(f"Default key ID: {key_id}")
cross_signing_data = await get_account_data(session, "m.cross_signing.self_signing") enc = cross_signing_data["encrypted"][key_id] print(f"Encrypted self-signing key IV: {enc['iv']}")
self_signing_seed = decrypt_ssss_secret(ssss_key, "m.cross_signing.self_signing", enc) print(f"Decrypted self-signing seed ({len(self_signing_seed)} bytes): {self_signing_seed.hex()}")
async with session.post( f"{HOMESERVER}/_matrix/client/v3/keys/query", headers=HEADERS, json={"device_keys": {USER_ID: [DEVICE_ID]}} ) as resp: resp.raise_for_status() device_keys_resp = await resp.json()
device_data = device_keys_resp["device_keys"][USER_ID][DEVICE_ID]
self_signing_keys = device_keys_resp["self_signing_keys"][USER_ID]["keys"] self_signing_key_id = next(iter(self_signing_keys)) print(f"Self-signing key ID: {self_signing_key_id}")
sign_data = {k: v for k, v in device_data.items() if k not in ("signatures", "unsigned")} canonical = json.dumps(sign_data, separators=(',', ':'), sort_keys=True) print(f"Canonical JSON: {canonical}")
private_key = Ed25519PrivateKey.from_private_bytes(self_signing_seed) signature_bytes = private_key.sign(canonical.encode('utf-8')) sig_b64 = base64.b64encode(signature_bytes).decode() print(f"Signature: {sig_b64}")
pub_bytes = private_key.public_key().public_bytes_raw() derived_pub_b64 = base64.b64encode(pub_bytes).decode().rstrip('=') expected_pub = self_signing_keys[self_signing_key_id].rstrip('=') if derived_pub_b64 != expected_pub: raise ValueError( f"公钥不匹配!\n 派生: {derived_pub_b64}\n 服务器: {expected_pub}" ) print("公钥验证通过 ✓")
upload_body = { USER_ID: { DEVICE_ID: { **sign_data, "signatures": { USER_ID: { self_signing_key_id: sig_b64 } } } } } async with session.post( f"{HOMESERVER}/_matrix/client/v3/keys/signatures/upload", headers=HEADERS, json=upload_body ) as resp: resp.raise_for_status() result = await resp.json()
failures = result.get("failures", {}) if failures: print(f"上传失败: {json.dumps(failures, indent=2)}") else: print(f"设备 {DEVICE_ID} 验证成功!")
asyncio.run(verify_device())
|