Compare commits

...

10 Commits

Author SHA1 Message Date
9b288a39dc Add the missing currency template for the currency ui (oops) 2023-10-31 12:08:42 +01:00
8b94171d22 Add stat fire-max for Cathan's Rule 2023-10-31 12:00:41 +01:00
9057f81d5f Add currency display page 2023-10-31 11:44:14 +01:00
2373702a1f fix storage_count to not count items that are removed from storage 2023-10-30 11:05:45 +01:00
4788d0d641 Add simple flask webapp to move items between stash and database 2023-10-29 21:45:55 +01:00
21df28d7bb Split sockets into sockets and socketed_items 2023-10-29 20:54:05 +01:00
423e4368d7 Fix bugs with affixes lookup 2023-10-29 20:48:33 +01:00
5026f58eb8 Fix an issue in rebuilding stash data for socketed items 2023-10-28 06:17:58 +02:00
b73fd9c20e make d2warehouse.db ready for the flask webui
- Add a function to automatically initialize the db with the schema if
   it isn't yet initialized
 - Remove the DROP TABLE queries from the schema in case the above^ goes
   wrong somehow
 - Make Item.write_to_db and Item.load_from_db take a db argument so
   that multiple threads can be supported
 - Add a few properties to d2warehouse.Item to help display items
2023-10-28 06:17:54 +02:00
93b1ad25a6 Add requirements to db write 2023-10-27 18:09:57 +02:00
21 changed files with 3219 additions and 2429 deletions

View File

@@ -11,12 +11,14 @@ with open(os.path.join(path, "magicprefix.txt")) as f:
dr = csv.DictReader(f, delimiter="\t") dr = csv.DictReader(f, delimiter="\t")
index = 0 index = 0
for row in dr: for row in dr:
if row["Name"] == "Expansion":
continue
index += 1 index += 1
if len(row["Name"]) == 0: if len(row["Name"]) == 0:
continue continue
affixes["prefixes"][index] = { affixes["prefixes"][index] = {
"name": row["Name"], "name": row["Name"],
"req_lvl": row["levelreq"], "req_lvl": 0 if len(row["levelreq"]) == 0 else int(row["levelreq"]),
"req_class": None if len(row["class"]) == 0 else row["class"], "req_class": None if len(row["class"]) == 0 else row["class"],
} }
@@ -24,12 +26,14 @@ with open(os.path.join(path, "magicsuffix.txt")) as f:
dr = csv.DictReader(f, delimiter="\t") dr = csv.DictReader(f, delimiter="\t")
index = 0 index = 0
for row in dr: for row in dr:
if row["Name"] == "Expansion":
continue
index += 1 index += 1
if len(row["Name"]) == 0: if len(row["Name"]) == 0:
continue continue
affixes["suffixes"][index] = { affixes["suffixes"][index] = {
"name": row["Name"], "name": row["Name"],
"req_lvl": row["levelreq"], "req_lvl": 0 if len(row["levelreq"]) == 0 else int(row["levelreq"]),
"req_class": None if len(row["class"]) == 0 else row["class"], "req_class": None if len(row["class"]) == 0 else row["class"],
} }

View File

@@ -13,7 +13,9 @@ path = sys.argv[1] if len(sys.argv) >= 2 else "."
# into one. Same applies for all stats. # into one. Same applies for all stats.
special_stats = { special_stats = {
"firemindam": {"template": "dmg-fire"}, "firemindam": {"template": "dmg-fire"},
"firemaxdam": None, "firemaxdam": {
"template": "fire-max"
}, # Cathan's rule, no other max ele dmg source exists
"lightmindam": {"template": "dmg-ltng"}, "lightmindam": {"template": "dmg-ltng"},
"lightmaxdam": None, "lightmaxdam": None,
"magicmindam": {"template": "dmg-mag"}, "magicmindam": {"template": "dmg-mag"},

View File

@@ -0,0 +1,5 @@
from d2warehouse.app.main import app
__all__ = [
"app",
]

22
d2warehouse/app/db.py Normal file
View File

@@ -0,0 +1,22 @@
import sqlite3
from flask import g
import d2warehouse.db as base_db
def get_db():
if "db" not in g:
print("\n==========\nDB PATH", base_db._path, "\n============\n")
g.db = sqlite3.connect(
base_db._path,
detect_types=sqlite3.PARSE_DECLTYPES,
)
g.db.row_factory = sqlite3.Row
return g.db
def close_db(e=None):
db = g.pop("db", None)
if db is not None:
db.close()

339
d2warehouse/app/main.py Normal file
View File

@@ -0,0 +1,339 @@
import hashlib
from flask import Flask, redirect, abort, render_template, request
from pathlib import Path
import shutil
from datetime import datetime
import psutil
from d2warehouse.item import Item, Quality, lookup_basetype
from d2warehouse.parser import parse_stash
import d2warehouse.db as base_db
from d2warehouse.app.db import get_db, close_db
import os
import re
from d2warehouse.stash import StashFullError
STASH_FILES = {
"softcore": "SharedStashSoftCoreV2.d2i",
"hardcore": "SharedStashHardCoreV2.d2i",
}
DB_FILES = {
"softcore": "d2warehouse.softcore.sqlite3",
"hardcore": "d2warehouse.hardcore.sqlite3",
}
CURRENCY_RUNES = [f"r{i + 1:02d}" for i in range(33)]
CURRENCY_GEMS = [
"gcv",
"gfv",
"gsv",
"gzv",
"gpv",
"gcb",
"gfb",
"gsb",
"glb",
"gpb",
"gcg",
"gfg",
"gsg",
"glg",
"gpg",
"gcr",
"gfr",
"gsr",
"glr",
"gpr",
"gcw",
"gfw",
"gsw",
"glw",
"gpw",
"gcy",
"gfy",
"gsy",
"gly",
"gpy",
"skc",
"skf",
"sku",
"skl",
"skz",
]
CURRENCY_KEYS = [
"pk1",
"pk2",
"pk3",
"bey",
"mbr",
"dhn",
]
CURRENCY_ESSENCES = [
"tes",
"ceh",
"bet",
"fed",
"toa",
]
def d2_running() -> bool:
for proc in psutil.process_iter():
try:
if proc.cmdline()[0].endswith("D2R.exe"):
return True
except (IndexError, psutil.AccessDenied):
pass
return False
def storage_count(item: Item, stash: str) -> int | str:
"""How many of this item type exist in storage"""
db = get_stash_db(stash)
if item.is_simple:
return db.execute(
"SELECT COUNT(id) FROM item WHERE code = ? AND deleted IS NULL",
(item.code,),
).fetchone()[0]
elif item.quality == Quality.UNIQUE:
return db.execute(
"SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id WHERE code = ? AND unique_id = ? AND deleted IS NULL",
(item.code, item.unique_id),
).fetchone()[0]
elif item.quality == Quality.SET:
return db.execute(
"SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id WHERE code = ? AND set_id = ? AND deleted IS NULL",
(item.code, item.set_id),
).fetchone()[0]
elif item.is_runeword:
return db.execute(
"SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id WHERE code = ? AND runeword_id = ? AND deleted IS NULL",
(item.code, item.runeword_id),
).fetchone()[0]
else:
return "N/A"
def save_path() -> Path:
if "D2SAVE_PATH" in os.environ:
path = Path(os.environ["D2SAVE_PATH"])
else:
path = Path.home() / "Saved Games/Diablo II Resurrected"
if not path.exists():
raise RuntimeError(f"Save path `{path}` does not exist")
return path
def get_stash_db(stash):
base_db.set_db_path(str(save_path() / DB_FILES[stash]))
return get_db()
base_db.set_db_path(str(save_path() / DB_FILES["softcore"]))
base_db.init_db()
base_db.close_db()
base_db.set_db_path(str(save_path() / DB_FILES["hardcore"]))
base_db.init_db()
base_db.close_db()
app = Flask(__name__)
app.teardown_appcontext(close_db)
@app.route("/")
def home():
return redirect("/stash/softcore", code=302)
@app.route("/stash/<stash_name>")
def list_stash(stash_name: str):
if stash_name not in STASH_FILES:
abort(404)
path = save_path() / STASH_FILES[stash_name]
stash_data = path.read_bytes()
stash_hash = hashlib.sha256(stash_data).hexdigest()
stash = parse_stash(stash_data)
return render_template(
"list_stash.html",
stash_name=stash_name,
stash=stash,
stash_hash=stash_hash,
storage_count=lambda x: storage_count(x, stash_name),
)
@app.route("/stash/<stash_name>/store", methods=["POST"])
def stash_store_items(stash_name: str):
if stash_name not in STASH_FILES or stash_name not in DB_FILES:
abort(404)
stash_path = save_path() / STASH_FILES[stash_name]
tmp_path = save_path() / f"{STASH_FILES[stash_name]}.temp"
if tmp_path.exists():
# TODO: Handle this condition
return "temp file exists (BAD)"
return 500
if d2_running():
return "d2 is running", 500
stash_data = stash_path.read_bytes()
stash_hash = hashlib.sha256(stash_data).hexdigest()
if request.form.get("stash_hash") != stash_hash:
return "wrong stash hash", 400
stash = parse_stash(stash_data)
items = []
locs = [y for x in request.form.keys() if (y := re.match(r"item_(\d+)_(\d+)", x))]
for item_location in locs:
tab_idx, item_idx = int(item_location.group(1)), int(item_location.group(2))
if tab_idx > len(stash.tabs) or item_idx > len(stash.tabs[tab_idx].items):
# TODO: Handle this condition
return "invalid position (2)"
item = stash.tabs[tab_idx].items[item_idx]
items.append((tab_idx, item))
backup_stash(stash_name)
for tab_idx, item in items:
stash.tabs[tab_idx].remove(item)
tmp_path.write_bytes(stash.raw())
db = get_stash_db(stash_name)
for _, item in items:
item.write_to_db(db=db)
tmp_path.replace(stash_path)
return redirect(f"/stash/{stash_name}", code=303)
@app.route("/storage/<stash_name>")
def list_storage(stash_name: str):
if stash_name not in DB_FILES:
abort(404)
db = get_stash_db(stash_name)
items = {}
rows = db.execute("SELECT id FROM item WHERE deleted IS NULL").fetchall()
for row in rows:
items[row["id"]] = Item.load_from_db(row["id"], db=db)
return render_template(
"list_storage.html", stash_name=stash_name, storage_items=items
)
@app.route("/storage/<stash_name>/<category>")
def list_storage_category(stash_name: str, category: str):
if stash_name not in DB_FILES:
abort(404)
db = get_stash_db(stash_name)
if category == "uniques":
q = db.execute(
"SELECT id FROM item INNER JOIN item_extra ON id = item_id WHERE deleted IS NULL AND quality = ?",
(int(Quality.UNIQUE),),
)
elif category == "sets":
q = db.execute(
"SELECT id FROM item INNER JOIN item_extra ON id = item_id WHERE deleted IS NULL AND quality = ?",
(int(Quality.SET),),
)
elif category == "misc":
q = db.execute("SELECT id FROM item WHERE deleted IS NULL AND is_simple = TRUE")
else:
return "Unexpected category", 400
rows = q.fetchall()
items = {}
for row in rows:
items[row["id"]] = Item.load_from_db(row["id"], db=db)
return render_template(
"list_storage.html", stash_name=stash_name, storage_items=items
)
def backup_stash(stash_name: str) -> None:
stash_path = save_path() / STASH_FILES[stash_name]
backup_path = save_path() / "backups"
if not backup_path.exists():
backup_path.mkdir(parents=True)
ts = datetime.now().strftime("%Y-%m-%d_%H.%M.%S.%f")[:-3]
backup_path /= f"{ts}_{STASH_FILES[stash_name]}"
shutil.copy(stash_path, backup_path)
@app.route("/storage/<stash_name>/take", methods=["POST"])
def storage_take_items(stash_name: str):
if stash_name not in STASH_FILES or stash_name not in DB_FILES:
abort(404)
stash_path = save_path() / STASH_FILES[stash_name]
tmp_path = save_path() / f"{STASH_FILES[stash_name]}.temp"
if tmp_path.exists():
# TODO: Handle this condition
return "temp file exists (BAD)"
return 500
if d2_running():
return "d2 is running", 500
stash_data = stash_path.read_bytes()
stash = parse_stash(stash_data)
backup_stash(stash_name)
# Write items to temporary stash file
db = get_stash_db(stash_name)
ids = [
int(y.group(1))
for x in request.form.keys()
if (y := re.match(r"item_(\d+)", x))
]
for id in ids:
item = Item.load_from_db(id, db=db)
try:
stash.add(item)
except StashFullError:
return "the shared stash does not fit those items", 500
tmp_path.write_bytes(stash.raw())
# Remove items from db
for id in ids:
db.execute("UPDATE item SET deleted = CURRENT_TIMESTAMP WHERE id = ?", (id,))
db.commit()
# Finalize by replacing real stash file
tmp_path.replace(stash_path)
return redirect(f"/storage/{stash_name}", code=303)
def storage_currency_counts(item_codes: list[str], stash_name: str) -> dict:
db = get_stash_db(stash_name)
currencies = {}
for code in item_codes:
currencies[code] = {
"count": db.execute(
"SELECT COUNT(id) FROM item WHERE code = ?", (code,)
).fetchone()[0],
"name": lookup_basetype(code)["name"],
}
return currencies
@app.route("/storage/<stash_name>/currency")
def storage_currency(stash_name: str):
if stash_name not in DB_FILES:
abort(404)
runes = storage_currency_counts(CURRENCY_RUNES, stash_name)
gems = storage_currency_counts(CURRENCY_GEMS, stash_name)
keys = storage_currency_counts(CURRENCY_KEYS, stash_name)
essences = storage_currency_counts(CURRENCY_ESSENCES, stash_name)
return render_template(
"currency.html", runes=runes, gems=gems, keys=keys, essences=essences
)

View File

@@ -0,0 +1,85 @@
body {
background-color: #000;
font-size: large;
font-family: sans-serif;
color: rgb(240, 240, 240);
}
.stash-tab {
display: grid;
grid-template-columns: repeat(5, 1fr);
grid-gap: 10px;
margin: 0 auto;
}
.currencies {
display: flex;
gap: 50px
}
.currencies th {
text-align: left;
}
.currencies td {
text-align: right;
}
@media (max-width: 1600px) {
.stash-tab {
grid-template-columns: repeat(4, 1fr);
}
}
@media (max-width: 1200px) {
.stash-tab {
grid-template-columns: repeat(3, 1fr);
}
}
@media (max-width: 800px) {
.stash-tab {
grid-template-columns: repeat(2, 1fr);
}
}
.stash-tab input[type="checkbox"] {
display: none;
}
.item .name {
font-weight: bold;
}
.item {
display: block;
background-color: #444;
border: solid #555;
}
input[type="checkbox"]:checked + label {
background-color: #343;
border: solid #464;
}
.color-rare {
color: rgb(255, 255, 100);
}
.color-unique {
color: rgb(199, 179, 119);
}
.color-set {
color: rgb(0, 252, 0);
}
.color-runeword {
color: rgb(199, 179, 119);
}
.raw-item {
max-width: 120px;
background-color: #444;
color: rgb(240, 240, 240);
}

View File

@@ -0,0 +1,53 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Shared Stash</title>
<link rel="stylesheet" href="/static/style.css" />
<head>
<body>
<div class="currencies">
<div>
<table>
{% for code,currency in runes.items() %}
<tr>
<th>{{currency.name}}</th>
<td>{{currency.count}}</td>
</tr>
{% endfor %}
</table>
</div>
<div>
<table>
{% for code,currency in gems.items() %}
<tr>
<th>{{currency.name}}</th>
<td>{{currency.count}}</td>
</tr>
{% endfor %}
</table>
</div>
<div>
<table>
{% for code,currency in keys.items() %}
<tr>
<th>{{currency.name}}</th>
<td>{{currency.count}}</td>
</tr>
{% endfor %}
</table>
</div>
<div>
<table>
{% for code,currency in essences.items() %}
<tr>
<th>{{currency.name}}</th>
<td>{{currency.count}}</td>
</tr>
{% endfor %}
</table>
</div>
</div>
</body>
</html>

View File

@@ -0,0 +1,16 @@
<input type="checkbox" id="item_{{tabloop.index0}}_{{itemloop.index0}}" name="item_{{tabloop.index0}}_{{itemloop.index0}}" value="remove" />
<label class="item" for="item_{{tabloop.index0}}_{{itemloop.index0}}">
<ul>
<li class="name color-{{item.color}}">{{item.name}}</li>
{% if item.quality and item.quality >= 5 %}
<li class="name color-{{item.color}}">{{item.basename}}</li>
{% endif %}
<li>(in storage: {{storage_count(item)}})</li>
{% if item.stats %}
{% for stat in item.stats %}
<li>{{stat}}</li>
{% endfor %}
{% endif %}
<li><input class="raw-item" type="text" name="raw item" value="{{item.raw().hex()}}" onfocus="this.select()" readonly></li>
</ul>
</label>

View File

@@ -0,0 +1,24 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Shared Stash</title>
<link rel="stylesheet" href="/static/style.css" />
<head>
<body>
<form action="/stash/{{stash_name}}/store" method="POST">
{% for tab in stash.tabs %}
{% set tabloop = loop %}
<h2>Tab {{tabloop.index}}</h2>
<div class="stash-tab">
{% for item in tab.items %}
{% set itemloop = loop %}
{% include "item.html" %}
{% endfor %}
</div>
{% endfor %}
<input type="submit" value="Store items">
<input type="hidden" name="stash_hash" value="{{stash_hash}}" />
</form>
</body>
</html>

View File

@@ -0,0 +1,23 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Storage</title>
<link rel="stylesheet" href="/static/style.css" />
<head>
<body>
<form action="/storage/{{stash_name}}/take" method="POST">
<div>
<!-- TODO: Include item.html -->
{% for db_id, item in storage_items.items() %}
<div>
<input type="checkbox" name="item_{{db_id}}" value="take" />
{{item.name}}
({{db_id}})
</div>
{% endfor %}
</div>
<input type="submit" value="Take items">
</form>
</body>
</html>

File diff suppressed because it is too large Load Diff

View File

@@ -273,6 +273,14 @@
"save_add": 0, "save_add": 0,
"save_param_bits": null "save_param_bits": null
}, },
"49": {
"text": "+# to Maximum Fire Damage",
"save_bits": [
9
],
"save_add": 0,
"save_param_bits": null
},
"50": { "50": {
"text": "Adds #-# Lightning Damage", "text": "Adds #-# Lightning Damage",
"save_bits": [ "save_bits": [

View File

@@ -32,6 +32,13 @@ def close_db() -> None:
_db = None _db = None
def init_db() -> None:
db = get_db()
count = db.execute("SELECT count(*) FROM sqlite_master").fetchone()[0]
if count == 0:
create_db()
def create_db() -> None: def create_db() -> None:
db = get_db() db = get_db()
with open(_schema_path, encoding="utf-8") as f: with open(_schema_path, encoding="utf-8") as f:

View File

@@ -17,7 +17,6 @@
import json import json
import os import os
import re import re
from typing import Optional
from bitarray import bitarray from bitarray import bitarray
from bitarray.util import int2ba from bitarray.util import int2ba
from dataclasses import dataclass from dataclasses import dataclass
@@ -31,6 +30,7 @@ _stats_map = None
_unique_map = None _unique_map = None
_set_item_map = None _set_item_map = None
_runeword_map = None _runeword_map = None
_affix_map = None
class Quality(IntEnum): class Quality(IntEnum):
@@ -118,10 +118,74 @@ class Item:
defense: int | None = None defense: int | None = None
durability: int | None = None durability: int | None = None
max_durability: int | None = None max_durability: int | None = None
sockets: list[Optional["Item"]] | None = None sockets: int | None = None
socketed_items: list["Item"] | None = None
quantity: int | None = None quantity: int | None = None
stats: list[Stat] | None = None stats: list[Stat] | None = None
@property
def basename(self) -> str:
return lookup_basetype(self.code)["name"]
@property
def name(self) -> str:
match self.quality:
case Quality.LOW:
return f"{self.low_quality} {self.basename}"
case Quality.NORMAL | None:
return self.basename
case Quality.HIGH:
return f"Superior {self.basename}"
case Quality.MAGIC:
return f"<prefix> {self.basename} <suffix>"
case Quality.SET:
assert self.set_id is not None
return lookup_set_item(self.set_id)["name"]
case Quality.RARE:
# FIXME
return "<rare> <name>"
case Quality.UNIQUE:
assert self.unique_id is not None
return lookup_unique(self.unique_id)["name"]
case Quality.CRAFTED:
# FIXME
return "<crafted> <name>"
case _:
# TODO: In 3.11 replace this with assert_never
assert False, "Should be unreachable"
@property
def color(self) -> str:
if self.is_runeword:
return "runeword"
match self.quality:
case Quality.LOW:
return "low"
case Quality.NORMAL | None:
return "normal"
case Quality.HIGH:
return "normal"
case Quality.MAGIC:
return "magic"
case Quality.SET:
return "set"
case Quality.RARE:
return "rare"
case Quality.UNIQUE:
return "unique"
case Quality.CRAFTED:
return "crafted"
case _:
# TODO: In 3.11 replace this with assert_never
assert False, "Should be unreachable"
def raw(self):
parts = [self.raw_data]
if self.socketed_items:
for item in self.socketed_items:
parts.append(item.raw_data)
return b"".join(parts)
def print(self, indent=5, with_raw=False): def print(self, indent=5, with_raw=False):
properties = [] properties = []
base_name = lookup_basetype(self.code)["name"] base_name = lookup_basetype(self.code)["name"]
@@ -168,12 +232,9 @@ class Item:
f"Durability: {self.durability} out of {self.max_durability}", f"Durability: {self.durability} out of {self.max_durability}",
) )
if self.is_socketed: if self.is_socketed:
print(" " * indent, f"{len(self.sockets)} sockets:") print(" " * indent, f"{len(self.socketed_items)}/{self.sockets} sockets:")
for socket in self.sockets: for socket in self.socketed_items:
if socket: socket.print(indent + 4)
socket.print(indent + 4)
else:
print(" " * (indent + 4), "Empty")
if self.quantity: if self.quantity:
print(" " * indent, f"Quantity: {self.quantity}") print(" " * indent, f"Quantity: {self.quantity}")
if self.stats: if self.stats:
@@ -204,7 +265,42 @@ class Item:
base = lookup_basetype(self.code) base = lookup_basetype(self.code)
return base["width"], base["height"] return base["width"], base["height"]
def write_to_db(self, socketed_into=None, commit=True) -> int: def requirements(self) -> dict:
base = lookup_basetype(self.code)
# TODO: How is class requirements determined on basetypes?
reqs = {
"lvl": base["req_lvl"],
"dex": base["req_dex"],
"str": base["req_str"],
"class": None,
}
# TODO: Can implicit mod end up increasing requirements?
if self.quality == Quality.UNIQUE:
reqs["lvl"] = lookup_unique(self.unique_id)["req_lvl"]
elif self.quality == Quality.SET:
reqs["lvl"] = lookup_set_item(self.set_id)["req_lvl"]
elif self.is_runeword:
# TODO: What affects runeword level? Only the sockets?
pass
elif self.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]:
for m, id in [(False, id) for id in self.suffixes] + [
(True, id) for id in self.prefixes
]:
if id == 0:
continue
affix = lookup_affix(id, m)
reqs["lvl"] = max(reqs["lvl"], affix["req_lvl"])
if affix["req_class"]:
reqs["class"] = affix["req_class"]
if self.socketed_items:
for socket_item in self.socketed_items:
socket_reqs = socket_item.requirements()
reqs["lvl"] = max(reqs["lvl"], socket_reqs["lvl"])
return reqs
def write_to_db(self, socketed_into=None, commit=True, db=None) -> int:
if db is None:
db = get_db()
name = lookup_basetype(self.code)["name"] name = lookup_basetype(self.code)["name"]
# FIXME: handle magic & rare names # FIXME: handle magic & rare names
if self.is_runeword: if self.is_runeword:
@@ -218,13 +314,13 @@ class Item:
lookup_set_item(self.set_id)["set"] if self.quality == Quality.SET else None lookup_set_item(self.set_id)["set"] if self.quality == Quality.SET else None
) )
db = get_db() req = self.requirements()
cur = db.cursor() cur = db.cursor()
cur.execute( cur.execute(
"""INSERT INTO item (itembase_name, socketed_into, raw_data, raw_version, """INSERT INTO item (itembase_name, socketed_into, raw_data, raw_version,
is_identified, is_socketed, is_beginner, is_simple, is_ethereal, is_identified, is_socketed, is_beginner, is_simple, is_ethereal,
is_personalized, is_runeword, code) is_personalized, is_runeword, code, req_lvl)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
( (
lookup_basetype(self.code)["name"], lookup_basetype(self.code)["name"],
socketed_into, socketed_into,
@@ -238,38 +334,46 @@ class Item:
self.is_personalized, self.is_personalized,
self.is_runeword, self.is_runeword,
self.code, self.code,
req["lvl"],
), ),
) )
item_id = cur.lastrowid item_id = cur.lastrowid
cur.execute( if not self.is_simple:
"""INSERT INTO item_extra (item_id, item_name, cur.execute(
set_name, uid, lvl, quality, graphic, implicit, low_quality, set_id, """INSERT INTO item_extra (item_id, item_name,
unique_id, nameword1, nameword2, runeword_id, personal_name, set_name, uid, lvl, quality, graphic, implicit, low_quality, set_id,
defense, durability, max_durability, quantity) unique_id, nameword1, nameword2, runeword_id, personal_name,
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", defense, durability, max_durability, sockets, quantity, req_str,
( req_dex, req_class)
item_id, VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
name, ?, ?, ?)""",
set_name, (
self.uid, item_id,
self.lvl, name,
int(self.quality) if self.quality else None, set_name,
self.graphic, self.uid,
self.implicit, self.lvl,
int(self.low_quality) if self.low_quality else None, int(self.quality) if self.quality else None,
self.set_id, self.graphic,
self.unique_id, self.implicit,
self.nameword1, int(self.low_quality) if self.low_quality else None,
self.nameword2, self.set_id,
self.runeword_id, self.unique_id,
self.personal_name, self.nameword1,
self.defense, self.nameword2,
self.durability, self.runeword_id,
self.max_durability, self.personal_name,
self.quantity, self.defense,
), self.durability,
) self.max_durability,
self.sockets,
self.quantity,
req["str"],
req["dex"],
req["class"],
),
)
if self.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]: if self.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]:
for prefix, id in [(False, id) for id in self.suffixes] + [ for prefix, id in [(False, id) for id in self.suffixes] + [
@@ -296,24 +400,25 @@ class Item:
), ),
) )
if self.sockets: if self.socketed_items:
for socket in self.sockets: for socket_item in self.socketed_items:
socket.write_to_db(socketed_into=item_id, commit=False) socket_item.write_to_db(socketed_into=item_id, commit=False, db=db)
if commit: if commit:
db.commit() db.commit()
return item_id return item_id
def load_from_db(id: int) -> "Item": def load_from_db(id: int, db=None) -> "Item":
db = get_db() if db is None:
db = get_db()
row = db.execute( row = db.execute(
"""SELECT raw_data, raw_version, is_identified, is_socketed, """SELECT raw_data, raw_version, is_identified, is_socketed,
is_beginner, is_simple, is_ethereal, is_personalized, is_runeword, code, is_beginner, is_simple, is_ethereal, is_personalized, is_runeword, code,
uid, lvl, quality, graphic, implicit, low_quality, set_id, unique_id, uid, lvl, quality, graphic, implicit, low_quality, set_id, unique_id,
nameword1, nameword2, runeword_id, personal_name, defense, durability, nameword1, nameword2, runeword_id, personal_name, defense, durability,
max_durability, quantity max_durability, sockets, quantity
FROM item INNER JOIN item_extra ON id = item_id WHERE id = ?""", FROM item LEFT JOIN item_extra ON id = item_id WHERE id = ?""",
(id,), (id,),
).fetchone() ).fetchone()
if row["raw_version"] != STASH_TAB_VERSION: if row["raw_version"] != STASH_TAB_VERSION:
@@ -349,6 +454,7 @@ class Item:
defense=row["defense"], defense=row["defense"],
durability=row["durability"], durability=row["durability"],
max_durability=row["max_durability"], max_durability=row["max_durability"],
sockets=row["sockets"],
quantity=row["quantity"], quantity=row["quantity"],
) )
@@ -364,15 +470,16 @@ class Item:
else: else:
item.suffixes.append(row["affix_id"]) item.suffixes.append(row["affix_id"])
rows = db.execute( if item.is_socketed:
"SELECT id FROM item WHERE socketed_into = ?", (id,) item.socketed_items = []
).fetchall() rows = db.execute(
if len(rows) > 0: "SELECT id FROM item WHERE socketed_into = ?", (id,)
item.sockets = [] ).fetchall()
for row in rows: if len(rows) > 0:
socket = Item.load_from_db(row["id"]) for row in rows:
socket.pos_x = len(item.sockets) socket_item = Item.load_from_db(row["id"], db=db)
item.sockets.append(socket) socket_item.pos_x = len(item.socketed_items)
item.socketed_items.append(socket_item)
rows = db.execute( rows = db.execute(
"SELECT stat, value1, value2, value3, parameter FROM item_stat WHERE item_id = ?", "SELECT stat, value1, value2, value3, parameter FROM item_stat WHERE item_id = ?",
@@ -431,3 +538,11 @@ def lookup_runeword(id: int) -> dict:
with open(os.path.join(_data_path, "runewords.json")) as f: with open(os.path.join(_data_path, "runewords.json")) as f:
_runeword_map = json.load(f) _runeword_map = json.load(f)
return _runeword_map[str(id)] return _runeword_map[str(id)]
def lookup_affix(id: int, prefix: bool) -> dict:
global _affix_map
if _affix_map is None:
with open(os.path.join(_data_path, "affixes.json")) as f:
_affix_map = json.load(f)
return _affix_map["prefixes" if prefix else "suffixes"][str(id)]

View File

@@ -179,7 +179,8 @@ def parse_item(data: bytes) -> tuple[bytes, Item]:
itembase_end += personalized_end itembase_end += personalized_end
if item.is_socketed: if item.is_socketed:
sockets_count = ba2int(bits[itembase_end : itembase_end + 4]) item.sockets = ba2int(bits[itembase_end : itembase_end + 4])
item.socketed_items = []
sockets_end = itembase_end + 4 sockets_end = itembase_end + 4
else: else:
sockets_end = itembase_end sockets_end = itembase_end
@@ -194,10 +195,9 @@ def parse_item(data: bytes) -> tuple[bytes, Item]:
# Parse out sockets if any exist on the item # Parse out sockets if any exist on the item
if item.is_socketed: if item.is_socketed:
item.sockets = [None] * sockets_count
for i in range(0, filled_sockets): for i in range(0, filled_sockets):
remaining_data, socket = parse_item(remaining_data) remaining_data, socket_item = parse_item(remaining_data)
item.sockets[i] = socket item.socketed_items.append(socket_item)
return remaining_data, item return remaining_data, item
@@ -268,14 +268,20 @@ def parse_set_data(bits: bitarray, item: Item) -> tuple[Item, int]:
def parse_rare_data(bits: bitarray, item: Item) -> tuple[Item, int]: def parse_rare_data(bits: bitarray, item: Item) -> tuple[Item, int]:
item.nameword1 = ba2int(bits[0:8]) item.nameword1 = ba2int(bits[0:8])
item.nameword2 = ba2int(bits[8:16]) item.nameword2 = ba2int(bits[8:16])
affixes = [] item.prefixes = []
item.suffixes = []
ptr = 16 ptr = 16
for _ in range(0, 6): for _ in range(0, 3):
# Prefix
(affix, sz) = parse_affix(bits[ptr:]) (affix, sz) = parse_affix(bits[ptr:])
ptr += sz ptr += sz
affixes.append(affix) if affix:
item.prefixes = [affix for affix in affixes[0:3] if affix is not None] item.prefixes.append(affix)
item.suffixes = [affix for affix in affixes[3:6] if affix is not None] # Suffix
(affix, sz) = parse_affix(bits[ptr:])
ptr += sz
if affix:
item.suffixes.append(affix)
return item, ptr return item, ptr

View File

@@ -1,7 +1,7 @@
DROP TABLE IF EXISTS item_stat; --DROP TABLE IF EXISTS item_stat;
DROP TABLE IF EXISTS item_affix; --DROP TABLE IF EXISTS item_affix;
DROP TABLE IF EXISTS item_extra; --DROP TABLE IF EXISTS item_extra;
DROP TABLE IF EXISTS item; --DROP TABLE IF EXISTS item;
CREATE TABLE item ( CREATE TABLE item (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
@@ -14,6 +14,7 @@ CREATE TABLE item (
-- items have two names: the base name and item_extra.item_name. -- items have two names: the base name and item_extra.item_name.
itembase_name TEXT NOT NULL, itembase_name TEXT NOT NULL,
socketed_into INTEGER DEFAULT NULL, socketed_into INTEGER DEFAULT NULL,
req_lvl INTEGER DEFAULT 0,
-- The following fields match the fields of the item object in item.py -- The following fields match the fields of the item object in item.py
raw_data BLOB NOT NULL, raw_data BLOB NOT NULL,
@@ -31,13 +32,14 @@ CREATE TABLE item (
); );
-- Add an index for "... WHERE deletion IS NULL" -- Add an index for "... WHERE deletion IS NULL"
CREATE INDEX item_deletion_partial ON item (deleted) WHERE deleted IS NULL; CREATE INDEX item_deletion_partial ON item (deleted) WHERE deleted IS NULL;
-- * nuked: if the item has been removed from storage & user indicated he does not
-- want to count it as potentially in his possession any longer
CREATE TABLE item_extra ( CREATE TABLE item_extra (
item_id INTEGER PRIMARY KEY, item_id INTEGER PRIMARY KEY,
item_name TEXT DEFAULT NULL, item_name TEXT DEFAULT NULL,
set_name TEXT DEFAULT NULL, set_name TEXT DEFAULT NULL,
req_str INTEGER DEFAULT 0,
req_dex INTEGER DEFAULT 0,
req_class TEXT DEFAULT NULL,
-- The following fields match the fields of the item object in item.py -- The following fields match the fields of the item object in item.py
uid INTEGER DEFAULT NULL, uid INTEGER DEFAULT NULL,
@@ -55,12 +57,13 @@ CREATE TABLE item_extra (
defense INTEGER DEFAULT NULL, defense INTEGER DEFAULT NULL,
durability INTEGER DEFAULT NULL, durability INTEGER DEFAULT NULL,
max_durability INTEGER DEFAULT NULL, max_durability INTEGER DEFAULT NULL,
-- sockets: list[Optional["Item"]] | None = None => see item.socketed_into sockets INTEGER DEFAULT NULL, -- number of sockets; see item.socketed_into
quantity INTEGER DEFAULT NULL, quantity INTEGER DEFAULT NULL,
-- stats: list[Stat] | None = None => see table 'item_stat' -- stats: list[Stat] | None = None => see table 'item_stat'
FOREIGN KEY (item_id) REFERENCES item (id) FOREIGN KEY (item_id) REFERENCES item (id)
); );
CREATE INDEX item_extra_item_id ON item_extra (item_id);
CREATE TABLE item_stat ( CREATE TABLE item_stat (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
@@ -73,6 +76,7 @@ CREATE TABLE item_stat (
FOREIGN KEY (item_id) REFERENCES item (id) FOREIGN KEY (item_id) REFERENCES item (id)
); );
CREATE INDEX item_stat_item_id ON item_stat (item_id);
CREATE INDEX item_stat_stat ON item_stat (stat); CREATE INDEX item_stat_stat ON item_stat (stat);
CREATE TABLE item_affix ( CREATE TABLE item_affix (
@@ -83,3 +87,4 @@ CREATE TABLE item_affix (
FOREIGN KEY (item_id) REFERENCES item (id) FOREIGN KEY (item_id) REFERENCES item (id)
); );
CREATE INDEX item_affix_item_id ON item_affix (item_id);

View File

@@ -34,6 +34,17 @@ class Stash:
def raw(self) -> bytes: def raw(self) -> bytes:
return b"".join(tab.raw() for tab in self.tabs) return b"".join(tab.raw() for tab in self.tabs)
def add(self, item: Item) -> None:
for tab in self.tabs:
try:
tab.add(item)
return
except StashFullError:
pass
raise StashFullError(
"Could not locate an open spot in the stash to add the item"
)
class StashTab: class StashTab:
def __init__(self) -> None: def __init__(self) -> None:
@@ -44,7 +55,7 @@ class StashTab:
def raw(self) -> bytes: def raw(self) -> bytes:
"""Get the computed raw representation of the stash""" """Get the computed raw representation of the stash"""
item_raw = b"".join(item.raw_data for item in self.items) item_raw = b"".join(item.raw() for item in self.items)
raw_length = len(item_raw) + 0x44 raw_length = len(item_raw) + 0x44
return ( return (
STASH_TAB_MAGIC STASH_TAB_MAGIC

View File

@@ -2,7 +2,7 @@ import os
import tempfile import tempfile
import unittest import unittest
from d2warehouse.db import close_db, create_db, set_db_path from d2warehouse.db import close_db, create_db, get_db, set_db_path
from d2warehouse.item import Item from d2warehouse.item import Item
from d2warehouse.parser import parse_item from d2warehouse.parser import parse_item
@@ -26,6 +26,30 @@ class DbTest(unittest.TestCase):
) )
_, item = parse_item(data) _, item = parse_item(data)
db_id = item.write_to_db() db = get_db()
loaded_item = Item.load_from_db(db_id) db_id = item.write_to_db(db=db)
loaded_item = Item.load_from_db(db_id, db=db)
self.assertEqual(item, loaded_item)
# Check that requirement was written properly
reqs = db.execute(
"SELECT req_lvl, req_str, req_dex, req_class FROM item JOIN item_extra ON id = item_id WHERE id = ?",
(db_id,),
).fetchone()
expected_reqs = item.requirements()
self.assertEqual(reqs["req_lvl"], expected_reqs["lvl"])
self.assertEqual(reqs["req_str"], expected_reqs["str"])
self.assertEqual(reqs["req_dex"], expected_reqs["dex"])
self.assertEqual(reqs["req_class"], expected_reqs["class"])
def test_empty_sockets(self):
# superior armor with empty sockets
data = bytes.fromhex("10088000050014df175043b1b90cc38d80e3834070b004f41f")
_, item = parse_item(data)
db = get_db()
db_id = item.write_to_db(db=db)
loaded_item = Item.load_from_db(db_id, db=db)
self.assertEqual(loaded_item.sockets, 2)
self.assertEqual(len(loaded_item.socketed_items), 0)
self.assertEqual(item, loaded_item) self.assertEqual(item, loaded_item)

View File

@@ -1,6 +1,6 @@
import unittest import unittest
from d2warehouse.parser import parse_item from d2warehouse.parser import parse_item
from d2warehouse.item import Quality, lookup_runeword from d2warehouse.item import Quality, lookup_affix, lookup_runeword
class ParseItemTest(unittest.TestCase): class ParseItemTest(unittest.TestCase):
@@ -68,7 +68,8 @@ class ParseItemTest(unittest.TestCase):
self.assertEqual(data, b"") self.assertEqual(data, b"")
self.assertEqual(item.quality, Quality.HIGH) self.assertEqual(item.quality, Quality.HIGH)
self.assertEqual(len(item.stats), 2) self.assertEqual(len(item.stats), 2)
self.assertEqual(len(item.sockets), 2) self.assertEqual(item.sockets, 2)
self.assertEqual(len(item.socketed_items), 0)
def test_ed_max(self): def test_ed_max(self):
# test bugfix for https://gitlab.com/omicron-oss/d2warehouse/-/issues/1 # test bugfix for https://gitlab.com/omicron-oss/d2warehouse/-/issues/1
@@ -89,11 +90,9 @@ class ParseItemTest(unittest.TestCase):
data, item = parse_item(data) data, item = parse_item(data)
self.assertEqual(data, b"") self.assertEqual(data, b"")
self.assertTrue(item.is_runeword) self.assertTrue(item.is_runeword)
self.assertEqual(len(item.sockets), 2) self.assertEqual(item.sockets, 2)
item.sockets[0].print() self.assertEqual(item.socketed_items[0].code, "r09")
item.sockets[1].print() self.assertEqual(item.socketed_items[1].code, "r12")
self.assertEqual(item.sockets[0].code, "r09")
self.assertEqual(item.sockets[1].code, "r12")
rw = lookup_runeword(item.runeword_id) rw = lookup_runeword(item.runeword_id)
self.assertEqual(rw["name"], "Lore") self.assertEqual(rw["name"], "Lore")
self.assertEqual(str(item.stats[4]), "+1 to All Skills") # runeword stat self.assertEqual(str(item.stats[4]), "+1 to All Skills") # runeword stat
@@ -214,3 +213,38 @@ class ParseItemTest(unittest.TestCase):
data, _ = parse_item(data) data, _ = parse_item(data)
self.assertEqual(data, b"") self.assertEqual(data, b"")
def test_affixes(self):
data = bytes.fromhex(
"1000800005d0f4aa09173a36bf0723542d351ae4236acbd27000c30201a1052810208cf1241b4c50fc07"
)
data, item = parse_item(data)
self.assertEqual(data, b"")
req = item.requirements()
self.assertEqual(req["lvl"], 4)
# +8 stamina
self.assertAlmostEqual(lookup_affix(item.prefixes[0], True)["name"], "Rugged")
# 16% ed
self.assertAlmostEqual(lookup_affix(item.prefixes[1], True)["name"], "Sturdy")
# 10% fhr
self.assertAlmostEqual(
lookup_affix(item.suffixes[0], False)["name"], "of Balance"
)
# +1 dex
self.assertAlmostEqual(
lookup_affix(item.suffixes[1], False)["name"], "of Dexterity"
)
# Lightning bolt on hit
self.assertAlmostEqual(
lookup_affix(item.suffixes[2], False)["name"], "of Charged Shield"
)
def test_cathans_rule(self):
# Cathan's Rule: Only source of fire-max stat
data = bytes.fromhex(
"10008000050054c90448ad74276f910150448c180afc24ff1348fd3f212d85b415d25a48fb0f"
)
data, item = parse_item(data)
self.assertEqual(data, b"")
self.assertEqual(str(item.stats[0]), "+10 to Maximum Fire Damage")

View File

@@ -84,3 +84,18 @@ class StashTest(unittest.TestCase):
self.assertEqual(len(new_stash.tabs[0].items), 3) self.assertEqual(len(new_stash.tabs[0].items), 3)
self.assertEqual(len(new_stash.tabs[1].items), 1) self.assertEqual(len(new_stash.tabs[1].items), 1)
self.assertEqual(len(new_stash.tabs[2].items), 25) self.assertEqual(len(new_stash.tabs[2].items), 25)
def test_gemmed_raw(self):
data = bytes.fromhex(
"55aa55aa0100000063000000a40100006200000000000000000000000000"
"000000000000000000000000000000000000000000000000000000000000"
"000000004a4d010010088000050094a459a496629918020a484890ff1000"
"a0003500e07c6f0355aa55aa0100000063000000391b0000440000000000"
"000000000000000000000000000000000000000000000000000000000000"
"0000000000000000000000004a4d000055aa55aa01000000630000003905"
"000044000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000004a4d0000"
)
stash = parse_stash(data)
rebuilt = stash.raw()
self.assertEqual(data, rebuilt)

View File

@@ -18,6 +18,8 @@ requires-python = ">=3.10"
license = {text = "GPLv3 License"} license = {text = "GPLv3 License"}
dependencies = [ dependencies = [
"bitarray", "bitarray",
"flask",
"psutil",
] ]
dynamic = ["version"] dynamic = ["version"]