mirror of
https://github.com/router-for-me/CLIProxyAPIPlus.git
synced 2026-03-27 22:27:28 +00:00
chore: remove test files containing sensitive data
Amp-Thread-ID: https://ampcode.com/threads/T-019bd618-7e42-715a-960d-dd45425851e3 Co-authored-by: Amp <amp@ampcode.com>
This commit is contained in:
452
test_api.py
452
test_api.py
@@ -1,452 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
CLIProxyAPI 全面测试脚本
|
||||
测试模型列表、流式输出、thinking模式及复杂任务
|
||||
"""
|
||||
|
||||
import requests
|
||||
import json
|
||||
import time
|
||||
import sys
|
||||
import io
|
||||
from typing import Optional, List, Dict, Any
|
||||
|
||||
# 修复 Windows 控制台编码问题
|
||||
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
|
||||
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
|
||||
|
||||
# 配置
|
||||
BASE_URL = "http://localhost:8317"
|
||||
API_KEY = "your-api-key-1"
|
||||
HEADERS = {
|
||||
"Authorization": f"Bearer {API_KEY}",
|
||||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
# 复杂任务提示词 - 用于测试 thinking 模式
|
||||
COMPLEX_TASK_PROMPT = """请帮我分析以下复杂的编程问题,并给出详细的解决方案:
|
||||
|
||||
问题:设计一个高并发的分布式任务调度系统,需要满足以下要求:
|
||||
1. 支持百万级任务队列
|
||||
2. 任务可以设置优先级、延迟执行、定时执行
|
||||
3. 支持任务依赖关系(DAG调度)
|
||||
4. 失败重试机制,支持指数退避
|
||||
5. 任务结果持久化和查询
|
||||
6. 水平扩展能力
|
||||
7. 监控和告警
|
||||
|
||||
请从以下几个方面详细分析:
|
||||
1. 整体架构设计
|
||||
2. 核心数据结构
|
||||
3. 调度算法选择
|
||||
4. 容错机制设计
|
||||
5. 性能优化策略
|
||||
6. 技术选型建议
|
||||
|
||||
请逐步思考每个方面,给出你的推理过程。"""
|
||||
|
||||
# 简单测试提示词
|
||||
SIMPLE_PROMPT = "Hello! Please respond with 'OK' if you receive this message."
|
||||
|
||||
def print_separator(title: str):
|
||||
print(f"\n{'='*60}")
|
||||
print(f" {title}")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
def print_result(name: str, success: bool, detail: str = ""):
|
||||
status = "✅ PASS" if success else "❌ FAIL"
|
||||
print(f"{status} | {name}")
|
||||
if detail:
|
||||
print(f" └─ {detail[:200]}{'...' if len(detail) > 200 else ''}")
|
||||
|
||||
def get_models() -> List[str]:
|
||||
"""获取可用模型列表"""
|
||||
print_separator("获取模型列表")
|
||||
try:
|
||||
resp = requests.get(f"{BASE_URL}/v1/models", headers=HEADERS, timeout=30)
|
||||
if resp.status_code == 200:
|
||||
data = resp.json()
|
||||
models = [m.get("id", m.get("name", "unknown")) for m in data.get("data", [])]
|
||||
print(f"找到 {len(models)} 个模型:")
|
||||
for m in models:
|
||||
print(f" - {m}")
|
||||
return models
|
||||
else:
|
||||
print(f"❌ 获取模型列表失败: HTTP {resp.status_code}")
|
||||
print(f" 响应: {resp.text[:500]}")
|
||||
return []
|
||||
except Exception as e:
|
||||
print(f"❌ 获取模型列表异常: {e}")
|
||||
return []
|
||||
|
||||
def test_model_basic(model: str) -> tuple:
|
||||
"""基础可用性测试,返回 (success, error_detail)"""
|
||||
try:
|
||||
payload = {
|
||||
"model": model,
|
||||
"messages": [{"role": "user", "content": SIMPLE_PROMPT}],
|
||||
"max_tokens": 50,
|
||||
"stream": False
|
||||
}
|
||||
resp = requests.post(
|
||||
f"{BASE_URL}/v1/chat/completions",
|
||||
headers=HEADERS,
|
||||
json=payload,
|
||||
timeout=60
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
data = resp.json()
|
||||
content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
|
||||
return (bool(content), f"content_len={len(content)}")
|
||||
else:
|
||||
return (False, f"HTTP {resp.status_code}: {resp.text[:300]}")
|
||||
except Exception as e:
|
||||
return (False, str(e))
|
||||
|
||||
def test_streaming(model: str) -> Dict[str, Any]:
|
||||
"""测试流式输出"""
|
||||
result = {"success": False, "chunks": 0, "content": "", "error": None}
|
||||
try:
|
||||
payload = {
|
||||
"model": model,
|
||||
"messages": [{"role": "user", "content": "Count from 1 to 5, one number per line."}],
|
||||
"max_tokens": 100,
|
||||
"stream": True
|
||||
}
|
||||
resp = requests.post(
|
||||
f"{BASE_URL}/v1/chat/completions",
|
||||
headers=HEADERS,
|
||||
json=payload,
|
||||
timeout=60,
|
||||
stream=True
|
||||
)
|
||||
|
||||
if resp.status_code != 200:
|
||||
result["error"] = f"HTTP {resp.status_code}: {resp.text[:200]}"
|
||||
return result
|
||||
|
||||
content_parts = []
|
||||
for line in resp.iter_lines():
|
||||
if line:
|
||||
line_str = line.decode('utf-8')
|
||||
if line_str.startswith("data: "):
|
||||
data_str = line_str[6:]
|
||||
if data_str.strip() == "[DONE]":
|
||||
break
|
||||
try:
|
||||
data = json.loads(data_str)
|
||||
result["chunks"] += 1
|
||||
choices = data.get("choices", [])
|
||||
if choices:
|
||||
delta = choices[0].get("delta", {})
|
||||
if "content" in delta and delta["content"]:
|
||||
content_parts.append(delta["content"])
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
except Exception as e:
|
||||
result["error"] = f"Parse error: {e}, data: {data_str[:200]}"
|
||||
|
||||
result["content"] = "".join(content_parts)
|
||||
result["success"] = result["chunks"] > 0 and len(result["content"]) > 0
|
||||
|
||||
except Exception as e:
|
||||
result["error"] = str(e)
|
||||
|
||||
return result
|
||||
|
||||
def test_thinking_mode(model: str, complex_task: bool = False) -> Dict[str, Any]:
|
||||
"""测试 thinking 模式"""
|
||||
result = {
|
||||
"success": False,
|
||||
"has_reasoning": False,
|
||||
"reasoning_content": "",
|
||||
"content": "",
|
||||
"error": None,
|
||||
"chunks": 0
|
||||
}
|
||||
|
||||
prompt = COMPLEX_TASK_PROMPT if complex_task else "What is 15 * 23? Please think step by step."
|
||||
|
||||
try:
|
||||
# 尝试不同的 thinking 模式参数格式
|
||||
payload = {
|
||||
"model": model,
|
||||
"messages": [{"role": "user", "content": prompt}],
|
||||
"max_tokens": 8000 if complex_task else 2000,
|
||||
"stream": True
|
||||
}
|
||||
|
||||
# 根据模型类型添加 thinking 参数
|
||||
if "claude" in model.lower():
|
||||
payload["thinking"] = {"type": "enabled", "budget_tokens": 5000 if complex_task else 2000}
|
||||
elif "gemini" in model.lower():
|
||||
payload["thinking"] = {"thinking_budget": 5000 if complex_task else 2000}
|
||||
elif "gpt" in model.lower() or "codex" in model.lower() or "o1" in model.lower() or "o3" in model.lower():
|
||||
payload["reasoning_effort"] = "high" if complex_task else "medium"
|
||||
else:
|
||||
# 通用格式
|
||||
payload["thinking"] = {"type": "enabled", "budget_tokens": 5000 if complex_task else 2000}
|
||||
|
||||
resp = requests.post(
|
||||
f"{BASE_URL}/v1/chat/completions",
|
||||
headers=HEADERS,
|
||||
json=payload,
|
||||
timeout=300 if complex_task else 120,
|
||||
stream=True
|
||||
)
|
||||
|
||||
if resp.status_code != 200:
|
||||
result["error"] = f"HTTP {resp.status_code}: {resp.text[:500]}"
|
||||
return result
|
||||
|
||||
content_parts = []
|
||||
reasoning_parts = []
|
||||
|
||||
for line in resp.iter_lines():
|
||||
if line:
|
||||
line_str = line.decode('utf-8')
|
||||
if line_str.startswith("data: "):
|
||||
data_str = line_str[6:]
|
||||
if data_str.strip() == "[DONE]":
|
||||
break
|
||||
try:
|
||||
data = json.loads(data_str)
|
||||
result["chunks"] += 1
|
||||
|
||||
choices = data.get("choices", [])
|
||||
if not choices:
|
||||
continue
|
||||
choice = choices[0]
|
||||
delta = choice.get("delta", {})
|
||||
|
||||
# 检查 reasoning_content (Claude/OpenAI格式)
|
||||
if "reasoning_content" in delta and delta["reasoning_content"]:
|
||||
reasoning_parts.append(delta["reasoning_content"])
|
||||
result["has_reasoning"] = True
|
||||
|
||||
# 检查 thinking (Gemini格式)
|
||||
if "thinking" in delta and delta["thinking"]:
|
||||
reasoning_parts.append(delta["thinking"])
|
||||
result["has_reasoning"] = True
|
||||
|
||||
# 常规内容
|
||||
if "content" in delta and delta["content"]:
|
||||
content_parts.append(delta["content"])
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
pass
|
||||
except Exception as e:
|
||||
result["error"] = f"Parse error: {e}"
|
||||
|
||||
result["reasoning_content"] = "".join(reasoning_parts)
|
||||
result["content"] = "".join(content_parts)
|
||||
result["success"] = result["chunks"] > 0 and (len(result["content"]) > 0 or len(result["reasoning_content"]) > 0)
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
result["error"] = "Request timeout"
|
||||
except Exception as e:
|
||||
result["error"] = str(e)
|
||||
|
||||
return result
|
||||
|
||||
def run_full_test():
|
||||
"""运行完整测试"""
|
||||
print("\n" + "="*60)
|
||||
print(" CLIProxyAPI 全面测试")
|
||||
print("="*60)
|
||||
print(f"目标地址: {BASE_URL}")
|
||||
print(f"API Key: {API_KEY[:10]}...")
|
||||
|
||||
# 1. 获取模型列表
|
||||
models = get_models()
|
||||
if not models:
|
||||
print("\n❌ 无法获取模型列表,测试终止")
|
||||
return
|
||||
|
||||
# 2. 基础可用性测试
|
||||
print_separator("基础可用性测试")
|
||||
available_models = []
|
||||
for model in models:
|
||||
success, detail = test_model_basic(model)
|
||||
print_result(f"模型: {model}", success, detail)
|
||||
if success:
|
||||
available_models.append(model)
|
||||
|
||||
print(f"\n可用模型: {len(available_models)}/{len(models)}")
|
||||
|
||||
if not available_models:
|
||||
print("\n❌ 没有可用的模型,测试终止")
|
||||
return
|
||||
|
||||
# 3. 流式输出测试
|
||||
print_separator("流式输出测试")
|
||||
streaming_results = {}
|
||||
for model in available_models:
|
||||
result = test_streaming(model)
|
||||
streaming_results[model] = result
|
||||
detail = f"chunks={result['chunks']}, content_len={len(result['content'])}"
|
||||
if result["error"]:
|
||||
detail = f"error: {result['error']}"
|
||||
print_result(f"模型: {model}", result["success"], detail)
|
||||
|
||||
# 4. Thinking 模式测试 (简单任务)
|
||||
print_separator("Thinking 模式测试 (简单任务)")
|
||||
thinking_results = {}
|
||||
for model in available_models:
|
||||
result = test_thinking_mode(model, complex_task=False)
|
||||
thinking_results[model] = result
|
||||
detail = f"reasoning={result['has_reasoning']}, chunks={result['chunks']}"
|
||||
if result["error"]:
|
||||
detail = f"error: {result['error']}"
|
||||
print_result(f"模型: {model}", result["success"], detail)
|
||||
|
||||
# 5. Thinking 模式测试 (复杂任务) - 只测试支持 thinking 的模型
|
||||
print_separator("Thinking 模式测试 (复杂任务)")
|
||||
complex_thinking_results = {}
|
||||
|
||||
# 选择前3个可用模型进行复杂任务测试
|
||||
test_models = available_models[:3]
|
||||
print(f"测试模型 (取前3个): {test_models}\n")
|
||||
|
||||
for model in test_models:
|
||||
print(f"⏳ 正在测试 {model} (复杂任务,可能需要较长时间)...")
|
||||
result = test_thinking_mode(model, complex_task=True)
|
||||
complex_thinking_results[model] = result
|
||||
|
||||
if result["success"]:
|
||||
detail = f"reasoning={result['has_reasoning']}, reasoning_len={len(result['reasoning_content'])}, content_len={len(result['content'])}"
|
||||
else:
|
||||
detail = f"error: {result['error']}" if result["error"] else "Unknown error"
|
||||
|
||||
print_result(f"模型: {model}", result["success"], detail)
|
||||
|
||||
# 如果有 reasoning 内容,打印前500字符
|
||||
if result["has_reasoning"] and result["reasoning_content"]:
|
||||
print(f"\n 📝 Reasoning 内容预览 (前500字符):")
|
||||
print(f" {result['reasoning_content'][:500]}...")
|
||||
|
||||
# 6. 总结报告
|
||||
print_separator("测试总结报告")
|
||||
|
||||
print(f"📊 模型总数: {len(models)}")
|
||||
print(f"✅ 可用模型: {len(available_models)}")
|
||||
print(f"❌ 不可用模型: {len(models) - len(available_models)}")
|
||||
|
||||
print(f"\n📊 流式输出测试:")
|
||||
streaming_pass = sum(1 for r in streaming_results.values() if r["success"])
|
||||
print(f" 通过: {streaming_pass}/{len(streaming_results)}")
|
||||
|
||||
print(f"\n📊 Thinking 模式测试 (简单):")
|
||||
thinking_pass = sum(1 for r in thinking_results.values() if r["success"])
|
||||
thinking_with_reasoning = sum(1 for r in thinking_results.values() if r["has_reasoning"])
|
||||
print(f" 通过: {thinking_pass}/{len(thinking_results)}")
|
||||
print(f" 包含推理内容: {thinking_with_reasoning}/{len(thinking_results)}")
|
||||
|
||||
print(f"\n📊 Thinking 模式测试 (复杂):")
|
||||
complex_pass = sum(1 for r in complex_thinking_results.values() if r["success"])
|
||||
complex_with_reasoning = sum(1 for r in complex_thinking_results.values() if r["has_reasoning"])
|
||||
print(f" 通过: {complex_pass}/{len(complex_thinking_results)}")
|
||||
print(f" 包含推理内容: {complex_with_reasoning}/{len(complex_thinking_results)}")
|
||||
|
||||
# 列出所有错误
|
||||
print(f"\n📋 错误详情:")
|
||||
has_errors = False
|
||||
|
||||
for model, result in streaming_results.items():
|
||||
if result["error"]:
|
||||
has_errors = True
|
||||
print(f" [流式] {model}: {result['error'][:100]}")
|
||||
|
||||
for model, result in thinking_results.items():
|
||||
if result["error"]:
|
||||
has_errors = True
|
||||
print(f" [Thinking简单] {model}: {result['error'][:100]}")
|
||||
|
||||
for model, result in complex_thinking_results.items():
|
||||
if result["error"]:
|
||||
has_errors = True
|
||||
print(f" [Thinking复杂] {model}: {result['error'][:100]}")
|
||||
|
||||
if not has_errors:
|
||||
print(" 无错误")
|
||||
|
||||
print("\n" + "="*60)
|
||||
print(" 测试完成")
|
||||
print("="*60 + "\n")
|
||||
|
||||
def test_single_model_basic(model: str):
|
||||
"""单独测试一个模型的基础功能"""
|
||||
print_separator(f"基础测试: {model}")
|
||||
success, detail = test_model_basic(model)
|
||||
print_result(f"模型: {model}", success, detail)
|
||||
return success
|
||||
|
||||
def test_single_model_streaming(model: str):
|
||||
"""单独测试一个模型的流式输出"""
|
||||
print_separator(f"流式测试: {model}")
|
||||
result = test_streaming(model)
|
||||
detail = f"chunks={result['chunks']}, content_len={len(result['content'])}"
|
||||
if result["error"]:
|
||||
detail = f"error: {result['error']}"
|
||||
print_result(f"模型: {model}", result["success"], detail)
|
||||
if result["content"]:
|
||||
print(f"\n内容: {result['content'][:300]}")
|
||||
return result
|
||||
|
||||
def test_single_model_thinking(model: str, complex_task: bool = False):
|
||||
"""单独测试一个模型的thinking模式"""
|
||||
task_type = "复杂" if complex_task else "简单"
|
||||
print_separator(f"Thinking测试({task_type}): {model}")
|
||||
result = test_thinking_mode(model, complex_task=complex_task)
|
||||
detail = f"reasoning={result['has_reasoning']}, chunks={result['chunks']}"
|
||||
if result["error"]:
|
||||
detail = f"error: {result['error']}"
|
||||
print_result(f"模型: {model}", result["success"], detail)
|
||||
if result["reasoning_content"]:
|
||||
print(f"\nReasoning预览: {result['reasoning_content'][:500]}")
|
||||
if result["content"]:
|
||||
print(f"\n内容预览: {result['content'][:500]}")
|
||||
return result
|
||||
|
||||
def print_usage():
|
||||
print("""
|
||||
用法: python test_api.py <command> [options]
|
||||
|
||||
命令:
|
||||
models - 获取模型列表
|
||||
basic <model> - 测试单个模型基础功能
|
||||
stream <model> - 测试单个模型流式输出
|
||||
thinking <model> - 测试单个模型thinking模式(简单任务)
|
||||
thinking-complex <model> - 测试单个模型thinking模式(复杂任务)
|
||||
all - 运行完整测试(原有功能)
|
||||
|
||||
示例:
|
||||
python test_api.py models
|
||||
python test_api.py basic claude-sonnet
|
||||
python test_api.py stream claude-sonnet
|
||||
python test_api.py thinking claude-sonnet
|
||||
""")
|
||||
|
||||
if __name__ == "__main__":
|
||||
import sys
|
||||
|
||||
if len(sys.argv) < 2:
|
||||
print_usage()
|
||||
sys.exit(0)
|
||||
|
||||
cmd = sys.argv[1].lower()
|
||||
|
||||
if cmd == "models":
|
||||
get_models()
|
||||
elif cmd == "basic" and len(sys.argv) >= 3:
|
||||
test_single_model_basic(sys.argv[2])
|
||||
elif cmd == "stream" and len(sys.argv) >= 3:
|
||||
test_single_model_streaming(sys.argv[2])
|
||||
elif cmd == "thinking" and len(sys.argv) >= 3:
|
||||
test_single_model_thinking(sys.argv[2], complex_task=False)
|
||||
elif cmd == "thinking-complex" and len(sys.argv) >= 3:
|
||||
test_single_model_thinking(sys.argv[2], complex_task=True)
|
||||
elif cmd == "all":
|
||||
run_full_test()
|
||||
else:
|
||||
print_usage()
|
||||
@@ -1,273 +0,0 @@
|
||||
// 测试脚本 3:对比 CLIProxyAPIPlus 与官方格式的差异
|
||||
// 这个脚本分析 CLIProxyAPIPlus 保存的 token 与官方格式的差异
|
||||
// 运行方式: go run test_auth_diff.go
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Println("=" + strings.Repeat("=", 59))
|
||||
fmt.Println(" 测试脚本 3: Token 格式差异分析")
|
||||
fmt.Println("=" + strings.Repeat("=", 59))
|
||||
|
||||
homeDir := os.Getenv("USERPROFILE")
|
||||
|
||||
// 加载官方 IDE Token (Kiro IDE 生成)
|
||||
fmt.Println("\n[1] 官方 Kiro IDE Token 格式")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
ideTokenPath := filepath.Join(homeDir, ".aws", "sso", "cache", "kiro-auth-token.json")
|
||||
ideToken := loadAndAnalyze(ideTokenPath, "Kiro IDE")
|
||||
|
||||
// 加载 CLIProxyAPIPlus 保存的 Token
|
||||
fmt.Println("\n[2] CLIProxyAPIPlus 保存的 Token 格式")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
cliProxyDir := filepath.Join(homeDir, ".cli-proxy-api")
|
||||
files, _ := os.ReadDir(cliProxyDir)
|
||||
|
||||
var cliProxyTokens []map[string]interface{}
|
||||
for _, f := range files {
|
||||
if strings.HasPrefix(f.Name(), "kiro") && strings.HasSuffix(f.Name(), ".json") {
|
||||
p := filepath.Join(cliProxyDir, f.Name())
|
||||
token := loadAndAnalyze(p, f.Name())
|
||||
if token != nil {
|
||||
cliProxyTokens = append(cliProxyTokens, token)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 对比分析
|
||||
fmt.Println("\n[3] 关键差异分析")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
if ideToken == nil {
|
||||
fmt.Println("❌ 无法加载 IDE Token,跳过对比")
|
||||
} else if len(cliProxyTokens) == 0 {
|
||||
fmt.Println("❌ 无法加载 CLIProxyAPIPlus Token,跳过对比")
|
||||
} else {
|
||||
// 对比最新的 CLIProxyAPIPlus token
|
||||
cliToken := cliProxyTokens[0]
|
||||
|
||||
fmt.Println("\n字段对比:")
|
||||
fmt.Printf("%-20s | %-15s | %-15s\n", "字段", "IDE Token", "CLIProxy Token")
|
||||
fmt.Println(strings.Repeat("-", 55))
|
||||
|
||||
fields := []string{
|
||||
"accessToken", "refreshToken", "clientId", "clientSecret",
|
||||
"authMethod", "auth_method", "provider", "region", "expiresAt", "expires_at",
|
||||
}
|
||||
|
||||
for _, field := range fields {
|
||||
ideVal := getFieldStatus(ideToken, field)
|
||||
cliVal := getFieldStatus(cliToken, field)
|
||||
|
||||
status := " "
|
||||
if ideVal != cliVal {
|
||||
if ideVal == "✅ 有" && cliVal == "❌ 无" {
|
||||
status = "⚠️"
|
||||
} else if ideVal == "❌ 无" && cliVal == "✅ 有" {
|
||||
status = "📝"
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Printf("%-20s | %-15s | %-15s %s\n", field, ideVal, cliVal, status)
|
||||
}
|
||||
|
||||
// 关键问题检测
|
||||
fmt.Println("\n🔍 问题检测:")
|
||||
|
||||
// 检查 clientId/clientSecret
|
||||
if hasField(ideToken, "clientId") && !hasField(cliToken, "clientId") {
|
||||
fmt.Println(" ⚠️ 问题: CLIProxyAPIPlus 缺少 clientId 字段!")
|
||||
fmt.Println(" 原因: IdC 认证刷新 token 时需要 clientId")
|
||||
}
|
||||
|
||||
if hasField(ideToken, "clientSecret") && !hasField(cliToken, "clientSecret") {
|
||||
fmt.Println(" ⚠️ 问题: CLIProxyAPIPlus 缺少 clientSecret 字段!")
|
||||
fmt.Println(" 原因: IdC 认证刷新 token 时需要 clientSecret")
|
||||
}
|
||||
|
||||
// 检查字段名差异
|
||||
if hasField(cliToken, "auth_method") && !hasField(cliToken, "authMethod") {
|
||||
fmt.Println(" 📝 注意: CLIProxy 使用 auth_method (snake_case)")
|
||||
fmt.Println(" 而官方使用 authMethod (camelCase)")
|
||||
}
|
||||
|
||||
if hasField(cliToken, "expires_at") && !hasField(cliToken, "expiresAt") {
|
||||
fmt.Println(" 📝 注意: CLIProxy 使用 expires_at (snake_case)")
|
||||
fmt.Println(" 而官方使用 expiresAt (camelCase)")
|
||||
}
|
||||
}
|
||||
|
||||
// Step 4: 测试使用完整格式的 token
|
||||
fmt.Println("\n[4] 测试完整格式 Token (带 clientId/clientSecret)")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
if ideToken != nil {
|
||||
testWithFullToken(ideToken)
|
||||
}
|
||||
|
||||
fmt.Println("\n" + strings.Repeat("=", 60))
|
||||
fmt.Println(" 分析完成")
|
||||
fmt.Println(strings.Repeat("=", 60))
|
||||
|
||||
// 给出建议
|
||||
fmt.Println("\n💡 修复建议:")
|
||||
fmt.Println(" 1. CLIProxyAPIPlus 导入 token 时需要保留 clientId 和 clientSecret")
|
||||
fmt.Println(" 2. IdC 认证刷新 token 必须使用这两个字段")
|
||||
fmt.Println(" 3. 检查 CLIProxyAPIPlus 的 token 导入逻辑:")
|
||||
fmt.Println(" - internal/auth/kiro/aws.go LoadKiroIDEToken()")
|
||||
fmt.Println(" - sdk/auth/kiro.go ImportFromKiroIDE()")
|
||||
}
|
||||
|
||||
func loadAndAnalyze(path, name string) map[string]interface{} {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 无法加载 %s: %v\n", name, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var token map[string]interface{}
|
||||
if err := json.Unmarshal(data, &token); err != nil {
|
||||
fmt.Printf("❌ 无法解析 %s: %v\n", name, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
fmt.Printf("📄 %s\n", path)
|
||||
fmt.Printf(" 字段数: %d\n", len(token))
|
||||
|
||||
// 列出所有字段
|
||||
fmt.Printf(" 字段列表: ")
|
||||
keys := make([]string, 0, len(token))
|
||||
for k := range token {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
fmt.Printf("%v\n", keys)
|
||||
|
||||
return token
|
||||
}
|
||||
|
||||
func getFieldStatus(token map[string]interface{}, field string) string {
|
||||
if token == nil {
|
||||
return "N/A"
|
||||
}
|
||||
if v, ok := token[field]; ok && v != nil && v != "" {
|
||||
return "✅ 有"
|
||||
}
|
||||
return "❌ 无"
|
||||
}
|
||||
|
||||
func hasField(token map[string]interface{}, field string) bool {
|
||||
if token == nil {
|
||||
return false
|
||||
}
|
||||
v, ok := token[field]
|
||||
return ok && v != nil && v != ""
|
||||
}
|
||||
|
||||
func testWithFullToken(token map[string]interface{}) {
|
||||
accessToken, _ := token["accessToken"].(string)
|
||||
refreshToken, _ := token["refreshToken"].(string)
|
||||
clientId, _ := token["clientId"].(string)
|
||||
clientSecret, _ := token["clientSecret"].(string)
|
||||
region, _ := token["region"].(string)
|
||||
|
||||
if region == "" {
|
||||
region = "us-east-1"
|
||||
}
|
||||
|
||||
// 测试当前 accessToken
|
||||
fmt.Println("\n测试当前 accessToken...")
|
||||
if testAPICall(accessToken, region) {
|
||||
fmt.Println("✅ 当前 accessToken 有效")
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("⚠️ 当前 accessToken 无效,尝试刷新...")
|
||||
|
||||
// 检查是否有完整的刷新所需字段
|
||||
if clientId == "" || clientSecret == "" {
|
||||
fmt.Println("❌ 缺少 clientId 或 clientSecret,无法刷新")
|
||||
fmt.Println(" 这就是问题所在!")
|
||||
return
|
||||
}
|
||||
|
||||
// 尝试刷新
|
||||
fmt.Println("\n使用完整字段刷新 token...")
|
||||
url := fmt.Sprintf("https://oidc.%s.amazonaws.com/token", region)
|
||||
|
||||
requestBody := map[string]interface{}{
|
||||
"refreshToken": refreshToken,
|
||||
"clientId": clientId,
|
||||
"clientSecret": clientSecret,
|
||||
"grantType": "refresh_token",
|
||||
}
|
||||
|
||||
body, _ := json.Marshal(requestBody)
|
||||
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 请求失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
|
||||
if resp.StatusCode == 200 {
|
||||
var refreshResp map[string]interface{}
|
||||
json.Unmarshal(respBody, &refreshResp)
|
||||
|
||||
newAccessToken, _ := refreshResp["accessToken"].(string)
|
||||
fmt.Println("✅ Token 刷新成功!")
|
||||
|
||||
// 验证新 token
|
||||
if testAPICall(newAccessToken, region) {
|
||||
fmt.Println("✅ 新 Token 验证成功!")
|
||||
fmt.Println("\n✅ 结论: 使用完整格式 (含 clientId/clientSecret) 可以正常工作")
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("❌ 刷新失败: HTTP %d\n", resp.StatusCode)
|
||||
fmt.Printf(" 响应: %s\n", string(respBody))
|
||||
}
|
||||
}
|
||||
|
||||
func testAPICall(accessToken, region string) bool {
|
||||
url := fmt.Sprintf("https://codewhisperer.%s.amazonaws.com", region)
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"origin": "AI_EDITOR",
|
||||
"isEmailRequired": true,
|
||||
"resourceType": "AGENTIC_REQUEST",
|
||||
}
|
||||
body, _ := json.Marshal(payload)
|
||||
|
||||
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
|
||||
req.Header.Set("Content-Type", "application/x-amz-json-1.0")
|
||||
req.Header.Set("x-amz-target", "AmazonCodeWhispererService.GetUsageLimits")
|
||||
req.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return resp.StatusCode == 200
|
||||
}
|
||||
@@ -1,323 +0,0 @@
|
||||
// 测试脚本 1:模拟 kiro2api_go1 的 IdC 认证方式
|
||||
// 这个脚本完整模拟 kiro-gateway/temp/kiro2api_go1 的认证逻辑
|
||||
// 运行方式: go run test_auth_idc_go1.go
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 配置常量 - 来自 kiro2api_go1/config/config.go
|
||||
const (
|
||||
IdcRefreshTokenURL = "https://oidc.us-east-1.amazonaws.com/token"
|
||||
CodeWhispererAPIURL = "https://codewhisperer.us-east-1.amazonaws.com"
|
||||
)
|
||||
|
||||
// AuthConfig - 来自 kiro2api_go1/auth/config.go
|
||||
type AuthConfig struct {
|
||||
AuthType string `json:"auth"`
|
||||
RefreshToken string `json:"refreshToken"`
|
||||
ClientID string `json:"clientId,omitempty"`
|
||||
ClientSecret string `json:"clientSecret,omitempty"`
|
||||
}
|
||||
|
||||
// IdcRefreshRequest - 来自 kiro2api_go1/types/token.go
|
||||
type IdcRefreshRequest struct {
|
||||
ClientId string `json:"clientId"`
|
||||
ClientSecret string `json:"clientSecret"`
|
||||
GrantType string `json:"grantType"`
|
||||
RefreshToken string `json:"refreshToken"`
|
||||
}
|
||||
|
||||
// RefreshResponse - 来自 kiro2api_go1/types/token.go
|
||||
type RefreshResponse struct {
|
||||
AccessToken string `json:"accessToken"`
|
||||
RefreshToken string `json:"refreshToken,omitempty"`
|
||||
ExpiresIn int `json:"expiresIn"`
|
||||
TokenType string `json:"tokenType,omitempty"`
|
||||
}
|
||||
|
||||
// Fingerprint - 简化的指纹结构
|
||||
type Fingerprint struct {
|
||||
OSType string
|
||||
ConnectionBehavior string
|
||||
AcceptLanguage string
|
||||
SecFetchMode string
|
||||
AcceptEncoding string
|
||||
}
|
||||
|
||||
func generateFingerprint() *Fingerprint {
|
||||
osTypes := []string{"darwin", "windows", "linux"}
|
||||
connections := []string{"keep-alive", "close"}
|
||||
languages := []string{"en-US,en;q=0.9", "zh-CN,zh;q=0.9", "en-GB,en;q=0.9"}
|
||||
fetchModes := []string{"cors", "navigate", "no-cors"}
|
||||
|
||||
return &Fingerprint{
|
||||
OSType: osTypes[rand.Intn(len(osTypes))],
|
||||
ConnectionBehavior: connections[rand.Intn(len(connections))],
|
||||
AcceptLanguage: languages[rand.Intn(len(languages))],
|
||||
SecFetchMode: fetchModes[rand.Intn(len(fetchModes))],
|
||||
AcceptEncoding: "gzip, deflate, br",
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
fmt.Println("=" + strings.Repeat("=", 59))
|
||||
fmt.Println(" 测试脚本 1: kiro2api_go1 风格 IdC 认证")
|
||||
fmt.Println("=" + strings.Repeat("=", 59))
|
||||
|
||||
// Step 1: 加载官方格式的 token 文件
|
||||
fmt.Println("\n[Step 1] 加载官方格式 Token 文件")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
// 尝试从多个位置加载
|
||||
tokenPaths := []string{
|
||||
// 优先使用包含完整 clientId/clientSecret 的文件
|
||||
"E:/ai_project_2api/kiro-gateway/configs/kiro/kiro-auth-token-1768317098.json",
|
||||
filepath.Join(os.Getenv("USERPROFILE"), ".aws", "sso", "cache", "kiro-auth-token.json"),
|
||||
}
|
||||
|
||||
var tokenData map[string]interface{}
|
||||
var loadedPath string
|
||||
|
||||
for _, p := range tokenPaths {
|
||||
data, err := os.ReadFile(p)
|
||||
if err == nil {
|
||||
if err := json.Unmarshal(data, &tokenData); err == nil {
|
||||
loadedPath = p
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if tokenData == nil {
|
||||
fmt.Println("❌ 无法加载任何 token 文件")
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("✅ 加载文件: %s\n", loadedPath)
|
||||
|
||||
// 提取关键字段
|
||||
accessToken, _ := tokenData["accessToken"].(string)
|
||||
refreshToken, _ := tokenData["refreshToken"].(string)
|
||||
clientId, _ := tokenData["clientId"].(string)
|
||||
clientSecret, _ := tokenData["clientSecret"].(string)
|
||||
authMethod, _ := tokenData["authMethod"].(string)
|
||||
region, _ := tokenData["region"].(string)
|
||||
|
||||
if region == "" {
|
||||
region = "us-east-1"
|
||||
}
|
||||
|
||||
fmt.Printf("\n当前 Token 信息:\n")
|
||||
fmt.Printf(" AuthMethod: %s\n", authMethod)
|
||||
fmt.Printf(" Region: %s\n", region)
|
||||
fmt.Printf(" AccessToken: %s...\n", truncate(accessToken, 50))
|
||||
fmt.Printf(" RefreshToken: %s...\n", truncate(refreshToken, 50))
|
||||
fmt.Printf(" ClientID: %s\n", truncate(clientId, 30))
|
||||
fmt.Printf(" ClientSecret: %s...\n", truncate(clientSecret, 50))
|
||||
|
||||
// Step 2: 验证 IdC 认证所需字段
|
||||
fmt.Println("\n[Step 2] 验证 IdC 认证必需字段")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
missingFields := []string{}
|
||||
if refreshToken == "" {
|
||||
missingFields = append(missingFields, "refreshToken")
|
||||
}
|
||||
if clientId == "" {
|
||||
missingFields = append(missingFields, "clientId")
|
||||
}
|
||||
if clientSecret == "" {
|
||||
missingFields = append(missingFields, "clientSecret")
|
||||
}
|
||||
|
||||
if len(missingFields) > 0 {
|
||||
fmt.Printf("❌ 缺少必需字段: %v\n", missingFields)
|
||||
fmt.Println(" IdC 认证需要: refreshToken, clientId, clientSecret")
|
||||
return
|
||||
}
|
||||
fmt.Println("✅ 所有必需字段都存在")
|
||||
|
||||
// Step 3: 测试直接使用 accessToken 调用 API
|
||||
fmt.Println("\n[Step 3] 测试当前 AccessToken 有效性")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
if testAPICall(accessToken, region) {
|
||||
fmt.Println("✅ 当前 AccessToken 有效,无需刷新")
|
||||
} else {
|
||||
fmt.Println("⚠️ 当前 AccessToken 无效,需要刷新")
|
||||
|
||||
// Step 4: 使用 kiro2api_go1 风格刷新 token
|
||||
fmt.Println("\n[Step 4] 使用 kiro2api_go1 风格刷新 Token")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
newToken, err := refreshIdCToken(AuthConfig{
|
||||
AuthType: "IdC",
|
||||
RefreshToken: refreshToken,
|
||||
ClientID: clientId,
|
||||
ClientSecret: clientSecret,
|
||||
}, region)
|
||||
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 刷新失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("✅ Token 刷新成功!")
|
||||
fmt.Printf(" 新 AccessToken: %s...\n", truncate(newToken.AccessToken, 50))
|
||||
fmt.Printf(" ExpiresIn: %d 秒\n", newToken.ExpiresIn)
|
||||
|
||||
// Step 5: 验证新 token
|
||||
fmt.Println("\n[Step 5] 验证新 Token")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
if testAPICall(newToken.AccessToken, region) {
|
||||
fmt.Println("✅ 新 Token 验证成功!")
|
||||
|
||||
// 保存新 token
|
||||
saveNewToken(loadedPath, newToken, tokenData)
|
||||
} else {
|
||||
fmt.Println("❌ 新 Token 验证失败")
|
||||
}
|
||||
}
|
||||
|
||||
fmt.Println("\n" + strings.Repeat("=", 60))
|
||||
fmt.Println(" 测试完成")
|
||||
fmt.Println(strings.Repeat("=", 60))
|
||||
}
|
||||
|
||||
// refreshIdCToken - 完全模拟 kiro2api_go1/auth/refresh.go 的 refreshIdCToken 函数
|
||||
func refreshIdCToken(authConfig AuthConfig, region string) (*RefreshResponse, error) {
|
||||
refreshReq := IdcRefreshRequest{
|
||||
ClientId: authConfig.ClientID,
|
||||
ClientSecret: authConfig.ClientSecret,
|
||||
GrantType: "refresh_token",
|
||||
RefreshToken: authConfig.RefreshToken,
|
||||
}
|
||||
|
||||
reqBody, err := json.Marshal(refreshReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("序列化IdC请求失败: %v", err)
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("https://oidc.%s.amazonaws.com/token", region)
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(reqBody))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("创建IdC请求失败: %v", err)
|
||||
}
|
||||
|
||||
// 设置 IdC 特殊 headers(使用指纹随机化)- 完全模拟 kiro2api_go1
|
||||
fp := generateFingerprint()
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Host", fmt.Sprintf("oidc.%s.amazonaws.com", region))
|
||||
req.Header.Set("Connection", fp.ConnectionBehavior)
|
||||
req.Header.Set("x-amz-user-agent", fmt.Sprintf("aws-sdk-js/3.738.0 ua/2.1 os/%s lang/js md/browser#unknown_unknown api/sso-oidc#3.738.0 m/E KiroIDE", fp.OSType))
|
||||
req.Header.Set("Accept", "*/*")
|
||||
req.Header.Set("Accept-Language", fp.AcceptLanguage)
|
||||
req.Header.Set("sec-fetch-mode", fp.SecFetchMode)
|
||||
req.Header.Set("User-Agent", "node")
|
||||
req.Header.Set("Accept-Encoding", fp.AcceptEncoding)
|
||||
|
||||
fmt.Println("发送刷新请求:")
|
||||
fmt.Printf(" URL: %s\n", url)
|
||||
fmt.Println(" Headers:")
|
||||
for k, v := range req.Header {
|
||||
if k == "Content-Type" || k == "Host" || k == "X-Amz-User-Agent" || k == "User-Agent" {
|
||||
fmt.Printf(" %s: %s\n", k, v[0])
|
||||
}
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("IdC请求失败: %v", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("IdC刷新失败: 状态码 %d, 响应: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var refreshResp RefreshResponse
|
||||
if err := json.Unmarshal(body, &refreshResp); err != nil {
|
||||
return nil, fmt.Errorf("解析IdC响应失败: %v", err)
|
||||
}
|
||||
|
||||
return &refreshResp, nil
|
||||
}
|
||||
|
||||
func testAPICall(accessToken, region string) bool {
|
||||
url := fmt.Sprintf("https://codewhisperer.%s.amazonaws.com", region)
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"origin": "AI_EDITOR",
|
||||
"isEmailRequired": true,
|
||||
"resourceType": "AGENTIC_REQUEST",
|
||||
}
|
||||
body, _ := json.Marshal(payload)
|
||||
|
||||
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
|
||||
req.Header.Set("Content-Type", "application/x-amz-json-1.0")
|
||||
req.Header.Set("x-amz-target", "AmazonCodeWhispererService.GetUsageLimits")
|
||||
req.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
fmt.Printf(" 请求错误: %v\n", err)
|
||||
return false
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
fmt.Printf(" API 响应: HTTP %d\n", resp.StatusCode)
|
||||
|
||||
if resp.StatusCode == 200 {
|
||||
return true
|
||||
}
|
||||
|
||||
fmt.Printf(" 错误详情: %s\n", truncate(string(respBody), 200))
|
||||
return false
|
||||
}
|
||||
|
||||
func saveNewToken(originalPath string, newToken *RefreshResponse, originalData map[string]interface{}) {
|
||||
// 更新 token 数据
|
||||
originalData["accessToken"] = newToken.AccessToken
|
||||
if newToken.RefreshToken != "" {
|
||||
originalData["refreshToken"] = newToken.RefreshToken
|
||||
}
|
||||
originalData["expiresAt"] = time.Now().Add(time.Duration(newToken.ExpiresIn) * time.Second).Format(time.RFC3339)
|
||||
|
||||
data, _ := json.MarshalIndent(originalData, "", " ")
|
||||
|
||||
// 保存到新文件
|
||||
newPath := strings.TrimSuffix(originalPath, ".json") + "_refreshed.json"
|
||||
if err := os.WriteFile(newPath, data, 0644); err != nil {
|
||||
fmt.Printf("⚠️ 保存失败: %v\n", err)
|
||||
} else {
|
||||
fmt.Printf("✅ 新 Token 已保存到: %s\n", newPath)
|
||||
}
|
||||
}
|
||||
|
||||
func truncate(s string, n int) string {
|
||||
if len(s) <= n {
|
||||
return s
|
||||
}
|
||||
return s[:n]
|
||||
}
|
||||
@@ -1,237 +0,0 @@
|
||||
// 测试脚本 2:模拟 kiro2Api_js 的认证方式
|
||||
// 这个脚本完整模拟 kiro-gateway/temp/kiro2Api_js 的认证逻辑
|
||||
// 运行方式: go run test_auth_js_style.go
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// 常量 - 来自 kiro2Api_js/src/kiro/auth.js
|
||||
const (
|
||||
REFRESH_URL_TEMPLATE = "https://prod.{{region}}.auth.desktop.kiro.dev/refreshToken"
|
||||
REFRESH_IDC_URL_TEMPLATE = "https://oidc.{{region}}.amazonaws.com/token"
|
||||
AUTH_METHOD_SOCIAL = "social"
|
||||
AUTH_METHOD_IDC = "IdC"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Println("=" + strings.Repeat("=", 59))
|
||||
fmt.Println(" 测试脚本 2: kiro2Api_js 风格认证")
|
||||
fmt.Println("=" + strings.Repeat("=", 59))
|
||||
|
||||
// Step 1: 加载 token 文件
|
||||
fmt.Println("\n[Step 1] 加载 Token 文件")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
tokenPaths := []string{
|
||||
filepath.Join(os.Getenv("USERPROFILE"), ".aws", "sso", "cache", "kiro-auth-token.json"),
|
||||
"E:/ai_project_2api/kiro-gateway/configs/kiro/kiro-auth-token-1768317098.json",
|
||||
}
|
||||
|
||||
var tokenData map[string]interface{}
|
||||
var loadedPath string
|
||||
|
||||
for _, p := range tokenPaths {
|
||||
data, err := os.ReadFile(p)
|
||||
if err == nil {
|
||||
if err := json.Unmarshal(data, &tokenData); err == nil {
|
||||
loadedPath = p
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if tokenData == nil {
|
||||
fmt.Println("❌ 无法加载任何 token 文件")
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("✅ 加载文件: %s\n", loadedPath)
|
||||
|
||||
// 提取字段 - 模拟 kiro2Api_js/src/kiro/auth.js initializeAuth
|
||||
accessToken, _ := tokenData["accessToken"].(string)
|
||||
refreshToken, _ := tokenData["refreshToken"].(string)
|
||||
clientId, _ := tokenData["clientId"].(string)
|
||||
clientSecret, _ := tokenData["clientSecret"].(string)
|
||||
authMethod, _ := tokenData["authMethod"].(string)
|
||||
region, _ := tokenData["region"].(string)
|
||||
|
||||
if region == "" {
|
||||
region = "us-east-1"
|
||||
fmt.Println("⚠️ Region 未设置,使用默认值 us-east-1")
|
||||
}
|
||||
|
||||
fmt.Printf("\nToken 信息:\n")
|
||||
fmt.Printf(" AuthMethod: %s\n", authMethod)
|
||||
fmt.Printf(" Region: %s\n", region)
|
||||
fmt.Printf(" 有 ClientID: %v\n", clientId != "")
|
||||
fmt.Printf(" 有 ClientSecret: %v\n", clientSecret != "")
|
||||
|
||||
// Step 2: 测试当前 token
|
||||
fmt.Println("\n[Step 2] 测试当前 AccessToken")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
if testAPI(accessToken, region) {
|
||||
fmt.Println("✅ 当前 AccessToken 有效")
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("⚠️ 当前 AccessToken 无效,开始刷新...")
|
||||
|
||||
// Step 3: 根据 authMethod 选择刷新方式 - 模拟 doRefreshToken
|
||||
fmt.Println("\n[Step 3] 刷新 Token (JS 风格)")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
var refreshURL string
|
||||
var requestBody map[string]interface{}
|
||||
|
||||
// 判断认证方式 - 模拟 kiro2Api_js auth.js doRefreshToken
|
||||
if authMethod == AUTH_METHOD_SOCIAL {
|
||||
// Social 认证
|
||||
refreshURL = strings.Replace(REFRESH_URL_TEMPLATE, "{{region}}", region, 1)
|
||||
requestBody = map[string]interface{}{
|
||||
"refreshToken": refreshToken,
|
||||
}
|
||||
fmt.Println("使用 Social 认证方式")
|
||||
} else {
|
||||
// IdC 认证 (默认)
|
||||
refreshURL = strings.Replace(REFRESH_IDC_URL_TEMPLATE, "{{region}}", region, 1)
|
||||
requestBody = map[string]interface{}{
|
||||
"refreshToken": refreshToken,
|
||||
"clientId": clientId,
|
||||
"clientSecret": clientSecret,
|
||||
"grantType": "refresh_token",
|
||||
}
|
||||
fmt.Println("使用 IdC 认证方式")
|
||||
}
|
||||
|
||||
fmt.Printf("刷新 URL: %s\n", refreshURL)
|
||||
fmt.Printf("请求字段: %v\n", getKeys(requestBody))
|
||||
|
||||
// 发送刷新请求
|
||||
body, _ := json.Marshal(requestBody)
|
||||
req, _ := http.NewRequest("POST", refreshURL, bytes.NewBuffer(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 请求失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
|
||||
fmt.Printf("\n响应状态: HTTP %d\n", resp.StatusCode)
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
fmt.Printf("❌ 刷新失败: %s\n", string(respBody))
|
||||
|
||||
// 分析错误
|
||||
var errResp map[string]interface{}
|
||||
if err := json.Unmarshal(respBody, &errResp); err == nil {
|
||||
if errType, ok := errResp["error"].(string); ok {
|
||||
fmt.Printf("错误类型: %s\n", errType)
|
||||
if errType == "invalid_grant" {
|
||||
fmt.Println("\n💡 提示: refresh_token 可能已过期,需要重新授权")
|
||||
}
|
||||
}
|
||||
if errDesc, ok := errResp["error_description"].(string); ok {
|
||||
fmt.Printf("错误描述: %s\n", errDesc)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// 解析响应
|
||||
var refreshResp map[string]interface{}
|
||||
json.Unmarshal(respBody, &refreshResp)
|
||||
|
||||
newAccessToken, _ := refreshResp["accessToken"].(string)
|
||||
newRefreshToken, _ := refreshResp["refreshToken"].(string)
|
||||
expiresIn, _ := refreshResp["expiresIn"].(float64)
|
||||
|
||||
fmt.Println("✅ Token 刷新成功!")
|
||||
fmt.Printf(" 新 AccessToken: %s...\n", truncate(newAccessToken, 50))
|
||||
fmt.Printf(" ExpiresIn: %.0f 秒\n", expiresIn)
|
||||
if newRefreshToken != "" {
|
||||
fmt.Printf(" 新 RefreshToken: %s...\n", truncate(newRefreshToken, 50))
|
||||
}
|
||||
|
||||
// Step 4: 验证新 token
|
||||
fmt.Println("\n[Step 4] 验证新 Token")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
if testAPI(newAccessToken, region) {
|
||||
fmt.Println("✅ 新 Token 验证成功!")
|
||||
|
||||
// 保存新 token - 模拟 saveCredentialsToFile
|
||||
tokenData["accessToken"] = newAccessToken
|
||||
if newRefreshToken != "" {
|
||||
tokenData["refreshToken"] = newRefreshToken
|
||||
}
|
||||
tokenData["expiresAt"] = time.Now().Add(time.Duration(expiresIn) * time.Second).Format(time.RFC3339)
|
||||
|
||||
saveData, _ := json.MarshalIndent(tokenData, "", " ")
|
||||
newPath := strings.TrimSuffix(loadedPath, ".json") + "_js_refreshed.json"
|
||||
os.WriteFile(newPath, saveData, 0644)
|
||||
fmt.Printf("✅ 已保存到: %s\n", newPath)
|
||||
} else {
|
||||
fmt.Println("❌ 新 Token 验证失败")
|
||||
}
|
||||
|
||||
fmt.Println("\n" + strings.Repeat("=", 60))
|
||||
fmt.Println(" 测试完成")
|
||||
fmt.Println(strings.Repeat("=", 60))
|
||||
}
|
||||
|
||||
func testAPI(accessToken, region string) bool {
|
||||
url := fmt.Sprintf("https://codewhisperer.%s.amazonaws.com", region)
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"origin": "AI_EDITOR",
|
||||
"isEmailRequired": true,
|
||||
"resourceType": "AGENTIC_REQUEST",
|
||||
}
|
||||
body, _ := json.Marshal(payload)
|
||||
|
||||
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
|
||||
req.Header.Set("Content-Type", "application/x-amz-json-1.0")
|
||||
req.Header.Set("x-amz-target", "AmazonCodeWhispererService.GetUsageLimits")
|
||||
req.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return resp.StatusCode == 200
|
||||
}
|
||||
|
||||
func getKeys(m map[string]interface{}) []string {
|
||||
keys := make([]string, 0, len(m))
|
||||
for k := range m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
return keys
|
||||
}
|
||||
|
||||
func truncate(s string, n int) string {
|
||||
if len(s) <= n {
|
||||
return s
|
||||
}
|
||||
return s[:n]
|
||||
}
|
||||
@@ -1,348 +0,0 @@
|
||||
// 独立测试脚本:排查 Kiro Token 403 错误
|
||||
// 运行方式: go run test_kiro_debug.go
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Token 结构 - 匹配 Kiro IDE 格式
|
||||
type KiroIDEToken struct {
|
||||
AccessToken string `json:"accessToken"`
|
||||
RefreshToken string `json:"refreshToken"`
|
||||
ExpiresAt string `json:"expiresAt"`
|
||||
ClientIDHash string `json:"clientIdHash,omitempty"`
|
||||
AuthMethod string `json:"authMethod"`
|
||||
Provider string `json:"provider"`
|
||||
Region string `json:"region,omitempty"`
|
||||
}
|
||||
|
||||
// Token 结构 - 匹配 CLIProxyAPIPlus 格式
|
||||
type CLIProxyToken struct {
|
||||
AccessToken string `json:"access_token"`
|
||||
RefreshToken string `json:"refresh_token"`
|
||||
ProfileArn string `json:"profile_arn"`
|
||||
ExpiresAt string `json:"expires_at"`
|
||||
AuthMethod string `json:"auth_method"`
|
||||
Provider string `json:"provider"`
|
||||
ClientID string `json:"client_id,omitempty"`
|
||||
ClientSecret string `json:"client_secret,omitempty"`
|
||||
Email string `json:"email,omitempty"`
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
func main() {
|
||||
fmt.Println("=" + strings.Repeat("=", 59))
|
||||
fmt.Println(" Kiro Token 403 错误排查工具")
|
||||
fmt.Println("=" + strings.Repeat("=", 59))
|
||||
|
||||
homeDir, _ := os.UserHomeDir()
|
||||
|
||||
// Step 1: 检查 Kiro IDE Token 文件
|
||||
fmt.Println("\n[Step 1] 检查 Kiro IDE Token 文件")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
ideTokenPath := filepath.Join(homeDir, ".aws", "sso", "cache", "kiro-auth-token.json")
|
||||
ideToken, err := loadKiroIDEToken(ideTokenPath)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 无法加载 Kiro IDE Token: %v\n", err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("✅ Token 文件: %s\n", ideTokenPath)
|
||||
fmt.Printf(" AuthMethod: %s\n", ideToken.AuthMethod)
|
||||
fmt.Printf(" Provider: %s\n", ideToken.Provider)
|
||||
fmt.Printf(" Region: %s\n", ideToken.Region)
|
||||
fmt.Printf(" ExpiresAt: %s\n", ideToken.ExpiresAt)
|
||||
fmt.Printf(" AccessToken (前50字符): %s...\n", truncate(ideToken.AccessToken, 50))
|
||||
|
||||
// Step 2: 检查 Token 过期状态
|
||||
fmt.Println("\n[Step 2] 检查 Token 过期状态")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
expiresAt, err := parseExpiresAt(ideToken.ExpiresAt)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 无法解析过期时间: %v\n", err)
|
||||
} else {
|
||||
now := time.Now()
|
||||
if now.After(expiresAt) {
|
||||
fmt.Printf("❌ Token 已过期!过期时间: %s,当前时间: %s\n", expiresAt.Format(time.RFC3339), now.Format(time.RFC3339))
|
||||
} else {
|
||||
remaining := expiresAt.Sub(now)
|
||||
fmt.Printf("✅ Token 未过期,剩余: %s\n", remaining.Round(time.Second))
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: 检查 CLIProxyAPIPlus 保存的 Token
|
||||
fmt.Println("\n[Step 3] 检查 CLIProxyAPIPlus 保存的 Token")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
cliProxyDir := filepath.Join(homeDir, ".cli-proxy-api")
|
||||
files, _ := os.ReadDir(cliProxyDir)
|
||||
for _, f := range files {
|
||||
if strings.HasPrefix(f.Name(), "kiro") && strings.HasSuffix(f.Name(), ".json") {
|
||||
filePath := filepath.Join(cliProxyDir, f.Name())
|
||||
cliToken, err := loadCLIProxyToken(filePath)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ %s: 加载失败 - %v\n", f.Name(), err)
|
||||
continue
|
||||
}
|
||||
fmt.Printf("📄 %s:\n", f.Name())
|
||||
fmt.Printf(" AuthMethod: %s\n", cliToken.AuthMethod)
|
||||
fmt.Printf(" Provider: %s\n", cliToken.Provider)
|
||||
fmt.Printf(" ExpiresAt: %s\n", cliToken.ExpiresAt)
|
||||
fmt.Printf(" AccessToken (前50字符): %s...\n", truncate(cliToken.AccessToken, 50))
|
||||
|
||||
// 比较 Token
|
||||
if cliToken.AccessToken == ideToken.AccessToken {
|
||||
fmt.Printf(" ✅ AccessToken 与 IDE Token 一致\n")
|
||||
} else {
|
||||
fmt.Printf(" ⚠️ AccessToken 与 IDE Token 不一致!\n")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Step 4: 直接测试 Token 有效性 (调用 Kiro API)
|
||||
fmt.Println("\n[Step 4] 直接测试 Token 有效性")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
testTokenValidity(ideToken.AccessToken, ideToken.Region)
|
||||
|
||||
// Step 5: 测试不同的请求头格式
|
||||
fmt.Println("\n[Step 5] 测试不同的请求头格式")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
testDifferentHeaders(ideToken.AccessToken, ideToken.Region)
|
||||
|
||||
// Step 6: 解析 JWT 内容
|
||||
fmt.Println("\n[Step 6] 解析 JWT Token 内容")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
parseJWT(ideToken.AccessToken)
|
||||
|
||||
fmt.Println("\n" + strings.Repeat("=", 60))
|
||||
fmt.Println(" 排查完成")
|
||||
fmt.Println(strings.Repeat("=", 60))
|
||||
}
|
||||
|
||||
func loadKiroIDEToken(path string) (*KiroIDEToken, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var token KiroIDEToken
|
||||
if err := json.Unmarshal(data, &token); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &token, nil
|
||||
}
|
||||
|
||||
func loadCLIProxyToken(path string) (*CLIProxyToken, error) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var token CLIProxyToken
|
||||
if err := json.Unmarshal(data, &token); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &token, nil
|
||||
}
|
||||
|
||||
func parseExpiresAt(s string) (time.Time, error) {
|
||||
formats := []string{
|
||||
time.RFC3339,
|
||||
"2006-01-02T15:04:05.000Z",
|
||||
"2006-01-02T15:04:05Z",
|
||||
}
|
||||
for _, f := range formats {
|
||||
if t, err := time.Parse(f, s); err == nil {
|
||||
return t, nil
|
||||
}
|
||||
}
|
||||
return time.Time{}, fmt.Errorf("无法解析时间格式: %s", s)
|
||||
}
|
||||
|
||||
func truncate(s string, n int) string {
|
||||
if len(s) <= n {
|
||||
return s
|
||||
}
|
||||
return s[:n]
|
||||
}
|
||||
|
||||
func testTokenValidity(accessToken, region string) {
|
||||
if region == "" {
|
||||
region = "us-east-1"
|
||||
}
|
||||
|
||||
// 测试 GetUsageLimits API
|
||||
url := fmt.Sprintf("https://codewhisperer.%s.amazonaws.com", region)
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"origin": "AI_EDITOR",
|
||||
"isEmailRequired": true,
|
||||
"resourceType": "AGENTIC_REQUEST",
|
||||
}
|
||||
body, _ := json.Marshal(payload)
|
||||
|
||||
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
|
||||
req.Header.Set("Content-Type", "application/x-amz-json-1.0")
|
||||
req.Header.Set("x-amz-target", "AmazonCodeWhispererService.GetUsageLimits")
|
||||
req.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
fmt.Printf("请求 URL: %s\n", url)
|
||||
fmt.Printf("请求头:\n")
|
||||
for k, v := range req.Header {
|
||||
if k == "Authorization" {
|
||||
fmt.Printf(" %s: Bearer %s...\n", k, truncate(v[0][7:], 30))
|
||||
} else {
|
||||
fmt.Printf(" %s: %s\n", k, v[0])
|
||||
}
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 请求失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
fmt.Printf("响应状态: %d\n", resp.StatusCode)
|
||||
fmt.Printf("响应内容: %s\n", string(respBody))
|
||||
|
||||
if resp.StatusCode == 200 {
|
||||
fmt.Println("✅ Token 有效!")
|
||||
} else if resp.StatusCode == 403 {
|
||||
fmt.Println("❌ Token 无效或已过期 (403)")
|
||||
}
|
||||
}
|
||||
|
||||
func testDifferentHeaders(accessToken, region string) {
|
||||
if region == "" {
|
||||
region = "us-east-1"
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
headers map[string]string
|
||||
}{
|
||||
{
|
||||
name: "最小请求头",
|
||||
headers: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": "Bearer " + accessToken,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "模拟 kiro2api_go1 风格",
|
||||
headers: map[string]string{
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "text/event-stream",
|
||||
"Authorization": "Bearer " + accessToken,
|
||||
"x-amzn-kiro-agent-mode": "vibe",
|
||||
"x-amzn-codewhisperer-optout": "true",
|
||||
"amz-sdk-invocation-id": "test-invocation-id",
|
||||
"amz-sdk-request": "attempt=1; max=3",
|
||||
"x-amz-user-agent": "aws-sdk-js/1.0.27 KiroIDE-0.8.0-abc123",
|
||||
"User-Agent": "aws-sdk-js/1.0.27 ua/2.1 os/windows#10.0 lang/js md/nodejs#20.16.0 api/codewhispererstreaming#1.0.27 m/E KiroIDE-0.8.0-abc123",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "模拟 CLIProxyAPIPlus 风格",
|
||||
headers: map[string]string{
|
||||
"Content-Type": "application/x-amz-json-1.0",
|
||||
"x-amz-target": "AmazonCodeWhispererService.GetUsageLimits",
|
||||
"Authorization": "Bearer " + accessToken,
|
||||
"Accept": "application/json",
|
||||
"amz-sdk-invocation-id": "test-invocation-id",
|
||||
"amz-sdk-request": "attempt=1; max=1",
|
||||
"Connection": "close",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("https://codewhisperer.%s.amazonaws.com", region)
|
||||
payload := map[string]interface{}{
|
||||
"origin": "AI_EDITOR",
|
||||
"isEmailRequired": true,
|
||||
"resourceType": "AGENTIC_REQUEST",
|
||||
}
|
||||
body, _ := json.Marshal(payload)
|
||||
|
||||
for _, test := range tests {
|
||||
fmt.Printf("\n测试: %s\n", test.name)
|
||||
|
||||
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
|
||||
for k, v := range test.headers {
|
||||
req.Header.Set(k, v)
|
||||
}
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
fmt.Printf(" ❌ 请求失败: %v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == 200 {
|
||||
fmt.Printf(" ✅ 成功 (HTTP %d)\n", resp.StatusCode)
|
||||
} else {
|
||||
fmt.Printf(" ❌ 失败 (HTTP %d): %s\n", resp.StatusCode, truncate(string(respBody), 100))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func parseJWT(token string) {
|
||||
parts := strings.Split(token, ".")
|
||||
if len(parts) < 2 {
|
||||
fmt.Println("Token 不是 JWT 格式")
|
||||
return
|
||||
}
|
||||
|
||||
// 解码 header
|
||||
headerData, err := base64.RawURLEncoding.DecodeString(parts[0])
|
||||
if err != nil {
|
||||
fmt.Printf("无法解码 JWT header: %v\n", err)
|
||||
} else {
|
||||
var header map[string]interface{}
|
||||
json.Unmarshal(headerData, &header)
|
||||
fmt.Printf("JWT Header: %v\n", header)
|
||||
}
|
||||
|
||||
// 解码 payload
|
||||
payloadData, err := base64.RawURLEncoding.DecodeString(parts[1])
|
||||
if err != nil {
|
||||
fmt.Printf("无法解码 JWT payload: %v\n", err)
|
||||
} else {
|
||||
var payload map[string]interface{}
|
||||
json.Unmarshal(payloadData, &payload)
|
||||
fmt.Printf("JWT Payload:\n")
|
||||
for k, v := range payload {
|
||||
fmt.Printf(" %s: %v\n", k, v)
|
||||
}
|
||||
|
||||
// 检查过期时间
|
||||
if exp, ok := payload["exp"].(float64); ok {
|
||||
expTime := time.Unix(int64(exp), 0)
|
||||
if time.Now().After(expTime) {
|
||||
fmt.Printf(" ⚠️ JWT 已过期! exp=%s\n", expTime.Format(time.RFC3339))
|
||||
} else {
|
||||
fmt.Printf(" ✅ JWT 未过期, 剩余: %s\n", expTime.Sub(time.Now()).Round(time.Second))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,367 +0,0 @@
|
||||
// 测试脚本 2:通过 CLIProxyAPIPlus 代理层排查问题
|
||||
// 运行方式: go run test_proxy_debug.go
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
ProxyURL = "http://localhost:8317"
|
||||
APIKey = "your-api-key-1"
|
||||
)
|
||||
|
||||
func main() {
|
||||
fmt.Println("=" + strings.Repeat("=", 59))
|
||||
fmt.Println(" CLIProxyAPIPlus 代理层问题排查")
|
||||
fmt.Println("=" + strings.Repeat("=", 59))
|
||||
|
||||
// Step 1: 检查代理服务状态
|
||||
fmt.Println("\n[Step 1] 检查代理服务状态")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
resp, err := http.Get(ProxyURL + "/health")
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 代理服务不可达: %v\n", err)
|
||||
fmt.Println("请确保服务正在运行: go run ./cmd/server/main.go")
|
||||
return
|
||||
}
|
||||
resp.Body.Close()
|
||||
fmt.Printf("✅ 代理服务正常 (HTTP %d)\n", resp.StatusCode)
|
||||
|
||||
// Step 2: 获取模型列表
|
||||
fmt.Println("\n[Step 2] 获取模型列表")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
models := getModels()
|
||||
if len(models) == 0 {
|
||||
fmt.Println("❌ 没有可用的模型,检查凭据加载")
|
||||
checkCredentials()
|
||||
return
|
||||
}
|
||||
fmt.Printf("✅ 找到 %d 个模型:\n", len(models))
|
||||
for _, m := range models {
|
||||
fmt.Printf(" - %s\n", m)
|
||||
}
|
||||
|
||||
// Step 3: 测试模型请求 - 捕获详细错误
|
||||
fmt.Println("\n[Step 3] 测试模型请求(详细日志)")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
if len(models) > 0 {
|
||||
testModel := models[0]
|
||||
testModelRequest(testModel)
|
||||
}
|
||||
|
||||
// Step 4: 检查代理内部 Token 状态
|
||||
fmt.Println("\n[Step 4] 检查代理服务加载的凭据")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
checkProxyCredentials()
|
||||
|
||||
// Step 5: 对比直接请求和代理请求
|
||||
fmt.Println("\n[Step 5] 对比直接请求 vs 代理请求")
|
||||
fmt.Println("-" + strings.Repeat("-", 59))
|
||||
|
||||
compareDirectVsProxy()
|
||||
|
||||
fmt.Println("\n" + strings.Repeat("=", 60))
|
||||
fmt.Println(" 排查完成")
|
||||
fmt.Println(strings.Repeat("=", 60))
|
||||
}
|
||||
|
||||
func getModels() []string {
|
||||
req, _ := http.NewRequest("GET", ProxyURL+"/v1/models", nil)
|
||||
req.Header.Set("Authorization", "Bearer "+APIKey)
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 请求失败: %v\n", err)
|
||||
return nil
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
fmt.Printf("❌ HTTP %d: %s\n", resp.StatusCode, string(body))
|
||||
return nil
|
||||
}
|
||||
|
||||
var result struct {
|
||||
Data []struct {
|
||||
ID string `json:"id"`
|
||||
} `json:"data"`
|
||||
}
|
||||
json.Unmarshal(body, &result)
|
||||
|
||||
models := make([]string, len(result.Data))
|
||||
for i, m := range result.Data {
|
||||
models[i] = m.ID
|
||||
}
|
||||
return models
|
||||
}
|
||||
|
||||
func checkCredentials() {
|
||||
homeDir, _ := os.UserHomeDir()
|
||||
cliProxyDir := filepath.Join(homeDir, ".cli-proxy-api")
|
||||
|
||||
fmt.Printf("\n检查凭据目录: %s\n", cliProxyDir)
|
||||
files, err := os.ReadDir(cliProxyDir)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 无法读取目录: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, f := range files {
|
||||
if strings.HasSuffix(f.Name(), ".json") {
|
||||
fmt.Printf(" 📄 %s\n", f.Name())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func testModelRequest(model string) {
|
||||
fmt.Printf("测试模型: %s\n", model)
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"model": model,
|
||||
"messages": []map[string]string{
|
||||
{"role": "user", "content": "Say 'OK' if you receive this."},
|
||||
},
|
||||
"max_tokens": 50,
|
||||
"stream": false,
|
||||
}
|
||||
body, _ := json.Marshal(payload)
|
||||
|
||||
req, _ := http.NewRequest("POST", ProxyURL+"/v1/chat/completions", bytes.NewBuffer(body))
|
||||
req.Header.Set("Authorization", "Bearer "+APIKey)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
fmt.Println("\n发送请求:")
|
||||
fmt.Printf(" URL: %s/v1/chat/completions\n", ProxyURL)
|
||||
fmt.Printf(" Model: %s\n", model)
|
||||
|
||||
client := &http.Client{Timeout: 60 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 请求失败: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
|
||||
fmt.Printf("\n响应:\n")
|
||||
fmt.Printf(" Status: %d\n", resp.StatusCode)
|
||||
fmt.Printf(" Headers:\n")
|
||||
for k, v := range resp.Header {
|
||||
fmt.Printf(" %s: %s\n", k, strings.Join(v, ", "))
|
||||
}
|
||||
|
||||
// 格式化 JSON 输出
|
||||
var prettyJSON bytes.Buffer
|
||||
if err := json.Indent(&prettyJSON, respBody, " ", " "); err == nil {
|
||||
fmt.Printf(" Body:\n %s\n", prettyJSON.String())
|
||||
} else {
|
||||
fmt.Printf(" Body: %s\n", string(respBody))
|
||||
}
|
||||
|
||||
if resp.StatusCode == 200 {
|
||||
fmt.Println("\n✅ 请求成功!")
|
||||
} else {
|
||||
fmt.Println("\n❌ 请求失败!分析错误原因...")
|
||||
analyzeError(respBody)
|
||||
}
|
||||
}
|
||||
|
||||
func analyzeError(body []byte) {
|
||||
var errResp struct {
|
||||
Message string `json:"message"`
|
||||
Reason string `json:"reason"`
|
||||
Error struct {
|
||||
Message string `json:"message"`
|
||||
Type string `json:"type"`
|
||||
} `json:"error"`
|
||||
}
|
||||
json.Unmarshal(body, &errResp)
|
||||
|
||||
if errResp.Message != "" {
|
||||
fmt.Printf("错误消息: %s\n", errResp.Message)
|
||||
}
|
||||
if errResp.Reason != "" {
|
||||
fmt.Printf("错误原因: %s\n", errResp.Reason)
|
||||
}
|
||||
if errResp.Error.Message != "" {
|
||||
fmt.Printf("错误详情: %s (类型: %s)\n", errResp.Error.Message, errResp.Error.Type)
|
||||
}
|
||||
|
||||
// 分析常见错误
|
||||
bodyStr := string(body)
|
||||
if strings.Contains(bodyStr, "bearer token") || strings.Contains(bodyStr, "invalid") {
|
||||
fmt.Println("\n可能的原因:")
|
||||
fmt.Println(" 1. Token 已过期 - 需要刷新")
|
||||
fmt.Println(" 2. Token 格式不正确 - 检查凭据文件")
|
||||
fmt.Println(" 3. 代理服务加载了旧的 Token")
|
||||
}
|
||||
}
|
||||
|
||||
func checkProxyCredentials() {
|
||||
// 尝试通过管理 API 获取凭据状态
|
||||
req, _ := http.NewRequest("GET", ProxyURL+"/v0/management/auth/list", nil)
|
||||
// 使用配置中的管理密钥 admin123
|
||||
req.Header.Set("Authorization", "Bearer admin123")
|
||||
|
||||
client := &http.Client{Timeout: 10 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 无法访问管理 API: %v\n", err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
if resp.StatusCode == 200 {
|
||||
fmt.Println("管理 API 返回的凭据列表:")
|
||||
var prettyJSON bytes.Buffer
|
||||
if err := json.Indent(&prettyJSON, body, " ", " "); err == nil {
|
||||
fmt.Printf("%s\n", prettyJSON.String())
|
||||
} else {
|
||||
fmt.Printf("%s\n", string(body))
|
||||
}
|
||||
} else {
|
||||
fmt.Printf("管理 API 返回: HTTP %d\n", resp.StatusCode)
|
||||
fmt.Printf("响应: %s\n", truncate(string(body), 200))
|
||||
}
|
||||
}
|
||||
|
||||
func compareDirectVsProxy() {
|
||||
homeDir, _ := os.UserHomeDir()
|
||||
tokenPath := filepath.Join(homeDir, ".aws", "sso", "cache", "kiro-auth-token.json")
|
||||
|
||||
data, err := os.ReadFile(tokenPath)
|
||||
if err != nil {
|
||||
fmt.Printf("❌ 无法读取 Token 文件: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
var token struct {
|
||||
AccessToken string `json:"accessToken"`
|
||||
Region string `json:"region"`
|
||||
}
|
||||
json.Unmarshal(data, &token)
|
||||
|
||||
if token.Region == "" {
|
||||
token.Region = "us-east-1"
|
||||
}
|
||||
|
||||
// 直接请求
|
||||
fmt.Println("\n1. 直接请求 Kiro API:")
|
||||
directSuccess := testDirectKiroAPI(token.AccessToken, token.Region)
|
||||
|
||||
// 通过代理请求
|
||||
fmt.Println("\n2. 通过代理请求:")
|
||||
proxySuccess := testProxyAPI()
|
||||
|
||||
// 结论
|
||||
fmt.Println("\n结论:")
|
||||
if directSuccess && !proxySuccess {
|
||||
fmt.Println(" ⚠️ 直接请求成功,代理请求失败")
|
||||
fmt.Println(" 问题在于 CLIProxyAPIPlus 代理层")
|
||||
fmt.Println(" 可能原因:")
|
||||
fmt.Println(" 1. 代理服务使用了过期的 Token")
|
||||
fmt.Println(" 2. Token 刷新逻辑有问题")
|
||||
fmt.Println(" 3. 请求头构造不正确")
|
||||
} else if directSuccess && proxySuccess {
|
||||
fmt.Println(" ✅ 两者都成功")
|
||||
} else if !directSuccess && !proxySuccess {
|
||||
fmt.Println(" ❌ 两者都失败 - Token 本身可能有问题")
|
||||
}
|
||||
}
|
||||
|
||||
func testDirectKiroAPI(accessToken, region string) bool {
|
||||
url := fmt.Sprintf("https://codewhisperer.%s.amazonaws.com", region)
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"origin": "AI_EDITOR",
|
||||
"isEmailRequired": true,
|
||||
"resourceType": "AGENTIC_REQUEST",
|
||||
}
|
||||
body, _ := json.Marshal(payload)
|
||||
|
||||
req, _ := http.NewRequest("POST", url, bytes.NewBuffer(body))
|
||||
req.Header.Set("Content-Type", "application/x-amz-json-1.0")
|
||||
req.Header.Set("x-amz-target", "AmazonCodeWhispererService.GetUsageLimits")
|
||||
req.Header.Set("Authorization", "Bearer "+accessToken)
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: 30 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
fmt.Printf(" ❌ 请求失败: %v\n", err)
|
||||
return false
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == 200 {
|
||||
fmt.Printf(" ✅ 成功 (HTTP %d)\n", resp.StatusCode)
|
||||
return true
|
||||
}
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
fmt.Printf(" ❌ 失败 (HTTP %d): %s\n", resp.StatusCode, truncate(string(respBody), 100))
|
||||
return false
|
||||
}
|
||||
|
||||
func testProxyAPI() bool {
|
||||
models := getModels()
|
||||
if len(models) == 0 {
|
||||
fmt.Println(" ❌ 没有可用模型")
|
||||
return false
|
||||
}
|
||||
|
||||
payload := map[string]interface{}{
|
||||
"model": models[0],
|
||||
"messages": []map[string]string{
|
||||
{"role": "user", "content": "Say OK"},
|
||||
},
|
||||
"max_tokens": 10,
|
||||
"stream": false,
|
||||
}
|
||||
body, _ := json.Marshal(payload)
|
||||
|
||||
req, _ := http.NewRequest("POST", ProxyURL+"/v1/chat/completions", bytes.NewBuffer(body))
|
||||
req.Header.Set("Authorization", "Bearer "+APIKey)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{Timeout: 60 * time.Second}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
fmt.Printf(" ❌ 请求失败: %v\n", err)
|
||||
return false
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == 200 {
|
||||
fmt.Printf(" ✅ 成功 (HTTP %d)\n", resp.StatusCode)
|
||||
return true
|
||||
}
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
fmt.Printf(" ❌ 失败 (HTTP %d): %s\n", resp.StatusCode, truncate(string(respBody), 100))
|
||||
return false
|
||||
}
|
||||
|
||||
func truncate(s string, n int) string {
|
||||
if len(s) <= n {
|
||||
return s
|
||||
}
|
||||
return s[:n] + "..."
|
||||
}
|
||||
Reference in New Issue
Block a user