Add cli commands for token management

This commit is contained in:
2025-12-26 22:40:38 +01:00
parent 3c5e8571c3
commit 4f6e5cd33a

View File

@@ -4,7 +4,9 @@ import argparse
import sqlite3 import sqlite3
import uvicorn import uvicorn
import mft.config import mft.config
from datetime import datetime import secrets
import hashlib
from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
@@ -49,6 +51,15 @@ def parse_args() -> argparse.Namespace:
user_add_parser = user_subparsers.add_parser("add", help="Add a new user") user_add_parser = user_subparsers.add_parser("add", help="Add a new user")
user_add_parser.add_argument("name", type=str, help="User name") user_add_parser.add_argument("name", type=str, help="User name")
# token (list, add, disable) subcommands
token_parser = subparsers.add_parser("token", help="Token management")
token_subparsers = token_parser.add_subparsers(dest="token_command", required=True)
token_list_parser = token_subparsers.add_parser("list", help="List all tokens")
token_add_parser = token_subparsers.add_parser("add", help="Add a new token")
token_add_parser.add_argument("username", type=str, help="User name for the token")
token_disable_parser = token_subparsers.add_parser("disable", help="Disable a token")
token_disable_parser.add_argument("token_id", type=int, help="Token ID to disable")
return transform_args(parser.parse_args()) return transform_args(parser.parse_args())
@@ -78,6 +89,8 @@ def main():
db_command(args, settings) db_command(args, settings)
elif args.command == "user": elif args.command == "user":
user_command(args, settings) user_command(args, settings)
elif args.command == "token":
token_command(args, settings)
def run_command(args, settings): def run_command(args, settings):
@@ -207,5 +220,130 @@ def user_add_command(args, settings):
sys.exit(1) sys.exit(1)
def token_command(args, settings):
if args.token_command == "list":
token_list_command(args, settings)
elif args.token_command == "add":
token_add_command(args, settings)
elif args.token_command == "disable":
token_disable_command(args, settings)
def token_list_command(args, settings):
from mft.database import get_db
with get_db() as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT
a.id,
u.name,
a.created,
a.enabled,
SUBSTR(a.token, 1, 8) as token_prefix
FROM auth a
JOIN user u ON a.uid = u.id
ORDER BY a.id
"""
)
tokens = cursor.fetchall()
if not tokens:
print("No tokens found.")
return
print(f"{'ID':<5} {'User':<20} {'Created':<30} {'Enabled':<10} {'Token Prefix':<15}")
print("-" * 90)
for token in tokens:
token_id = token[0]
username = token[1]
created = token[2]
enabled = "Yes" if token[3] else "No"
token_prefix = token[4]
print(
f"{token_id:<5} {username:<20} {created:<30} {enabled:<10} {token_prefix}..."
)
print(f"\nTotal tokens: {len(tokens)}")
def token_add_command(args, settings):
from mft.database import get_db
# Generate the token
token = secrets.token_urlsafe(32)
# Hash the token for storage
token_hash = hashlib.sha256(token.encode()).hexdigest()
# Get current timestamp
created = datetime.now(timezone.utc).isoformat()
with get_db() as conn:
cursor = conn.cursor()
# Check if user exists
cursor.execute("SELECT id FROM user WHERE name = ?", (args.username,))
user = cursor.fetchone()
if not user:
print(
f"Error: User '{args.username}' does not exist.",
file=sys.stderr,
)
sys.exit(1)
user_id = user[0]
try:
cursor.execute(
"INSERT INTO auth (uid, created, token) VALUES (?, ?, ?)",
(user_id, created, token_hash),
)
conn.commit()
token_id = cursor.lastrowid
print(f"Token created successfully for user '{args.username}'")
print(f"Token ID: {token_id}")
print(f"\n{'='*60}")
print(f"Token (save this, it won't be shown again):")
print(f"{token}")
print(f"{'='*60}\n")
except sqlite3.IntegrityError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
def token_disable_command(args, settings):
from mft.database import get_db
with get_db() as conn:
cursor = conn.cursor()
# Check if token exists
cursor.execute("SELECT enabled FROM auth WHERE id = ?", (args.token_id,))
token = cursor.fetchone()
if not token:
print(
f"Error: Token with ID {args.token_id} does not exist.",
file=sys.stderr,
)
sys.exit(1)
if not token[0]:
print(f"Token {args.token_id} is already disabled.")
return
# Disable the token
cursor.execute("UPDATE auth SET enabled = 0 WHERE id = ?", (args.token_id,))
conn.commit()
print(f"Token {args.token_id} has been disabled.")
if __name__ == "__main__": if __name__ == "__main__":
main() main()