Files
TrustTunnel/scripts/deeplink_to_config.py
Aleksei Zhavoronkov 8856e7ba83 Pull request 161: Support deep-link config export in TrustTunnel
Squashed commit of the following:

commit f1e659de448becebd52d75be09af5b905b2bf51c
Author: Zhavoronkov Aleksei <a.zhavoronkov@adguard.com>
Date:   Wed Feb 18 16:42:06 2026 +0300

    Support client_random_prefix in deeplink library

commit 15bc28e9161affaf75c8c1040fd1bc69a03214aa
Author: Zhavoronkov Aleksei <a.zhavoronkov@adguard.com>
Date:   Wed Feb 18 16:41:00 2026 +0300

    Export client_random_prefix to clients config

commit 685cb51cdbd7b19fdcfac4a031947cff1017d3b2
Author: Zhavoronkov Aleksei <a.zhavoronkov@adguard.com>
Date:   Wed Feb 18 12:11:02 2026 +0300

    Fix README git url

commit ed9820d9178be01eab9ae406c774c032fd17080f
Author: Zhavoronkov Aleksei <a.zhavoronkov@adguard.com>
Date:   Tue Feb 17 18:08:52 2026 +0300

    Fix markdown lint

commit 14b4c1467389631184db0e3d35818b2dc130914d
Author: Zhavoronkov Aleksei <a.zhavoronkov@adguard.com>
Date:   Tue Feb 17 18:02:27 2026 +0300

    Fix cert

commit fed3a9578e09ce7d5dd6bc47f5bc9db4f77902d8
Author: Zhavoronkov Aleksei <a.zhavoronkov@adguard.com>
Date:   Tue Feb 17 16:31:44 2026 +0300

    Support deep-link config export in TrustTunnel.
    
    Create trusttunnel-deeplink library crate with encode/decode functionality.
    Add deep-link export to TrustTunnel and enable it by default.
    
    Signed-off-by: Zhavoronkov Aleksei <a.zhavoronkov@adguard.com>
2026-02-18 14:14:59 +00:00

273 lines
9.9 KiB
Python

#!/usr/bin/env python3
"""Convert a tt:// deep link URI back to a TrustTunnel endpoint TOML config.
Usage:
python3 deeplink_to_config.py <tt://...>
See DEEP_LINK.md for the specification.
"""
from __future__ import annotations
import base64
import sys
# ---------------------------------------------------------------------------
# TLS variable-length integer decoding (RFC 9000 §16)
# ---------------------------------------------------------------------------
def decode_varint(data: bytes, offset: int) -> tuple[int, int]:
"""Decode a TLS/QUIC variable-length integer at *offset*.
Returns (value, new_offset).
"""
if offset >= len(data):
raise ValueError("unexpected end of data while reading varint")
first = data[offset]
prefix = first >> 6
if prefix == 0:
return first & 0x3F, offset + 1
if prefix == 1:
if offset + 2 > len(data):
raise ValueError("truncated 2-byte varint")
return int.from_bytes(data[offset:offset + 2], "big") & 0x3FFF, offset + 2
if prefix == 2:
if offset + 4 > len(data):
raise ValueError("truncated 4-byte varint")
return int.from_bytes(data[offset:offset + 4], "big") & 0x3FFFFFFF, offset + 4
# prefix == 3
if offset + 8 > len(data):
raise ValueError("truncated 8-byte varint")
return int.from_bytes(data[offset:offset + 8], "big") & 0x3FFFFFFFFFFFFFFF, offset + 8
# ---------------------------------------------------------------------------
# Tag constants (must match config_to_deeplink.py)
# ---------------------------------------------------------------------------
TAG_HOSTNAME = 0x01
TAG_ADDRESS = 0x02
TAG_CUSTOM_SNI = 0x03
TAG_HAS_IPV6 = 0x04
TAG_USERNAME = 0x05
TAG_PASSWORD = 0x06
TAG_SKIP_VERIFICATION = 0x07
TAG_CERTIFICATE = 0x08
TAG_UPSTREAM_PROTOCOL = 0x09
TAG_ANTI_DPI = 0x0A
TAG_CLIENT_RANDOM_PREFIX = 0x0B
PROTOCOL_RMAP = {0x01: "http2", 0x02: "http3"}
DEFAULTS = {
"has_ipv6": True,
"skip_verification": False,
"upstream_protocol": "http2",
"anti_dpi": False,
}
# ---------------------------------------------------------------------------
# DER → PEM conversion
# ---------------------------------------------------------------------------
def _read_asn1_length(data: bytes, offset: int) -> tuple[int, int]:
"""Read an ASN.1 length at *offset*. Returns (length, new_offset)."""
if offset >= len(data):
raise ValueError("unexpected end of data in ASN.1 length")
first = data[offset]
if first < 0x80:
return first, offset + 1
num_bytes = first & 0x7F
if num_bytes == 0 or offset + 1 + num_bytes > len(data):
raise ValueError("invalid ASN.1 length encoding")
length = int.from_bytes(data[offset + 1:offset + 1 + num_bytes], "big")
return length, offset + 1 + num_bytes
def _split_der_certs(data: bytes) -> list[bytes]:
"""Split concatenated DER certificates into individual blobs."""
certs: list[bytes] = []
offset = 0
while offset < len(data):
if data[offset] != 0x30:
raise ValueError(f"expected ASN.1 SEQUENCE (0x30) at offset {offset}, "
f"got 0x{data[offset]:02X}")
body_len, hdr_end = _read_asn1_length(data, offset + 1)
cert_end = hdr_end + body_len
if cert_end > len(data):
raise ValueError("truncated DER certificate")
certs.append(data[offset:cert_end])
offset = cert_end
return certs
def der_to_pem(data: bytes) -> str:
"""Convert concatenated DER certificates to a PEM string."""
certs = _split_der_certs(data)
pem_blocks: list[str] = []
for der in certs:
b64 = base64.b64encode(der).decode("ascii")
lines = [b64[i:i + 64] for i in range(0, len(b64), 64)]
pem_blocks.append(
"-----BEGIN CERTIFICATE-----\n"
+ "\n".join(lines)
+ "\n-----END CERTIFICATE-----"
)
return "\n".join(pem_blocks) + "\n"
# ---------------------------------------------------------------------------
# TLV parser
# ---------------------------------------------------------------------------
def parse_tlv(data: bytes) -> list[tuple[int, bytes]]:
"""Parse a sequence of TLV entries from *data*."""
entries: list[tuple[int, bytes]] = []
offset = 0
while offset < len(data):
tag, offset = decode_varint(data, offset)
length, offset = decode_varint(data, offset)
if offset + length > len(data):
raise ValueError(f"TLV value truncated: tag=0x{tag:02X}, "
f"expected {length} bytes, got {len(data) - offset}")
value = data[offset:offset + length]
offset += length
entries.append((tag, value))
return entries
# ---------------------------------------------------------------------------
# Decoder
# ---------------------------------------------------------------------------
def decode_config(data: bytes) -> dict:
"""Decode TLV binary payload into a config dict."""
entries = parse_tlv(data)
cfg: dict = {}
addresses: list[str] = []
for tag, value in entries:
if tag == TAG_HOSTNAME:
cfg["hostname"] = value.decode()
elif tag == TAG_ADDRESS:
addresses.append(value.decode())
elif tag == TAG_CUSTOM_SNI:
cfg["custom_sni"] = value.decode()
elif tag == TAG_HAS_IPV6:
cfg["has_ipv6"] = value[0] != 0
elif tag == TAG_USERNAME:
cfg["username"] = value.decode()
elif tag == TAG_PASSWORD:
cfg["password"] = value.decode()
elif tag == TAG_SKIP_VERIFICATION:
cfg["skip_verification"] = value[0] != 0
elif tag == TAG_CERTIFICATE:
cfg["certificate"] = der_to_pem(value)
elif tag == TAG_UPSTREAM_PROTOCOL:
proto_byte = value[0]
if proto_byte not in PROTOCOL_RMAP:
raise ValueError(f"unknown upstream_protocol byte: 0x{proto_byte:02X}")
cfg["upstream_protocol"] = PROTOCOL_RMAP[proto_byte]
elif tag == TAG_ANTI_DPI:
cfg["anti_dpi"] = value[0] != 0
elif tag == TAG_CLIENT_RANDOM_PREFIX:
cfg["client_random_prefix"] = value.decode()
# Unknown tags are silently ignored per spec.
if addresses:
cfg["addresses"] = addresses
# Apply defaults for omitted optional fields.
for key, default in DEFAULTS.items():
cfg.setdefault(key, default)
# Certificate defaults to empty string when not present (verified by system CAs).
cfg.setdefault("certificate", "")
return cfg
# ---------------------------------------------------------------------------
# TOML emitter (minimal, no external dependency)
# ---------------------------------------------------------------------------
def _toml_value(v: object) -> str:
"""Format a single TOML value."""
if isinstance(v, bool):
return "true" if v else "false"
if isinstance(v, str):
if "\n" in v:
return f'"""\n{v}"""'
return f'"{v}"'
if isinstance(v, list):
inner = ", ".join(f'"{item}"' for item in v)
return f"[{inner}]"
return str(v)
# Ordered list of (key, comment) for pretty output.
_FIELD_ORDER: list[tuple[str, str]] = [
("hostname", "Endpoint host name, used for TLS session establishment"),
("addresses", "Endpoint addresses."),
("custom_sni", "Custom SNI"),
("has_ipv6", "Whether IPv6 traffic can be routed through the endpoint"),
("username", "Username for authorization"),
("password", "Password for authorization"),
("client_random_prefix", "TLS client random hex prefix for connection filtering"),
("skip_verification", "Skip the endpoint certificate verification?\n"
"# That is, any certificate is accepted with this one set to true."),
("certificate", "Endpoint certificate in PEM format.\n"
"# If not specified, the endpoint certificate is verified "
"using the system storage."),
("upstream_protocol", "Protocol to be used to communicate with the endpoint [http2, http3]"),
("anti_dpi", "Is anti-DPI measures should be enabled"),
]
def config_to_toml(cfg: dict) -> str:
"""Render *cfg* as a TOML string matching the canonical endpoint format."""
lines: list[str] = [
"# This file was automatically generated by endpoint "
"and could be used in vpn client.",
"",
]
for key, comment in _FIELD_ORDER:
if key not in cfg:
continue
for cline in comment.split("\n"):
lines.append(f"# {cline}" if not cline.startswith("#") else cline)
lines.append(f"{key} = {_toml_value(cfg[key])}")
lines.append("")
return "\n".join(lines) + "\n"
# ---------------------------------------------------------------------------
# Deep link → config
# ---------------------------------------------------------------------------
def deeplink_to_config(uri: str) -> dict:
"""Parse a tt:// deep link URI and return a config dict."""
prefix = "tt://"
if not uri.startswith(prefix):
raise ValueError(f"URI must start with {prefix!r}")
encoded = uri[len(prefix):]
# Restore padding for base64 decoding.
padding = (4 - len(encoded) % 4) % 4
payload = base64.urlsafe_b64decode(encoded + "=" * padding)
return decode_config(payload)
def main() -> None:
if len(sys.argv) != 2:
print(f"usage: {sys.argv[0]} <tt://...>", file=sys.stderr)
sys.exit(1)
uri = sys.argv[1]
cfg = deeplink_to_config(uri)
# Validate required fields.
for field in ("hostname", "addresses", "username", "password"):
if field not in cfg:
sys.exit(f"error: missing required field: {field}")
print(config_to_toml(cfg), end="")
if __name__ == "__main__":
main()