diff --git a/mft/cli.py b/mft/cli.py index eb30901..350974f 100644 --- a/mft/cli.py +++ b/mft/cli.py @@ -4,7 +4,9 @@ import argparse import sqlite3 import uvicorn import mft.config -from datetime import datetime +import secrets +import hashlib +from datetime import datetime, timezone from pathlib import Path @@ -49,6 +51,15 @@ def parse_args() -> argparse.Namespace: user_add_parser = user_subparsers.add_parser("add", help="Add a new user") user_add_parser.add_argument("name", type=str, help="User name") + # token (list, add, disable) subcommands + token_parser = subparsers.add_parser("token", help="Token management") + token_subparsers = token_parser.add_subparsers(dest="token_command", required=True) + token_list_parser = token_subparsers.add_parser("list", help="List all tokens") + token_add_parser = token_subparsers.add_parser("add", help="Add a new token") + token_add_parser.add_argument("username", type=str, help="User name for the token") + token_disable_parser = token_subparsers.add_parser("disable", help="Disable a token") + token_disable_parser.add_argument("token_id", type=int, help="Token ID to disable") + return transform_args(parser.parse_args()) @@ -78,6 +89,8 @@ def main(): db_command(args, settings) elif args.command == "user": user_command(args, settings) + elif args.command == "token": + token_command(args, settings) def run_command(args, settings): @@ -207,5 +220,130 @@ def user_add_command(args, settings): sys.exit(1) +def token_command(args, settings): + if args.token_command == "list": + token_list_command(args, settings) + elif args.token_command == "add": + token_add_command(args, settings) + elif args.token_command == "disable": + token_disable_command(args, settings) + + +def token_list_command(args, settings): + from mft.database import get_db + + with get_db() as conn: + cursor = conn.cursor() + cursor.execute( + """ + SELECT + a.id, + u.name, + a.created, + a.enabled, + SUBSTR(a.token, 1, 8) as token_prefix + FROM auth a + JOIN user u ON a.uid = u.id + ORDER BY a.id + """ + ) + tokens = cursor.fetchall() + + if not tokens: + print("No tokens found.") + return + + print(f"{'ID':<5} {'User':<20} {'Created':<30} {'Enabled':<10} {'Token Prefix':<15}") + print("-" * 90) + + for token in tokens: + token_id = token[0] + username = token[1] + created = token[2] + enabled = "Yes" if token[3] else "No" + token_prefix = token[4] + print( + f"{token_id:<5} {username:<20} {created:<30} {enabled:<10} {token_prefix}..." + ) + + print(f"\nTotal tokens: {len(tokens)}") + + +def token_add_command(args, settings): + from mft.database import get_db + + # Generate the token + token = secrets.token_urlsafe(32) + + # Hash the token for storage + token_hash = hashlib.sha256(token.encode()).hexdigest() + + # Get current timestamp + created = datetime.now(timezone.utc).isoformat() + + with get_db() as conn: + cursor = conn.cursor() + + # Check if user exists + cursor.execute("SELECT id FROM user WHERE name = ?", (args.username,)) + user = cursor.fetchone() + + if not user: + print( + f"Error: User '{args.username}' does not exist.", + file=sys.stderr, + ) + sys.exit(1) + + user_id = user[0] + + try: + cursor.execute( + "INSERT INTO auth (uid, created, token) VALUES (?, ?, ?)", + (user_id, created, token_hash), + ) + conn.commit() + token_id = cursor.lastrowid + + print(f"Token created successfully for user '{args.username}'") + print(f"Token ID: {token_id}") + print(f"\n{'='*60}") + print(f"Token (save this, it won't be shown again):") + print(f"{token}") + print(f"{'='*60}\n") + + except sqlite3.IntegrityError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +def token_disable_command(args, settings): + from mft.database import get_db + + with get_db() as conn: + cursor = conn.cursor() + + # Check if token exists + cursor.execute("SELECT enabled FROM auth WHERE id = ?", (args.token_id,)) + token = cursor.fetchone() + + if not token: + print( + f"Error: Token with ID {args.token_id} does not exist.", + file=sys.stderr, + ) + sys.exit(1) + + if not token[0]: + print(f"Token {args.token_id} is already disabled.") + return + + # Disable the token + cursor.execute("UPDATE auth SET enabled = 0 WHERE id = ?", (args.token_id,)) + conn.commit() + + print(f"Token {args.token_id} has been disabled.") + + if __name__ == "__main__": main()