Codex クォータ使用統計

Python スクリプトを使用して、`auth.json` の証明書を読み取り、ChatGPT の `/backend-api/wham/usage` インターフェイスをリクエストし、Codex の残高とリセット時間を確認します。

現在の Codex アカウントの残高を確認したい場合は、ChatGPT の /backend-api/wham/usage インターフェイスを要求する小さなスクリプトを直接作成できます。

このタイプのスクリプトの考え方は非常にシンプルです。

  • auth.json から tokens.access_token および tokens.account_id を読み取ります
  • リクエスト https://chatgpt.com/backend-api/wham/usage
  • リクエストヘッダーに Authorization: Bearer ...ChatGPT-Account-Id を含めます
  • 返された結果の 5 時間のウィンドウと週ごとのウィンドウ クォータを解析します。

やるべきことに適した

この方法は、ローカルですばやく確認するのに適しています。

  • 5 時間の割り当てはどれくらい残っていますか?
  • 週の割り当てはどれくらい残っていますか?
  • クォータはいつリセットされますか?

複数のアカウントをお持ちの場合は、スクリプトで account/*.auth.json を一括で読み込み、一律に集計表を出力することもできます。 auth.json ファイルは、現在ログインしている chatgpt アカウントに対応する ~/.codex/ ディレクトリにあります。

最も重要なインプット

通常、スクリプトは次の 2 つの資格情報のみに依存します。

  • access_token
  • account_id

これらは通常、ローカルの auth.json から入手できます。これら 2 つの値が有効である限り、リクエスト ヘッダーは次のように構成できます。

1
2
3
4
5
6
7
8
headers = {
    "Authorization": f"Bearer {auth_token}",
    "Accept": "application/json",
    "ChatGPT-Account-Id": auth_account_id,
    "Origin": "https://chatgpt.com",
    "Referer": "https://chatgpt.com/",
    "User-Agent": "Mozilla/5.0",
}

返された結果の見方

インターフェイスが戻った後は、次の 2 種類のウィンドウに焦点が当てられます。

  • five_hour
  • weekly

スクリプトはそれらを次の項目に整理できます。

  • 残高の割合
  • リセット時間
  • 対応する時間ウィンドウの長さ

インターフェイス フィールド名が異なる場合、バリアント primary_windowsecondary_windowfive_hour_limit、および weekly_limit とも互換性がある可能性があります。

よくある質問

スクリプトが 401 を返した場合、通常、access_token の有効期限が切れているか、無効であることを意味します。

403 が返された場合は、通常、現在のアカウントにこのインターフェイスにアクセスする権限がないか、アカウントのステータスが異常であることを意味します。

同じ応答内でフィールドの名前が一貫していない場合でも驚かないでください。実際の処理では、異なる命名方法を統一的にマッピングしてから出力するのがベストです。

参考リンク

コード

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
import argparse
import base64
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any

import requests


UTC = timezone.utc
CST = timezone(timedelta(hours=8), name="CST")
JSONDict = dict[str, Any]


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Query ChatGPT Codex usage from /backend-api/wham/usage."
    )
    parser.add_argument(
        "account_name",
        nargs="?",
        help="Account name used to load account/<account_name>.auth.json. If omitted, load all *.auth.json files in account/.",
    )
    parser.add_argument(
        "--account-dir",
        default="account",
        help="Directory containing <account_name>.auth.json files.",
    )
    parser.add_argument(
        "--chatgpt-url",
        default="https://chatgpt.com/backend-api/wham/usage",
        help="ChatGPT usage endpoint.",
    )
    parser.add_argument(
        "--raw-json",
        action="store_true",
        help="Print the full JSON response body.",
    )
    parser.add_argument(
        "--raw-headers",
        action="store_true",
        help="Print response headers.",
    )
    return parser.parse_args()


def print_json(data: Any) -> None:
    print(json.dumps(data, indent=2, ensure_ascii=False))


def load_auth_json(path_str: str | Path | None) -> JSONDict | None:
    if not path_str:
        return None
    path = Path(path_str).expanduser()
    if not path.is_file():
        return None
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None


def get_nested_string(data: JSONDict | None, *keys: str) -> str | None:
    current: Any = data
    for key in keys:
        if not isinstance(current, dict):
            return None
        current = current.get(key)
    return current if isinstance(current, str) and current else None


def format_dt(dt: datetime | None) -> str:
    if dt is None:
        return "-"
    return dt.astimezone(CST).strftime("%Y-%m-%d %H:%M:%S %Z")


def format_cst(dt: datetime | None) -> str:
    return format_dt(dt)


def epoch_ms_to_dt(value: Any) -> datetime | None:
    if value is None:
        return None
    try:
        raw = int(value)
    except (TypeError, ValueError):
        return None
    # Newer responses sometimes use epoch seconds, older ones use epoch milliseconds.
    timestamp = raw / 1000 if raw > 10**11 else raw
    return datetime.fromtimestamp(timestamp, tz=UTC)


def first_dict(data: Any, *keys: str) -> JSONDict | None:
    for key in keys:
        value = data.get(key) if isinstance(data, dict) else None
        if isinstance(value, dict):
            return value
    return None


def decode_jwt_exp(token: str) -> datetime | None:
    parts = token.split(".")
    if len(parts) != 3:
        return None
    try:
        payload = parts[1]
        payload += "=" * (-len(payload) % 4)
        data = json.loads(base64.urlsafe_b64decode(payload.encode("ascii")))
        exp = data.get("exp")
        if exp is None:
            return None
        return datetime.fromtimestamp(int(exp), tz=UTC)
    except (ValueError, TypeError, json.JSONDecodeError):
        return None


def get_percent_left(value: JSONDict) -> float | int | None:
    percent_left = value.get("percent_left")
    if percent_left is None:
        percent_left = value.get("remaining_percent")
    if percent_left is not None:
        return percent_left

    used_percent = value.get("used_percent")
    try:
        if used_percent is not None:
            return max(0, 100 - float(used_percent))
    except (TypeError, ValueError):
        return None
    return None


def resolve_limit_window(value: JSONDict) -> JSONDict:
    if (
        "reset_at" not in value
        and "reset_time_ms" not in value
        and isinstance(value.get("primary_window"), dict)
    ):
        return value["primary_window"]
    return value


def parse_limit_entry(name: str, value: Any) -> JSONDict | None:
    if not isinstance(value, dict):
        return None

    value = resolve_limit_window(value)

    percent_left = get_percent_left(value)
    reset_time_ms = value.get("reset_time_ms")
    if reset_time_ms is None:
        reset_time_ms = value.get("reset_at")

    window_seconds = value.get("limit_window_seconds")

    return {
        "name": name,
        "percent_left": percent_left,
        "reset_time_ms": reset_time_ms,
        "reset_at": epoch_ms_to_dt(reset_time_ms),
        "limit_window_seconds": window_seconds,
    }


def infer_limit_name(window_seconds: Any) -> str | None:
    if not isinstance(window_seconds, (int, float)):
        return None
    if window_seconds <= 6 * 3600:
        return "five_hour"
    if window_seconds >= 6 * 24 * 3600:
        return "weekly"
    return None


def relabel_rate_limits(primary: JSONDict | None, secondary: JSONDict | None) -> tuple[JSONDict | None, JSONDict | None]:
    for entry in (primary, secondary):
        if not entry:
            continue
        inferred_name = infer_limit_name(entry.get("limit_window_seconds"))
        if inferred_name:
            entry["name"] = inferred_name

    if primary and secondary and primary.get("name") == secondary.get("name"):
        if primary.get("name") == "weekly":
            primary["name"] = "five_hour"
        else:
            secondary["name"] = "weekly"

    if primary and primary.get("name") == "weekly" and secondary is None:
        return None, primary

    if secondary and secondary.get("name") == "five_hour" and primary is None:
        return secondary, None

    return primary, secondary


def parse_rate_limits(data: JSONDict) -> tuple[JSONDict | None, JSONDict | None]:
    primary = None
    secondary = None

    for primary_key in ("five_hour", "five_hour_limit", "five_hour_rate_limit", "primary"):
        if primary_key in data:
            primary = parse_limit_entry("five_hour", data.get(primary_key))
            if primary:
                break

    for secondary_key in ("weekly", "weekly_limit", "weekly_rate_limit", "secondary"):
        if secondary_key in data:
            secondary = parse_limit_entry("weekly", data.get(secondary_key))
            if secondary:
                break

    if primary is None:
        primary = parse_limit_entry("five_hour", data.get("primary_window"))
    if secondary is None:
        secondary = parse_limit_entry("weekly", data.get("secondary_window"))

    return relabel_rate_limits(primary, secondary)


def format_percent(value: Any) -> str:
    return f"{value:g}" if isinstance(value, (int, float)) else str(value if value is not None else "-")


def percent_sort_value(value: Any, descending: bool) -> tuple[int, float]:
    if isinstance(value, (int, float)):
        numeric_value = float(value)
        return (0, -numeric_value if descending else numeric_value)
    return (1, 0.0)


def get_auth_paths(account_dir: str, account_name: str | None) -> list[Path]:
    base_dir = Path(account_dir)
    if account_name:
        return [base_dir / f"{account_name}.auth.json"]
    return sorted(base_dir.glob("*.auth.json"))


def get_account_name_from_path(path: Path) -> str:
    suffix = ".auth.json"
    return path.name[: -len(suffix)] if path.name.endswith(suffix) else path.stem


def build_summary_row(account_name: str, five_hour: JSONDict | None, weekly: JSONDict | None) -> JSONDict:
    return {
        "account": account_name,
        "five_hour_percent": five_hour["percent_left"] if five_hour else None,
        "weekly_percent": weekly["percent_left"] if weekly else None,
        "weekly_reset_at": weekly["reset_at"] if weekly else None,
    }


def print_summary_rows(rows: list[JSONDict]) -> None:
    if not rows:
        return

    sorted_rows = sorted(
        rows,
        key=lambda row: (
            percent_sort_value(row.get("five_hour_percent"), descending=True),
            percent_sort_value(row.get("weekly_percent"), descending=True),
            format_cst(row.get("weekly_reset_at")),
            row["account"],
        ),
    )

    display_rows = []
    for row in sorted_rows:
        display_rows.append(
            {
                "account": str(row["account"]),
                "five_hour": format_percent(row.get("five_hour_percent")),
                "weekly": format_percent(row.get("weekly_percent")),
                "weekly_reset": format_cst(row.get("weekly_reset_at")),
            }
        )

    headers = {
        "account": "account",
        "five_hour": "five_hour%",
        "weekly": "weekly%",
        "weekly_reset": "weekly_reset",
    }
    widths = {
        key: max(len(headers[key]), max(len(item[key]) for item in display_rows))
        for key in headers
    }

    print(
        f"{headers['account']:<{widths['account']}}  "
        f"{headers['five_hour']:>{widths['five_hour']}}  "
        f"{headers['weekly']:>{widths['weekly']}}  "
        f"{headers['weekly_reset']:<{widths['weekly_reset']}}"
    )
    for item in display_rows:
        print(
            f"{item['account']:<{widths['account']}}  "
            f"{item['five_hour']:>{widths['five_hour']}}  "
            f"{item['weekly']:>{widths['weekly']}}  "
            f"{item['weekly_reset']:<{widths['weekly_reset']}}"
        )


def validate_token_inputs(
    token: str,
    account_id: str,
    auth_token: str | None,
    auth_account_id: str | None,
) -> int | None:
    if token.startswith("sess-"):
        print("status: invalid_token_type", file=sys.stderr)
        print(
            "message: --chatgpt-token looks like a session token (sess-...). Use the JWT access_token instead.",
            file=sys.stderr,
        )
        if auth_token:
            print(
                "hint: Found tokens.access_token in auth.json; omit --chatgpt-token or pass that value instead.",
                file=sys.stderr,
            )
        return 1

    token_exp = decode_jwt_exp(token)
    if token_exp is not None and token_exp <= datetime.now(UTC):
        print("status: expired", file=sys.stderr)
        print(f"message: access_token expired at {format_dt(token_exp)}", file=sys.stderr)
        if auth_token and auth_token != token:
            auth_token_exp = decode_jwt_exp(auth_token)
            hint = format_dt(auth_token_exp) if auth_token_exp else "unknown time"
            print(f"hint: auth.json contains a different access_token expiring at {hint}", file=sys.stderr)
        return 1

    if auth_account_id and account_id != auth_account_id:
        print("warning: supplied --account-id does not match auth.json tokens.account_id", file=sys.stderr)

    return None


def handle_error_response(response: requests.Response, data: Any, raw_json: bool) -> int | None:
    if response.status_code == 401:
        print("status: expired", file=sys.stderr)
        print("message: Token 已过期或无效", file=sys.stderr)
        if raw_json:
            print_json(data)
        return 3

    if response.status_code == 403:
        print("status: forbidden", file=sys.stderr)
        print("message: 账号已被封禁或无权访问", file=sys.stderr)
        if raw_json:
            print_json(data)
        return 3

    if response.status_code >= 400:
        print(f"HTTP {response.status_code}", file=sys.stderr)
        print_json(data)
        return 3

    return None


def fetch_chatgpt_usage(auth_path: Path, args: argparse.Namespace) -> tuple[int, JSONDict | None]:
    auth_data = load_auth_json(auth_path)
    account_name = get_account_name_from_path(auth_path)
    auth_token = get_nested_string(auth_data, "tokens", "access_token")
    auth_account_id = get_nested_string(auth_data, "tokens", "account_id")

    if not auth_data:
        print(f"{account_name}: auth file not found or invalid", file=sys.stderr)
        return 1, None

    if not auth_token:
        print(f"{account_name}: missing access_token", file=sys.stderr)
        return 1, None

    if not auth_account_id:
        print(f"{account_name}: missing account_id", file=sys.stderr)
        return 1, None

    validation_error = validate_token_inputs(
        auth_token,
        auth_account_id,
        auth_token,
        auth_account_id,
    )
    if validation_error is not None:
        return validation_error, None

    headers = {
        "Authorization": f"Bearer {auth_token}",
        "Accept": "application/json",
        "ChatGPT-Account-Id": auth_account_id,
        "Origin": "https://chatgpt.com",
        "Referer": "https://chatgpt.com/",
        "User-Agent": "Mozilla/5.0",
    }

    try:
        response = requests.get(args.chatgpt_url, headers=headers, timeout=60)
    except requests.RequestException as exc:
        print(f"Request failed: {exc}", file=sys.stderr)
        return 2, None

    if args.raw_headers:
        print("=== Headers ===")
        print_json(dict(response.headers))
        print()

    try:
        data = response.json()
    except ValueError:
        print(f"HTTP {response.status_code}", file=sys.stderr)
        print(response.text, file=sys.stderr)
        return 3, None

    error_response = handle_error_response(response, data, args.raw_json)
    if error_response is not None:
        return error_response, None

    if args.raw_json:
        print("=== Raw JSON ===")
        print_json(data)
        print()

    rate_limits = first_dict(data, "rate_limit", "rate_limits")

    if rate_limits is None:
        return 0, build_summary_row(account_name, None, None)

    five_hour, weekly = parse_rate_limits(rate_limits)
    return 0, build_summary_row(account_name, five_hour, weekly)


def main() -> None:
    args = parse_args()
    auth_paths = get_auth_paths(args.account_dir, args.account_name)
    if not auth_paths:
        print("No auth files found.", file=sys.stderr)
        sys.exit(1)

    exit_code = 0
    summary_rows: list[JSONDict] = []
    for auth_path in auth_paths:
        current_exit_code, summary_row = fetch_chatgpt_usage(auth_path, args)
        exit_code = max(exit_code, current_exit_code)
        if summary_row is not None and not args.raw_json:
            summary_rows.append(summary_row)

    if summary_rows:
        print_summary_rows(summary_rows)
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
记录并分享
Hugo で構築されています。
テーマ StackJimmy によって設計されています。