Add simple flask webapp to move items between stash and database

This commit is contained in:
2023-10-28 05:37:44 +02:00
parent 21df28d7bb
commit 4788d0d641
10 changed files with 437 additions and 2 deletions

View File

@@ -0,0 +1,5 @@
from d2warehouse.app.main import app
__all__ = [
"app",
]

22
d2warehouse/app/db.py Normal file
View File

@@ -0,0 +1,22 @@
import sqlite3
from flask import g
import d2warehouse.db as base_db
def get_db():
if "db" not in g:
print("\n==========\nDB PATH", base_db._path, "\n============\n")
g.db = sqlite3.connect(
base_db._path,
detect_types=sqlite3.PARSE_DECLTYPES,
)
g.db.row_factory = sqlite3.Row
return g.db
def close_db(e=None):
db = g.pop("db", None)
if db is not None:
db.close()

259
d2warehouse/app/main.py Normal file
View File

@@ -0,0 +1,259 @@
import hashlib
from flask import Flask, redirect, abort, render_template, request
from pathlib import Path
import shutil
from datetime import datetime
import psutil
from d2warehouse.item import Item, Quality
from d2warehouse.parser import parse_stash
import d2warehouse.db as base_db
from d2warehouse.app.db import get_db, close_db
import os
import re
from d2warehouse.stash import StashFullError
STASH_FILES = {
"softcore": "SharedStashSoftCoreV2.d2i",
"hardcore": "SharedStashHardCoreV2.d2i",
}
DB_FILES = {
"softcore": "d2warehouse.softcore.sqlite3",
"hardcore": "d2warehouse.hardcore.sqlite3",
}
def d2_running() -> bool:
for proc in psutil.process_iter():
try:
if proc.cmdline()[0].endswith("D2R.exe"):
return True
except (IndexError, psutil.AccessDenied):
pass
return False
def storage_count(item: Item, stash: str) -> int | str:
"""How many of this item type exist in storage"""
db = get_stash_db(stash)
if item.is_simple:
return db.execute(
"SELECT COUNT(id) FROM item WHERE code = ?", (item.code,)
).fetchone()[0]
elif item.quality == Quality.UNIQUE:
return db.execute(
"SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id WHERE code = ? AND unique_id = ?",
(item.code, item.unique_id),
).fetchone()[0]
elif item.quality == Quality.SET:
return db.execute(
"SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id WHERE code = ? AND set_id = ?",
(item.code, item.set_id),
).fetchone()[0]
elif item.is_runeword:
return db.execute(
"SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id WHERE code = ? AND runeword_id = ?",
(item.code, item.runeword_id),
).fetchone()[0]
else:
return "N/A"
def save_path() -> Path:
if "D2SAVE_PATH" in os.environ:
path = Path(os.environ["D2SAVE_PATH"])
else:
path = Path.home() / "Saved Games/Diablo II Resurrected"
if not path.exists():
raise RuntimeError(f"Save path `{path}` does not exist")
return path
def get_stash_db(stash):
base_db.set_db_path(str(save_path() / DB_FILES[stash]))
return get_db()
base_db.set_db_path(str(save_path() / DB_FILES["softcore"]))
base_db.init_db()
base_db.close_db()
base_db.set_db_path(str(save_path() / DB_FILES["hardcore"]))
base_db.init_db()
base_db.close_db()
app = Flask(__name__)
app.teardown_appcontext(close_db)
@app.route("/")
def home():
return redirect("/stash/softcore", code=302)
@app.route("/stash/<stash_name>")
def list_stash(stash_name: str):
if stash_name not in STASH_FILES:
abort(404)
path = save_path() / STASH_FILES[stash_name]
stash_data = path.read_bytes()
stash_hash = hashlib.sha256(stash_data).hexdigest()
stash = parse_stash(stash_data)
return render_template(
"list_stash.html",
stash_name=stash_name,
stash=stash,
stash_hash=stash_hash,
storage_count=lambda x: storage_count(x, stash_name),
)
@app.route("/stash/<stash_name>/store", methods=["POST"])
def stash_store_items(stash_name: str):
if stash_name not in STASH_FILES or stash_name not in DB_FILES:
abort(404)
stash_path = save_path() / STASH_FILES[stash_name]
tmp_path = save_path() / f"{STASH_FILES[stash_name]}.temp"
if tmp_path.exists():
# TODO: Handle this condition
return "temp file exists (BAD)"
return 500
if d2_running():
return "d2 is running", 500
stash_data = stash_path.read_bytes()
stash_hash = hashlib.sha256(stash_data).hexdigest()
if request.form.get("stash_hash") != stash_hash:
return "wrong stash hash", 400
stash = parse_stash(stash_data)
items = []
locs = [y for x in request.form.keys() if (y := re.match(r"item_(\d+)_(\d+)", x))]
for item_location in locs:
tab_idx, item_idx = int(item_location.group(1)), int(item_location.group(2))
if tab_idx > len(stash.tabs) or item_idx > len(stash.tabs[tab_idx].items):
# TODO: Handle this condition
return "invalid position (2)"
item = stash.tabs[tab_idx].items[item_idx]
items.append((tab_idx, item))
backup_stash(stash_name)
for tab_idx, item in items:
stash.tabs[tab_idx].remove(item)
tmp_path.write_bytes(stash.raw())
db = get_stash_db(stash_name)
for _, item in items:
item.write_to_db(db=db)
tmp_path.replace(stash_path)
return redirect(f"/stash/{stash_name}", code=303)
@app.route("/storage/<stash_name>")
def list_storage(stash_name: str):
if stash_name not in DB_FILES:
abort(404)
db = get_stash_db(stash_name)
items = {}
rows = db.execute("SELECT id FROM item WHERE deleted IS NULL").fetchall()
for row in rows:
items[row["id"]] = Item.load_from_db(row["id"], db=db)
return render_template(
"list_storage.html", stash_name=stash_name, storage_items=items
)
@app.route("/storage/<stash_name>/<category>")
def list_storage_category(stash_name: str, category: str):
if stash_name not in DB_FILES:
abort(404)
db = get_stash_db(stash_name)
if category == "uniques":
q = db.execute(
"SELECT id FROM item INNER JOIN item_extra ON id = item_id WHERE deleted IS NULL AND quality = ?",
(int(Quality.UNIQUE),),
)
elif category == "sets":
q = db.execute(
"SELECT id FROM item INNER JOIN item_extra ON id = item_id WHERE deleted IS NULL AND quality = ?",
(int(Quality.SET),),
)
elif category == "misc":
q = db.execute("SELECT id FROM item WHERE deleted IS NULL AND is_simple = TRUE")
else:
return "Unexpected category", 400
rows = q.fetchall()
items = {}
for row in rows:
items[row["id"]] = Item.load_from_db(row["id"], db=db)
return render_template(
"list_storage.html", stash_name=stash_name, storage_items=items
)
def backup_stash(stash_name: str) -> None:
stash_path = save_path() / STASH_FILES[stash_name]
backup_path = save_path() / "backups"
if not backup_path.exists():
backup_path.mkdir(parents=True)
ts = datetime.now().strftime("%Y-%m-%d_%H.%M.%S.%f")[:-3]
backup_path /= f"{ts}_{STASH_FILES[stash_name]}"
shutil.copy(stash_path, backup_path)
@app.route("/storage/<stash_name>/take", methods=["POST"])
def storage_take_items(stash_name: str):
if stash_name not in STASH_FILES or stash_name not in DB_FILES:
abort(404)
stash_path = save_path() / STASH_FILES[stash_name]
tmp_path = save_path() / f"{STASH_FILES[stash_name]}.temp"
if tmp_path.exists():
# TODO: Handle this condition
return "temp file exists (BAD)"
return 500
if d2_running():
return "d2 is running", 500
stash_data = stash_path.read_bytes()
stash = parse_stash(stash_data)
backup_stash(stash_name)
# Write items to temporary stash file
db = get_stash_db(stash_name)
ids = [
int(y.group(1))
for x in request.form.keys()
if (y := re.match(r"item_(\d+)", x))
]
for id in ids:
item = Item.load_from_db(id, db=db)
try:
stash.add(item)
except StashFullError:
return "the shared stash does not fit those items", 500
tmp_path.write_bytes(stash.raw())
# Remove items from db
for id in ids:
db.execute("UPDATE item SET deleted = CURRENT_TIMESTAMP WHERE id = ?", (id,))
db.commit()
# Finalize by replacing real stash file
tmp_path.replace(stash_path)
return redirect(f"/storage/{stash_name}", code=303)

View File

@@ -0,0 +1,72 @@
body {
background-color: #000;
font-size: large;
font-family: sans-serif;
color: rgb(240, 240, 240);
}
.stash-tab {
display: grid;
grid-template-columns: repeat(5, 1fr);
grid-gap: 10px;
margin: 0 auto;
}
@media (max-width: 1600px) {
.stash-tab {
grid-template-columns: repeat(4, 1fr);
}
}
@media (max-width: 1200px) {
.stash-tab {
grid-template-columns: repeat(3, 1fr);
}
}
@media (max-width: 800px) {
.stash-tab {
grid-template-columns: repeat(2, 1fr);
}
}
.stash-tab input[type="checkbox"] {
display: none;
}
.item .name {
font-weight: bold;
}
.item {
display: block;
background-color: #444;
border: solid #555;
}
input[type="checkbox"]:checked + label {
background-color: #343;
border: solid #464;
}
.color-rare {
color: rgb(255, 255, 100);
}
.color-unique {
color: rgb(199, 179, 119);
}
.color-set {
color: rgb(0, 252, 0);
}
.color-runeword {
color: rgb(199, 179, 119);
}
.raw-item {
max-width: 120px;
background-color: #444;
color: rgb(240, 240, 240);
}

View File

@@ -0,0 +1,16 @@
<input type="checkbox" id="item_{{tabloop.index0}}_{{itemloop.index0}}" name="item_{{tabloop.index0}}_{{itemloop.index0}}" value="remove" />
<label class="item" for="item_{{tabloop.index0}}_{{itemloop.index0}}">
<ul>
<li class="name color-{{item.color}}">{{item.name}}</li>
{% if item.quality and item.quality >= 5 %}
<li class="name color-{{item.color}}">{{item.basename}}</li>
{% endif %}
<li>(in storage: {{storage_count(item)}})</li>
{% if item.stats %}
{% for stat in item.stats %}
<li>{{stat}}</li>
{% endfor %}
{% endif %}
<li><input class="raw-item" type="text" name="raw item" value="{{item.raw().hex()}}" onfocus="this.select()" readonly></li>
</ul>
</label>

View File

@@ -0,0 +1,24 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Shared Stash</title>
<link rel="stylesheet" href="/static/style.css" />
<head>
<body>
<form action="/stash/{{stash_name}}/store" method="POST">
{% for tab in stash.tabs %}
{% set tabloop = loop %}
<h2>Tab {{tabloop.index}}</h2>
<div class="stash-tab">
{% for item in tab.items %}
{% set itemloop = loop %}
{% include "item.html" %}
{% endfor %}
</div>
{% endfor %}
<input type="submit" value="Store items">
<input type="hidden" name="stash_hash" value="{{stash_hash}}" />
</form>
</body>
</html>

View File

@@ -0,0 +1,23 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Storage</title>
<link rel="stylesheet" href="/static/style.css" />
<head>
<body>
<form action="/storage/{{stash_name}}/take" method="POST">
<div>
<!-- TODO: Include item.html -->
{% for db_id, item in storage_items.items() %}
<div>
<input type="checkbox" name="item_{{db_id}}" value="take" />
{{item.name}}
({{db_id}})
</div>
{% endfor %}
</div>
<input type="submit" value="Take items">
</form>
</body>
</html>

View File

@@ -32,8 +32,6 @@ CREATE TABLE item (
);
-- Add an index for "... WHERE deletion IS NULL"
CREATE INDEX item_deletion_partial ON item (deleted) WHERE deleted IS NULL;
-- * nuked: if the item has been removed from storage & user indicated he does not
-- want to count it as potentially in his possession any longer
CREATE TABLE item_extra (
item_id INTEGER PRIMARY KEY,
@@ -65,6 +63,7 @@ CREATE TABLE item_extra (
FOREIGN KEY (item_id) REFERENCES item (id)
);
CREATE INDEX item_extra_item_id ON item_extra (item_id);
CREATE TABLE item_stat (
id INTEGER PRIMARY KEY,
@@ -77,6 +76,7 @@ CREATE TABLE item_stat (
FOREIGN KEY (item_id) REFERENCES item (id)
);
CREATE INDEX item_stat_item_id ON item_stat (item_id);
CREATE INDEX item_stat_stat ON item_stat (stat);
CREATE TABLE item_affix (
@@ -87,3 +87,4 @@ CREATE TABLE item_affix (
FOREIGN KEY (item_id) REFERENCES item (id)
);
CREATE INDEX item_affix_item_id ON item_affix (item_id);

View File

@@ -34,6 +34,17 @@ class Stash:
def raw(self) -> bytes:
return b"".join(tab.raw() for tab in self.tabs)
def add(self, item: Item) -> None:
for tab in self.tabs:
try:
tab.add(item)
return
except StashFullError:
pass
raise StashFullError(
"Could not locate an open spot in the stash to add the item"
)
class StashTab:
def __init__(self) -> None:

View File

@@ -18,6 +18,8 @@ requires-python = ">=3.10"
license = {text = "GPLv3 License"}
dependencies = [
"bitarray",
"flask",
"psutil",
]
dynamic = ["version"]