Codex 额度用量统计

用一段 Python 脚本读取 `auth.json` 中的凭证,请求 ChatGPT 的 `/backend-api/wham/usage` 接口,查看 Codex 的额度剩余与重置时间。

如果你想查看 Codex 当前账号的额度剩余情况,可以直接写一个小脚本去请求 ChatGPT 的 /backend-api/wham/usage 接口。

这类脚本的思路很简单:

  • auth.json 里读取 tokens.access_tokentokens.account_id
  • 请求 https://chatgpt.com/backend-api/wham/usage
  • 在请求头里带上 Authorization: Bearer ...ChatGPT-Account-Id
  • 解析返回结果里的五小时窗口和周窗口额度

适合做什么

这个方法适合你在本地快速确认:

  • 五小时额度还剩多少
  • 周额度还剩多少
  • 额度会在什么时候重置

如果你有多个账号,也可以让脚本批量读取 account/*.auth.json,统一输出一张摘要表。 auth.json文件可以从 ~/.codex/ 目录下找到, 对应你当前登录的chatgpt账户。

最关键的输入

脚本通常只依赖两项凭证:

  • access_token
  • account_id

它们一般都可以从本地 auth.json 中拿到。只要这两个值有效,请求头就能按下面的形式组织:

1
2
3
4
5
6
7
8
headers = {
    "Authorization": f"Bearer {auth_token}",
    "Accept": "application/json",
    "ChatGPT-Account-Id": auth_account_id,
    "Origin": "https://chatgpt.com",
    "Referer": "https://chatgpt.com/",
    "User-Agent": "Mozilla/5.0",
}

返回结果怎么看

接口返回后,重点看的是两类窗口:

  • five_hour
  • weekly

脚本可以把它们统一整理成下面几项:

  • 剩余额度百分比
  • 重置时间
  • 对应的时间窗口长度

如果接口字段名有差异,也可以顺手兼容 primary_windowsecondary_windowfive_hour_limitweekly_limit 这些变体。

常见问题

如果脚本返回 401,通常说明 access_token 已过期或无效。

如果返回 403,通常说明当前账号没有权限访问这个接口,或者账号状态异常。

如果你看到同一个响应里字段命名不一致,也不用太意外。实际处理时最好把不同命名方式统一映射后再输出。

参考链接

代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
import argparse
import base64
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

import requests


UTC = timezone.utc
CST = timezone(timedelta(hours=8), name="CST")
JSONDict = dict[str, Any]


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Query ChatGPT Codex usage from /backend-api/wham/usage."
    )
    parser.add_argument(
        "account_name",
        nargs="?",
        help="Account name used to load account/<account_name>.auth.json. If omitted, load all *.auth.json files in account/.",
    )
    parser.add_argument(
        "--account-dir",
        default="account",
        help="Directory containing <account_name>.auth.json files.",
    )
    parser.add_argument(
        "--chatgpt-url",
        default="https://chatgpt.com/backend-api/wham/usage",
        help="ChatGPT usage endpoint.",
    )
    parser.add_argument(
        "--raw-json",
        action="store_true",
        help="Print the full JSON response body.",
    )
    parser.add_argument(
        "--raw-headers",
        action="store_true",
        help="Print response headers.",
    )
    return parser.parse_args()


def print_json(data: Any) -> None:
    print(json.dumps(data, indent=2, ensure_ascii=False))


def load_auth_json(path_str: str | Path | None) -> JSONDict | None:
    if not path_str:
        return None
    path = Path(path_str).expanduser()
    if not path.is_file():
        return None
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None


def get_nested_string(data: JSONDict | None, *keys: str) -> str | None:
    current: Any = data
    for key in keys:
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current if isinstance(current, str) and current else None


def format_dt(dt: datetime | None) -> str:
    if dt is None:
        return "-"
    return dt.astimezone(CST).strftime("%Y-%m-%d %H:%M:%S %Z")


def format_cst(dt: datetime | None) -> str:
    return format_dt(dt)


def epoch_ms_to_dt(value: Any) -> datetime | None:
    if value is None:
        return None
    try:
        raw = int(value)
    except (TypeError, ValueError):
        return None
    # Newer responses sometimes use epoch seconds, older ones use epoch milliseconds.
    timestamp = raw / 1000 if raw > 10**11 else raw
    return datetime.fromtimestamp(timestamp, tz=UTC)


def first_dict(data: Any, *keys: str) -> JSONDict | None:
    for key in keys:
        value = data.get(key) if isinstance(data, dict) else None
        if isinstance(value, dict):
            return value
    return None


def decode_jwt_exp(token: str) -> datetime | None:
    parts = token.split(".")
    if len(parts) != 3:
        return None
    try:
        payload = parts[1]
        payload += "=" * (-len(payload) % 4)
        data = json.loads(base64.urlsafe_b64decode(payload.encode("ascii")))
        exp = data.get("exp")
        if exp is None:
            return None
        return datetime.fromtimestamp(int(exp), tz=UTC)
    except (ValueError, TypeError, json.JSONDecodeError):
        return None


def get_percent_left(value: JSONDict) -> float | int | None:
    percent_left = value.get("percent_left")
    if percent_left is None:
        percent_left = value.get("remaining_percent")
    if percent_left is not None:
        return percent_left

    used_percent = value.get("used_percent")
    try:
        if used_percent is not None:
            return max(0, 100 - float(used_percent))
    except (TypeError, ValueError):
        return None
    return None


def resolve_limit_window(value: JSONDict) -> JSONDict:
    if (
        "reset_at" not in value
        and "reset_time_ms" not in value
        and isinstance(value.get("primary_window"), dict)
    ):
        return value["primary_window"]
    return value


def parse_limit_entry(name: str, value: Any) -> JSONDict | None:
    if not isinstance(value, dict):
        return None

    value = resolve_limit_window(value)

    percent_left = get_percent_left(value)
    reset_time_ms = value.get("reset_time_ms")
    if reset_time_ms is None:
        reset_time_ms = value.get("reset_at")

    window_seconds = value.get("limit_window_seconds")

    return {
        "name": name,
        "percent_left": percent_left,
        "reset_time_ms": reset_time_ms,
        "reset_at": epoch_ms_to_dt(reset_time_ms),
        "limit_window_seconds": window_seconds,
    }


def infer_limit_name(window_seconds: Any) -> str | None:
    if not isinstance(window_seconds, (int, float)):
        return None
    if window_seconds <= 6 * 3600:
        return "five_hour"
    if window_seconds >= 6 * 24 * 3600:
        return "weekly"
    return None


def relabel_rate_limits(primary: JSONDict | None, secondary: JSONDict | None) -> tuple[JSONDict | None, JSONDict | None]:
    for entry in (primary, secondary):
        if not entry:
            continue
        inferred_name = infer_limit_name(entry.get("limit_window_seconds"))
        if inferred_name:
            entry["name"] = inferred_name

    if primary and secondary and primary.get("name") == secondary.get("name"):
        if primary.get("name") == "weekly":
            primary["name"] = "five_hour"
        else:
            secondary["name"] = "weekly"

    if primary and primary.get("name") == "weekly" and secondary is None:
        return None, primary

    if secondary and secondary.get("name") == "five_hour" and primary is None:
        return secondary, None

    return primary, secondary


def parse_rate_limits(data: JSONDict) -> tuple[JSONDict | None, JSONDict | None]:
    primary = None
    secondary = None

    for primary_key in ("five_hour", "five_hour_limit", "five_hour_rate_limit", "primary"):
        if primary_key in data:
            primary = parse_limit_entry("five_hour", data.get(primary_key))
            if primary:
                break

    for secondary_key in ("weekly", "weekly_limit", "weekly_rate_limit", "secondary"):
        if secondary_key in data:
            secondary = parse_limit_entry("weekly", data.get(secondary_key))
            if secondary:
                break

    if primary is None:
        primary = parse_limit_entry("five_hour", data.get("primary_window"))
    if secondary is None:
        secondary = parse_limit_entry("weekly", data.get("secondary_window"))

    return relabel_rate_limits(primary, secondary)


def format_percent(value: Any) -> str:
    return f"{value:g}" if isinstance(value, (int, float)) else str(value if value is not None else "-")


def percent_sort_value(value: Any, descending: bool) -> tuple[int, float]:
    if isinstance(value, (int, float)):
        numeric_value = float(value)
        return (0, -numeric_value if descending else numeric_value)
    return (1, 0.0)


def get_auth_paths(account_dir: str, account_name: str | None) -> list[Path]:
    base_dir = Path(account_dir)
    if account_name:
        return [base_dir / f"{account_name}.auth.json"]
    return sorted(base_dir.glob("*.auth.json"))


def get_account_name_from_path(path: Path) -> str:
    suffix = ".auth.json"
    return path.name[: -len(suffix)] if path.name.endswith(suffix) else path.stem


def build_summary_row(account_name: str, five_hour: JSONDict | None, weekly: JSONDict | None) -> JSONDict:
    return {
        "account": account_name,
        "five_hour_percent": five_hour["percent_left"] if five_hour else None,
        "weekly_percent": weekly["percent_left"] if weekly else None,
        "weekly_reset_at": weekly["reset_at"] if weekly else None,
    }


def print_summary_rows(rows: list[JSONDict]) -> None:
    if not rows:
        return

    sorted_rows = sorted(
        rows,
        key=lambda row: (
            percent_sort_value(row.get("five_hour_percent"), descending=True),
            percent_sort_value(row.get("weekly_percent"), descending=True),
            format_cst(row.get("weekly_reset_at")),
            row["account"],
        ),
    )

    display_rows = []
    for row in sorted_rows:
        display_rows.append(
            {
                "account": str(row["account"]),
                "five_hour": format_percent(row.get("five_hour_percent")),
                "weekly": format_percent(row.get("weekly_percent")),
                "weekly_reset": format_cst(row.get("weekly_reset_at")),
            }
        )

    headers = {
        "account": "account",
        "five_hour": "five_hour%",
        "weekly": "weekly%",
        "weekly_reset": "weekly_reset",
    }
    widths = {
        key: max(len(headers[key]), max(len(item[key]) for item in display_rows))
        for key in headers
    }

    print(
        f"{headers['account']:<{widths['account']}}  "
        f"{headers['five_hour']:>{widths['five_hour']}}  "
        f"{headers['weekly']:>{widths['weekly']}}  "
        f"{headers['weekly_reset']:<{widths['weekly_reset']}}"
    )
    for item in display_rows:
        print(
            f"{item['account']:<{widths['account']}}  "
            f"{item['five_hour']:>{widths['five_hour']}}  "
            f"{item['weekly']:>{widths['weekly']}}  "
            f"{item['weekly_reset']:<{widths['weekly_reset']}}"
        )


def validate_token_inputs(
    token: str,
    account_id: str,
    auth_token: str | None,
    auth_account_id: str | None,
) -> int | None:
    if token.startswith("sess-"):
        print("status: invalid_token_type", file=sys.stderr)
        print(
            "message: --chatgpt-token looks like a session token (sess-...). Use the JWT access_token instead.",
            file=sys.stderr,
        )
        if auth_token:
            print(
                "hint: Found tokens.access_token in auth.json; omit --chatgpt-token or pass that value instead.",
                file=sys.stderr,
            )
        return 1

    token_exp = decode_jwt_exp(token)
    if token_exp is not None and token_exp <= datetime.now(UTC):
        print("status: expired", file=sys.stderr)
        print(f"message: access_token expired at {format_dt(token_exp)}", file=sys.stderr)
        if auth_token and auth_token != token:
            auth_token_exp = decode_jwt_exp(auth_token)
            hint = format_dt(auth_token_exp) if auth_token_exp else "unknown time"
            print(f"hint: auth.json contains a different access_token expiring at {hint}", file=sys.stderr)
        return 1

    if auth_account_id and account_id != auth_account_id:
        print("warning: supplied --account-id does not match auth.json tokens.account_id", file=sys.stderr)

    return None


def handle_error_response(response: requests.Response, data: Any, raw_json: bool) -> int | None:
    if response.status_code == 401:
        print("status: expired", file=sys.stderr)
        print("message: Token 已过期或无效", file=sys.stderr)
        if raw_json:
            print_json(data)
        return 3

    if response.status_code == 403:
        print("status: forbidden", file=sys.stderr)
        print("message: 账号已被封禁或无权访问", file=sys.stderr)
        if raw_json:
            print_json(data)
        return 3

    if response.status_code >= 400:
        print(f"HTTP {response.status_code}", file=sys.stderr)
        print_json(data)
        return 3

    return None


def fetch_chatgpt_usage(auth_path: Path, args: argparse.Namespace) -> tuple[int, JSONDict | None]:
    auth_data = load_auth_json(auth_path)
    account_name = get_account_name_from_path(auth_path)
    auth_token = get_nested_string(auth_data, "tokens", "access_token")
    auth_account_id = get_nested_string(auth_data, "tokens", "account_id")

    if not auth_data:
        print(f"{account_name}: auth file not found or invalid", file=sys.stderr)
        return 1, None

    if not auth_token:
        print(f"{account_name}: missing access_token", file=sys.stderr)
        return 1, None

    if not auth_account_id:
        print(f"{account_name}: missing account_id", file=sys.stderr)
        return 1, None

    validation_error = validate_token_inputs(
        auth_token,
        auth_account_id,
        auth_token,
        auth_account_id,
    )
    if validation_error is not None:
        return validation_error, None

    headers = {
        "Authorization": f"Bearer {auth_token}",
        "Accept": "application/json",
        "ChatGPT-Account-Id": auth_account_id,
        "Origin": "https://chatgpt.com",
        "Referer": "https://chatgpt.com/",
        "User-Agent": "Mozilla/5.0",
    }

    try:
        response = requests.get(args.chatgpt_url, headers=headers, timeout=60)
    except requests.RequestException as exc:
        print(f"Request failed: {exc}", file=sys.stderr)
        return 2, None

    if args.raw_headers:
        print("=== Headers ===")
        print_json(dict(response.headers))
        print()

    try:
        data = response.json()
    except ValueError:
        print(f"HTTP {response.status_code}", file=sys.stderr)
        print(response.text, file=sys.stderr)
        return 3, None

    error_response = handle_error_response(response, data, args.raw_json)
    if error_response is not None:
        return error_response, None

    if args.raw_json:
        print("=== Raw JSON ===")
        print_json(data)
        print()

    rate_limits = first_dict(data, "rate_limit", "rate_limits")

    if rate_limits is None:
        return 0, build_summary_row(account_name, None, None)

    five_hour, weekly = parse_rate_limits(rate_limits)
    return 0, build_summary_row(account_name, five_hour, weekly)


def main() -> None:
    args = parse_args()
    auth_paths = get_auth_paths(args.account_dir, args.account_name)
    if not auth_paths:
        print("No auth files found.", file=sys.stderr)
        sys.exit(1)

    exit_code = 0
    summary_rows: list[JSONDict] = []
    for auth_path in auth_paths:
        current_exit_code, summary_row = fetch_chatgpt_usage(auth_path, args)
        exit_code = max(exit_code, current_exit_code)
        if summary_row is not None and not args.raw_json:
            summary_rows.append(summary_row)

    if summary_rows:
        print_summary_rows(summary_rows)
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
记录并分享
使用 Hugo 构建
主题 StackJimmy 设计