Skip to content

Common Patterns

Reusable patterns for TokenFlow across all implementations.

Token Persistence

Save and Load Tokens

import (
    "encoding/json"
    "os"
)

func saveToken(token *Token, filename string) error {
    data, err := json.Marshal(token)
    if err != nil {
        return err
    }
    return os.WriteFile(filename, data, 0600)
}

func loadToken(filename string) (*Token, error) {
    data, err := os.ReadFile(filename)
    if err != nil {
        return nil, err
    }
    var token Token
    err = json.Unmarshal(data, &token)
    return &token, err
}
import json
import os
from oauth.token import Token

def save_token(token: Token, filename: str):
    with open(filename, "w") as f:
        f.write(token.to_json())
    os.chmod(filename, 0o600)

def load_token(filename: str) -> Token:
    with open(filename, "r") as f:
        data = json.load(f)
    return Token.from_dict(data)
import fs from 'fs';
import { Token } from './token';

function saveToken(token: Token, filename: string): void {
    fs.writeFileSync(filename, JSON.stringify(token), { mode: 0o600 });
}

function loadToken(filename: string): Token {
    const data = JSON.parse(fs.readFileSync(filename, 'utf-8'));
    return Token.fromJSON(data);
}

Token Refresh Logic

Auto-refresh Pattern

Check and refresh tokens automatically:

func getValidToken() (*Token, error) {
    token, err := loadToken("token.json")
    if err != nil || token.IsExpired() {
        // Re-authenticate
        ctx := context.Background()
        deviceCode, err := copilot.InitiateDeviceFlow(ctx)
        if err != nil {
            return nil, err
        }

        fmt.Printf("Visit: %s\n", deviceCode.VerificationURI)
        fmt.Printf("Code: %s\n", deviceCode.UserCode)

        token, err = copilot.PollForToken(ctx, deviceCode, nil)
        if err != nil {
            return nil, err
        }

        saveToken(token, "token.json")
    }
    return token, nil
}
async def get_valid_token() -> Token:
    try:
        token = load_token("token.json")
        if not token.is_expired():
            return token
    except FileNotFoundError:
        pass

    # Re-authenticate
    device_code = await initiate_device_flow()
    print(f"Visit: {device_code.verification_uri}")
    print(f"Code: {device_code.user_code}")

    token = await poll_for_token(device_code)
    save_token(token, "token.json")
    return token
async function getValidToken(): Promise<Token> {
    try {
        const token = loadToken("token.json");
        if (!token.isExpired()) {
            return token;
        }
    } catch (error) {
        // No cached token
    }

    // Re-authenticate
    const deviceCode = await initiateDeviceFlow();
    console.log(`Visit: ${deviceCode.verificationUri}`);
    console.log(`Code: ${deviceCode.userCode}`);

    const token = await pollForToken(deviceCode);
    saveToken(token, "token.json");
    return token;
}

CLI Tool Integration

Command-line Tool Pattern

package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "os"

    "github.com/lichman0405/tokenflow/go/copilot"
)

func main() {
    tokenFile := flag.String("token", "token.json", "Token file path")
    flag.Parse()

    token, err := getValidToken(*tokenFile)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Valid token: %s\n", token.AccessToken)
}
import argparse
import asyncio

async def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--token", default="token.json")
    args = parser.parse_args()

    token = await get_valid_token()
    print(f"Valid token: {token.access_token}")

if __name__ == "__main__":
    asyncio.run(main())
import { Command } from 'commander';

const program = new Command();

program
    .option('-t, --token <file>', 'Token file path', 'token.json')
    .action(async (options) => {
        const token = await getValidToken();
        console.log(`Valid token: ${token.accessToken}`);
    });

program.parse();

Error Recovery

Retry Logic

Handle transient errors with exponential backoff:

import asyncio

async def retry_with_backoff(func, max_retries=3):
    for attempt in range(max_retries):
        try:
            return await func()
        except (ConnectionError, TimeoutError) as e:
            if attempt == max_retries - 1:
                raise
            wait_time = 2 ** attempt
            await asyncio.sleep(wait_time)

Testing

Mock Authentication

For testing without real OAuth flow:

func MockToken() *Token {
    return &Token{
        AccessToken: "mock_token_for_testing",
        TokenType:   "Bearer",
        ExpiresIn:   3600,
    }
}
def mock_token() -> Token:
    return Token(
        access_token="mock_token_for_testing",
        token_type="Bearer",
        expires_in=3600
    )
function mockToken(): Token {
    return new Token({
        accessToken: "mock_token_for_testing",
        tokenType: "Bearer",
        expiresIn: 3600
    });
}