Files
d2warehouse/d2warehouse/app/main.py
2025-10-02 19:19:05 +02:00

360 lines
9.9 KiB
Python

import hashlib
from flask import Flask, redirect, abort, render_template, request
from pathlib import Path
import shutil
from datetime import datetime
import psutil
from d2warehouse.item import Item, Quality, lookup_basetype
from d2warehouse.parser import parse_stash
import d2warehouse.db as base_db
from d2warehouse.app.db import get_db, close_db
import os
import re
from d2warehouse.stash import StashFullError
# Shared-stash save file name per game mode. The keys are also the values
# accepted for the <stash_name> URL segment.
STASH_FILES = dict(
    softcore="SharedStashSoftCoreV2.d2i",
    hardcore="SharedStashHardCoreV2.d2i",
)

# SQLite database file name per game mode; resolved relative to save_path().
DB_FILES = dict(
    softcore="d2warehouse.softcore.sqlite3",
    hardcore="d2warehouse.hardcore.sqlite3",
)
# Item codes "r01" through "r33", shown as runes on the currency page.
CURRENCY_RUNES = [f"r{number:02d}" for number in range(1, 34)]

# Item codes shown as gems on the currency page. One row per group of five
# codes sharing a final letter; the last row holds the "sk*" codes.
CURRENCY_GEMS = [
    "gcv", "gfv", "gsv", "gzv", "gpv",
    "gcb", "gfb", "gsb", "glb", "gpb",
    "gcg", "gfg", "gsg", "glg", "gpg",
    "gcr", "gfr", "gsr", "glr", "gpr",
    "gcw", "gfw", "gsw", "glw", "gpw",
    "gcy", "gfy", "gsy", "gly", "gpy",
    "skc", "skf", "sku", "skl", "skz",
]

# Item codes shown as keys on the currency page.
CURRENCY_KEYS = ["pk1", "pk2", "pk3", "bey", "mbr", "dhn"]

# Item codes shown as essences on the currency page.
CURRENCY_ESSENCES = ["tes", "ceh", "bet", "fed", "toa"]
def d2_running() -> bool:
    """Return True if a running process looks like the game executable.

    A process matches when the first element of its command line ends with
    ``D2R.exe``. Processes that cannot be inspected (access denied, already
    gone, zombie) or that report an empty command line are skipped.
    """
    for proc in psutil.process_iter():
        try:
            cmdline = proc.cmdline()
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # NoSuchProcess is the base class of ZombieProcess, and is also
            # raised when a process exits between being listed and being
            # inspected; neither case should fail the whole scan.
            continue
        if cmdline and cmdline[0].endswith("D2R.exe"):
            return True
    return False
def storage_count(item: Item, stash: str) -> int | str:
    """Count stored items of the same kind as ``item``.

    Only rows that are not deleted and not socketed into another item are
    counted. Simple items are matched by code alone; unique, set and
    runeword items are matched by code plus their respective id. For any
    other item the string ``"N/A"`` is returned instead of a number.
    """
    db = get_stash_db(stash)

    # Pick the query and its parameters first, then run it in one place.
    if item.is_simple:
        sql = (
            "SELECT COUNT(id) FROM item "
            "WHERE code = ? AND deleted IS NULL AND socketed_into IS NULL"
        )
        params = (item.code,)
    elif item.quality == Quality.UNIQUE:
        sql = (
            "SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id "
            "WHERE code = ? AND unique_id = ? AND deleted IS NULL AND socketed_into IS NULL"
        )
        params = (item.code, item.unique_id)
    elif item.quality == Quality.SET:
        sql = (
            "SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id "
            "WHERE code = ? AND set_id = ? AND deleted IS NULL AND socketed_into IS NULL"
        )
        params = (item.code, item.set_id)
    elif item.is_runeword:
        sql = (
            "SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id "
            "WHERE code = ? AND runeword_id = ? AND deleted IS NULL and socketed_into IS NULL"
        )
        params = (item.code, item.runeword_id)
    else:
        return "N/A"

    return db.execute(sql, params).fetchone()[0]
def save_path() -> Path:
    """Return the directory that holds the game's save files.

    The ``D2SAVE_PATH`` environment variable takes precedence when set;
    otherwise ``Saved Games/Diablo II Resurrected`` under the user's home
    directory is used.

    Raises:
        RuntimeError: if the resolved path does not exist.
    """
    override = os.environ.get("D2SAVE_PATH")
    if override is None:
        path = Path.home() / "Saved Games/Diablo II Resurrected"
    else:
        path = Path(override)

    if path.exists():
        return path
    raise RuntimeError(f"Save path `{path}` does not exist")
def get_stash_db(stash):
    """Select the database file for ``stash`` and return ``get_db()``'s result.

    ``stash`` must be a key of ``DB_FILES``; the file is looked up inside
    the save directory.
    """
    db_file = save_path() / DB_FILES[stash]
    base_db.set_db_path(str(db_file))
    return get_db()
# At import time, run init_db() once against each mode's database file,
# closing the connection again before moving on to the next one.
for _mode in ("softcore", "hardcore"):
    base_db.set_db_path(str(save_path() / DB_FILES[_mode]))
    base_db.init_db()
    base_db.close_db()
# The Flask application object; every route in this module is registered on it.
app = Flask(__name__)
# Call close_db whenever an application context is torn down.
app.teardown_appcontext(close_db)
@app.route("/")
def home():
    """Redirect the site root to the softcore stash listing."""
    softcore_listing = "/stash/softcore"
    return redirect(softcore_listing, code=302)
@app.route("/stash/<stash_name>")
def list_stash(stash_name: str):
    """Render the contents of the shared stash file for ``stash_name``.

    Responds with 404 for an unknown stash name. The template receives the
    parsed stash, the SHA-256 hex digest of the raw file, and a callable
    that reports how many matching items are already in storage.
    """
    if stash_name not in STASH_FILES:
        abort(404)

    raw = (save_path() / STASH_FILES[stash_name]).read_bytes()
    digest = hashlib.sha256(raw).hexdigest()
    parsed = parse_stash(raw)

    def count_in_storage(x):
        # Bind the stash name so the template only has to pass the item.
        return storage_count(x, stash_name)

    return render_template(
        "list_stash.html",
        stash_name=stash_name,
        stash=parsed,
        stash_hash=digest,
        storage_count=count_in_storage,
    )
@app.route("/stash/<stash_name>/store", methods=["POST"])
def stash_store_items(stash_name: str):
    """Move the selected items out of the shared stash file into the database.

    The form is expected to carry ``stash_hash`` (the SHA-256 hex digest of
    the stash file the selection was made against) and one
    ``item_<tab>_<index>`` key per selected item.

    Responses:
        404 -- unknown stash name.
        500 -- a leftover temp file exists, or the game is running.
        400 -- the stash hash does not match the file on disk, or a
               selected position does not exist in the stash.
        303 -- success; redirects back to the stash listing.
    """
    if stash_name not in STASH_FILES or stash_name not in DB_FILES:
        abort(404)
    stash_path = save_path() / STASH_FILES[stash_name]
    tmp_path = save_path() / f"{STASH_FILES[stash_name]}.temp"
    if tmp_path.exists():
        # TODO: Handle this condition
        # The status must be part of the same return value; a separate
        # `return 500` afterwards would never run.
        return "temp file exists (BAD)", 500
    if d2_running():
        return "d2 is running", 500

    stash_data = stash_path.read_bytes()
    stash_hash = hashlib.sha256(stash_data).hexdigest()
    # Refuse to act if the file changed since the page was rendered, since
    # the submitted positions would then refer to different items.
    if request.form.get("stash_hash") != stash_hash:
        return "wrong stash hash", 400
    stash = parse_stash(stash_data)

    # Resolve every selected position to an item before mutating anything,
    # so removals cannot shift the indices of later selections.
    items = []
    for key in request.form.keys():
        location = re.match(r"item_(\d+)_(\d+)", key)
        if location is None:
            continue
        tab_idx, item_idx = int(location.group(1)), int(location.group(2))
        # Valid indices run from 0 to len - 1, hence `>=`.
        if tab_idx >= len(stash.tabs) or item_idx >= len(stash.tabs[tab_idx].items):
            return "invalid position (2)", 400
        items.append((tab_idx, stash.tabs[tab_idx].items[item_idx]))

    backup_stash(stash_name)

    for tab_idx, item in items:
        stash.tabs[tab_idx].remove(item)
    # Write the new stash next to the real one; it only replaces the real
    # file after the items have been written to the database.
    tmp_path.write_bytes(stash.raw())

    db = get_stash_db(stash_name)
    for _, item in items:
        item.write_to_db(db=db)

    tmp_path.replace(stash_path)
    return redirect(f"/stash/{stash_name}", code=303)
@app.route("/storage/<stash_name>")
def list_storage(stash_name: str):
    """Render every stored item that is neither deleted nor socketed.

    Responds with 404 for an unknown stash name. Items are passed to the
    template as a mapping from row id to the loaded ``Item``.
    """
    if stash_name not in DB_FILES:
        abort(404)

    db = get_stash_db(stash_name)
    # Fetch all ids up front before loading the individual items.
    id_rows = db.execute(
        "SELECT id FROM item WHERE deleted IS NULL and socketed_into IS NULL"
    ).fetchall()
    stored = {
        row["id"]: Item.load_from_db(row["id"], db=db) for row in id_rows
    }

    def count_in_storage(x):
        return storage_count(x, stash_name)

    return render_template(
        "list_storage.html",
        stash_name=stash_name,
        storage_items=stored,
        storage_count=count_in_storage,
    )
@app.route("/storage/<stash_name>/<category>")
def list_storage_category(stash_name: str, category: str):
    """Render stored items restricted to one category.

    Supported categories are ``uniques`` and ``sets`` (filtered by item
    quality) and ``misc`` (rows flagged ``is_simple``). Responds with 404
    for an unknown stash name and 400 for any other category.
    """
    if stash_name not in DB_FILES:
        abort(404)
    db = get_stash_db(stash_name)

    # The two quality-based categories share one query and differ only in
    # the quality value bound to it.
    quality_for = {"uniques": Quality.UNIQUE, "sets": Quality.SET}
    if category in quality_for:
        cursor = db.execute(
            "SELECT id FROM item INNER JOIN item_extra ON id = item_id "
            "WHERE deleted IS NULL AND socketed_into IS NULL AND quality = ?",
            (int(quality_for[category]),),
        )
    elif category == "misc":
        cursor = db.execute(
            "SELECT id FROM item "
            "WHERE deleted IS NULL AND socketed_into IS NULL AND is_simple = TRUE"
        )
    else:
        return "Unexpected category", 400

    id_rows = cursor.fetchall()
    stored = {
        row["id"]: Item.load_from_db(row["id"], db=db) for row in id_rows
    }

    def count_in_storage(x):
        return storage_count(x, stash_name)

    return render_template(
        "list_storage.html",
        stash_name=stash_name,
        storage_items=stored,
        category=category,
        storage_count=count_in_storage,
    )
def backup_stash(stash_name: str) -> None:
    """Copy the current shared stash file into the ``backups`` directory.

    The copy is placed in ``<save path>/backups`` and named
    ``<timestamp>_<stash file name>``, where the timestamp is the local
    time with millisecond precision.
    """
    root = save_path()
    stash_file = STASH_FILES[stash_name]
    backup_dir = root / "backups"
    # exist_ok makes this a single step, so there is no gap between
    # checking for the directory and creating it.
    backup_dir.mkdir(parents=True, exist_ok=True)
    # %f yields microseconds; dropping the last three digits leaves
    # milliseconds.
    ts = datetime.now().strftime("%Y-%m-%d_%H.%M.%S.%f")[:-3]
    shutil.copy(root / stash_file, backup_dir / f"{ts}_{stash_file}")
@app.route("/storage/<stash_name>/take", methods=["POST"])
def storage_take_items(stash_name: str):
    """Move the selected items from the database into the shared stash file.

    The form is expected to carry one ``item_<id>`` key per selected
    database row.

    Responses:
        404 -- unknown stash name.
        500 -- a leftover temp file exists, the game is running, or the
               stash cannot fit the selected items.
        303 -- success; redirects back to the storage listing.
    """
    if stash_name not in STASH_FILES or stash_name not in DB_FILES:
        abort(404)
    stash_path = save_path() / STASH_FILES[stash_name]
    tmp_path = save_path() / f"{STASH_FILES[stash_name]}.temp"
    if tmp_path.exists():
        # TODO: Handle this condition
        # The status must be part of the same return value; a separate
        # `return 500` afterwards would never run.
        return "temp file exists (BAD)", 500
    if d2_running():
        return "d2 is running", 500

    stash_data = stash_path.read_bytes()
    stash = parse_stash(stash_data)
    backup_stash(stash_name)

    # Write items to temporary stash file
    db = get_stash_db(stash_name)
    item_ids = [
        int(found.group(1))
        for key in request.form.keys()
        if (found := re.match(r"item_(\d+)", key))
    ]
    # NOTE(review): nothing here checks that the requested rows are still
    # undeleted; confirm whether Item.load_from_db rejects deleted rows.
    for item_id in item_ids:
        item = Item.load_from_db(item_id, db=db)
        try:
            stash.add(item)
        except StashFullError:
            return "the shared stash does not fit those items", 500
    tmp_path.write_bytes(stash.raw())

    # Remove items from db
    for item_id in item_ids:
        db.execute(
            "UPDATE item SET deleted = CURRENT_TIMESTAMP WHERE id = ?", (item_id,)
        )
    db.commit()

    # Finalize by replacing real stash file
    tmp_path.replace(stash_path)
    return redirect(f"/storage/{stash_name}", code=303)
def storage_currency_counts(item_codes: list[str], stash_name: str) -> dict:
    """Return the stored count and base-type name for each item code.

    The result maps every code in ``item_codes`` to a dict with ``"count"``
    (rows with that code that are neither deleted nor socketed) and
    ``"name"`` (taken from ``lookup_basetype``).
    """
    db = get_stash_db(stash_name)

    def stored(code):
        row = db.execute(
            "SELECT COUNT(id) FROM item "
            "WHERE code = ? AND deleted IS NULL AND socketed_into IS NULL",
            (code,),
        ).fetchone()
        return row[0]

    return {
        code: {"count": stored(code), "name": lookup_basetype(code)["name"]}
        for code in item_codes
    }
@app.route("/storage/<stash_name>/currency")
def storage_currency(stash_name: str):
    """Render stored counts for runes, gems, keys and essences.

    Responds with 404 for an unknown stash name.
    """
    if stash_name not in DB_FILES:
        abort(404)

    # Template variable name -> item codes counted under that name.
    code_groups = {
        "runes": CURRENCY_RUNES,
        "gems": CURRENCY_GEMS,
        "keys": CURRENCY_KEYS,
        "essences": CURRENCY_ESSENCES,
    }
    counts = {
        label: storage_currency_counts(codes, stash_name)
        for label, codes in code_groups.items()
    }
    return render_template("currency.html", **counts)