Compare commits

...

10 Commits

Author SHA1 Message Date
9b288a39dc Add the missing currency template for the currency ui (oops) 2023-10-31 12:08:42 +01:00
8b94171d22 Add stat fire-max for Cathan's Rule 2023-10-31 12:00:41 +01:00
9057f81d5f Add currency display page 2023-10-31 11:44:14 +01:00
2373702a1f fix storage_count to not count items that are removed from storage 2023-10-30 11:05:45 +01:00
4788d0d641 Add simple flask webapp to move items between stash and database 2023-10-29 21:45:55 +01:00
21df28d7bb Split sockets into sockets and socketed_items 2023-10-29 20:54:05 +01:00
423e4368d7 Fix bugs with affixes lookup 2023-10-29 20:48:33 +01:00
5026f58eb8 Fix an issue in rebuilding stash data for socketed items 2023-10-28 06:17:58 +02:00
b73fd9c20e make d2warehouse.db ready for the flask webui
- Add a function to automatically initialize the db with the schema if
   it isn't yet initialized
 - Remove the DROP TABLE queries from the schema in case the above^ goes
   wrong somehow
 - Make Item.write_to_db and Item.load_from_db take a db argument so
   that multiple threads can be supported
 - Add a few properties to d2warehouse.Item to help display items
2023-10-28 06:17:54 +02:00
93b1ad25a6 Add requirements to db write 2023-10-27 18:09:57 +02:00
21 changed files with 3219 additions and 2429 deletions

View File

@@ -11,12 +11,14 @@ with open(os.path.join(path, "magicprefix.txt")) as f:
dr = csv.DictReader(f, delimiter="\t")
index = 0
for row in dr:
if row["Name"] == "Expansion":
continue
index += 1
if len(row["Name"]) == 0:
continue
affixes["prefixes"][index] = {
"name": row["Name"],
"req_lvl": row["levelreq"],
"req_lvl": 0 if len(row["levelreq"]) == 0 else int(row["levelreq"]),
"req_class": None if len(row["class"]) == 0 else row["class"],
}
@@ -24,12 +26,14 @@ with open(os.path.join(path, "magicsuffix.txt")) as f:
dr = csv.DictReader(f, delimiter="\t")
index = 0
for row in dr:
if row["Name"] == "Expansion":
continue
index += 1
if len(row["Name"]) == 0:
continue
affixes["suffixes"][index] = {
"name": row["Name"],
"req_lvl": row["levelreq"],
"req_lvl": 0 if len(row["levelreq"]) == 0 else int(row["levelreq"]),
"req_class": None if len(row["class"]) == 0 else row["class"],
}

View File

@@ -13,7 +13,9 @@ path = sys.argv[1] if len(sys.argv) >= 2 else "."
# into one. Same applies for all stats.
special_stats = {
"firemindam": {"template": "dmg-fire"},
"firemaxdam": None,
"firemaxdam": {
"template": "fire-max"
}, # Cathan's rule, no other max ele dmg source exists
"lightmindam": {"template": "dmg-ltng"},
"lightmaxdam": None,
"magicmindam": {"template": "dmg-mag"},

View File

@@ -0,0 +1,5 @@
from d2warehouse.app.main import app
__all__ = [
"app",
]

22
d2warehouse/app/db.py Normal file
View File

@@ -0,0 +1,22 @@
import sqlite3
from flask import g
import d2warehouse.db as base_db
def get_db():
if "db" not in g:
print("\n==========\nDB PATH", base_db._path, "\n============\n")
g.db = sqlite3.connect(
base_db._path,
detect_types=sqlite3.PARSE_DECLTYPES,
)
g.db.row_factory = sqlite3.Row
return g.db
def close_db(e=None):
db = g.pop("db", None)
if db is not None:
db.close()

339
d2warehouse/app/main.py Normal file
View File

@@ -0,0 +1,339 @@
import hashlib
from flask import Flask, redirect, abort, render_template, request
from pathlib import Path
import shutil
from datetime import datetime
import psutil
from d2warehouse.item import Item, Quality, lookup_basetype
from d2warehouse.parser import parse_stash
import d2warehouse.db as base_db
from d2warehouse.app.db import get_db, close_db
import os
import re
from d2warehouse.stash import StashFullError
STASH_FILES = {
"softcore": "SharedStashSoftCoreV2.d2i",
"hardcore": "SharedStashHardCoreV2.d2i",
}
DB_FILES = {
"softcore": "d2warehouse.softcore.sqlite3",
"hardcore": "d2warehouse.hardcore.sqlite3",
}
CURRENCY_RUNES = [f"r{i + 1:02d}" for i in range(33)]
CURRENCY_GEMS = [
"gcv",
"gfv",
"gsv",
"gzv",
"gpv",
"gcb",
"gfb",
"gsb",
"glb",
"gpb",
"gcg",
"gfg",
"gsg",
"glg",
"gpg",
"gcr",
"gfr",
"gsr",
"glr",
"gpr",
"gcw",
"gfw",
"gsw",
"glw",
"gpw",
"gcy",
"gfy",
"gsy",
"gly",
"gpy",
"skc",
"skf",
"sku",
"skl",
"skz",
]
CURRENCY_KEYS = [
"pk1",
"pk2",
"pk3",
"bey",
"mbr",
"dhn",
]
CURRENCY_ESSENCES = [
"tes",
"ceh",
"bet",
"fed",
"toa",
]
def d2_running() -> bool:
for proc in psutil.process_iter():
try:
if proc.cmdline()[0].endswith("D2R.exe"):
return True
except (IndexError, psutil.AccessDenied):
pass
return False
def storage_count(item: Item, stash: str) -> int | str:
"""How many of this item type exist in storage"""
db = get_stash_db(stash)
if item.is_simple:
return db.execute(
"SELECT COUNT(id) FROM item WHERE code = ? AND deleted IS NULL",
(item.code,),
).fetchone()[0]
elif item.quality == Quality.UNIQUE:
return db.execute(
"SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id WHERE code = ? AND unique_id = ? AND deleted IS NULL",
(item.code, item.unique_id),
).fetchone()[0]
elif item.quality == Quality.SET:
return db.execute(
"SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id WHERE code = ? AND set_id = ? AND deleted IS NULL",
(item.code, item.set_id),
).fetchone()[0]
elif item.is_runeword:
return db.execute(
"SELECT COUNT(id) FROM item INNER JOIN item_extra ON id = item_id WHERE code = ? AND runeword_id = ? AND deleted IS NULL",
(item.code, item.runeword_id),
).fetchone()[0]
else:
return "N/A"
def save_path() -> Path:
if "D2SAVE_PATH" in os.environ:
path = Path(os.environ["D2SAVE_PATH"])
else:
path = Path.home() / "Saved Games/Diablo II Resurrected"
if not path.exists():
raise RuntimeError(f"Save path `{path}` does not exist")
return path
def get_stash_db(stash):
base_db.set_db_path(str(save_path() / DB_FILES[stash]))
return get_db()
base_db.set_db_path(str(save_path() / DB_FILES["softcore"]))
base_db.init_db()
base_db.close_db()
base_db.set_db_path(str(save_path() / DB_FILES["hardcore"]))
base_db.init_db()
base_db.close_db()
app = Flask(__name__)
app.teardown_appcontext(close_db)
@app.route("/")
def home():
return redirect("/stash/softcore", code=302)
@app.route("/stash/<stash_name>")
def list_stash(stash_name: str):
if stash_name not in STASH_FILES:
abort(404)
path = save_path() / STASH_FILES[stash_name]
stash_data = path.read_bytes()
stash_hash = hashlib.sha256(stash_data).hexdigest()
stash = parse_stash(stash_data)
return render_template(
"list_stash.html",
stash_name=stash_name,
stash=stash,
stash_hash=stash_hash,
storage_count=lambda x: storage_count(x, stash_name),
)
@app.route("/stash/<stash_name>/store", methods=["POST"])
def stash_store_items(stash_name: str):
if stash_name not in STASH_FILES or stash_name not in DB_FILES:
abort(404)
stash_path = save_path() / STASH_FILES[stash_name]
tmp_path = save_path() / f"{STASH_FILES[stash_name]}.temp"
if tmp_path.exists():
# TODO: Handle this condition
return "temp file exists (BAD)"
return 500
if d2_running():
return "d2 is running", 500
stash_data = stash_path.read_bytes()
stash_hash = hashlib.sha256(stash_data).hexdigest()
if request.form.get("stash_hash") != stash_hash:
return "wrong stash hash", 400
stash = parse_stash(stash_data)
items = []
locs = [y for x in request.form.keys() if (y := re.match(r"item_(\d+)_(\d+)", x))]
for item_location in locs:
tab_idx, item_idx = int(item_location.group(1)), int(item_location.group(2))
if tab_idx > len(stash.tabs) or item_idx > len(stash.tabs[tab_idx].items):
# TODO: Handle this condition
return "invalid position (2)"
item = stash.tabs[tab_idx].items[item_idx]
items.append((tab_idx, item))
backup_stash(stash_name)
for tab_idx, item in items:
stash.tabs[tab_idx].remove(item)
tmp_path.write_bytes(stash.raw())
db = get_stash_db(stash_name)
for _, item in items:
item.write_to_db(db=db)
tmp_path.replace(stash_path)
return redirect(f"/stash/{stash_name}", code=303)
@app.route("/storage/<stash_name>")
def list_storage(stash_name: str):
if stash_name not in DB_FILES:
abort(404)
db = get_stash_db(stash_name)
items = {}
rows = db.execute("SELECT id FROM item WHERE deleted IS NULL").fetchall()
for row in rows:
items[row["id"]] = Item.load_from_db(row["id"], db=db)
return render_template(
"list_storage.html", stash_name=stash_name, storage_items=items
)
@app.route("/storage/<stash_name>/<category>")
def list_storage_category(stash_name: str, category: str):
if stash_name not in DB_FILES:
abort(404)
db = get_stash_db(stash_name)
if category == "uniques":
q = db.execute(
"SELECT id FROM item INNER JOIN item_extra ON id = item_id WHERE deleted IS NULL AND quality = ?",
(int(Quality.UNIQUE),),
)
elif category == "sets":
q = db.execute(
"SELECT id FROM item INNER JOIN item_extra ON id = item_id WHERE deleted IS NULL AND quality = ?",
(int(Quality.SET),),
)
elif category == "misc":
q = db.execute("SELECT id FROM item WHERE deleted IS NULL AND is_simple = TRUE")
else:
return "Unexpected category", 400
rows = q.fetchall()
items = {}
for row in rows:
items[row["id"]] = Item.load_from_db(row["id"], db=db)
return render_template(
"list_storage.html", stash_name=stash_name, storage_items=items
)
def backup_stash(stash_name: str) -> None:
stash_path = save_path() / STASH_FILES[stash_name]
backup_path = save_path() / "backups"
if not backup_path.exists():
backup_path.mkdir(parents=True)
ts = datetime.now().strftime("%Y-%m-%d_%H.%M.%S.%f")[:-3]
backup_path /= f"{ts}_{STASH_FILES[stash_name]}"
shutil.copy(stash_path, backup_path)
@app.route("/storage/<stash_name>/take", methods=["POST"])
def storage_take_items(stash_name: str):
if stash_name not in STASH_FILES or stash_name not in DB_FILES:
abort(404)
stash_path = save_path() / STASH_FILES[stash_name]
tmp_path = save_path() / f"{STASH_FILES[stash_name]}.temp"
if tmp_path.exists():
# TODO: Handle this condition
return "temp file exists (BAD)"
return 500
if d2_running():
return "d2 is running", 500
stash_data = stash_path.read_bytes()
stash = parse_stash(stash_data)
backup_stash(stash_name)
# Write items to temporary stash file
db = get_stash_db(stash_name)
ids = [
int(y.group(1))
for x in request.form.keys()
if (y := re.match(r"item_(\d+)", x))
]
for id in ids:
item = Item.load_from_db(id, db=db)
try:
stash.add(item)
except StashFullError:
return "the shared stash does not fit those items", 500
tmp_path.write_bytes(stash.raw())
# Remove items from db
for id in ids:
db.execute("UPDATE item SET deleted = CURRENT_TIMESTAMP WHERE id = ?", (id,))
db.commit()
# Finalize by replacing real stash file
tmp_path.replace(stash_path)
return redirect(f"/storage/{stash_name}", code=303)
def storage_currency_counts(item_codes: list[str], stash_name: str) -> dict:
db = get_stash_db(stash_name)
currencies = {}
for code in item_codes:
currencies[code] = {
"count": db.execute(
"SELECT COUNT(id) FROM item WHERE code = ?", (code,)
).fetchone()[0],
"name": lookup_basetype(code)["name"],
}
return currencies
@app.route("/storage/<stash_name>/currency")
def storage_currency(stash_name: str):
if stash_name not in DB_FILES:
abort(404)
runes = storage_currency_counts(CURRENCY_RUNES, stash_name)
gems = storage_currency_counts(CURRENCY_GEMS, stash_name)
keys = storage_currency_counts(CURRENCY_KEYS, stash_name)
essences = storage_currency_counts(CURRENCY_ESSENCES, stash_name)
return render_template(
"currency.html", runes=runes, gems=gems, keys=keys, essences=essences
)

View File

@@ -0,0 +1,85 @@
body {
background-color: #000;
font-size: large;
font-family: sans-serif;
color: rgb(240, 240, 240);
}
.stash-tab {
display: grid;
grid-template-columns: repeat(5, 1fr);
grid-gap: 10px;
margin: 0 auto;
}
.currencies {
display: flex;
gap: 50px
}
.currencies th {
text-align: left;
}
.currencies td {
text-align: right;
}
@media (max-width: 1600px) {
.stash-tab {
grid-template-columns: repeat(4, 1fr);
}
}
@media (max-width: 1200px) {
.stash-tab {
grid-template-columns: repeat(3, 1fr);
}
}
@media (max-width: 800px) {
.stash-tab {
grid-template-columns: repeat(2, 1fr);
}
}
.stash-tab input[type="checkbox"] {
display: none;
}
.item .name {
font-weight: bold;
}
.item {
display: block;
background-color: #444;
border: solid #555;
}
input[type="checkbox"]:checked + label {
background-color: #343;
border: solid #464;
}
.color-rare {
color: rgb(255, 255, 100);
}
.color-unique {
color: rgb(199, 179, 119);
}
.color-set {
color: rgb(0, 252, 0);
}
.color-runeword {
color: rgb(199, 179, 119);
}
.raw-item {
max-width: 120px;
background-color: #444;
color: rgb(240, 240, 240);
}

View File

@@ -0,0 +1,53 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Shared Stash</title>
<link rel="stylesheet" href="/static/style.css" />
<head>
<body>
<div class="currencies">
<div>
<table>
{% for code,currency in runes.items() %}
<tr>
<th>{{currency.name}}</th>
<td>{{currency.count}}</td>
</tr>
{% endfor %}
</table>
</div>
<div>
<table>
{% for code,currency in gems.items() %}
<tr>
<th>{{currency.name}}</th>
<td>{{currency.count}}</td>
</tr>
{% endfor %}
</table>
</div>
<div>
<table>
{% for code,currency in keys.items() %}
<tr>
<th>{{currency.name}}</th>
<td>{{currency.count}}</td>
</tr>
{% endfor %}
</table>
</div>
<div>
<table>
{% for code,currency in essences.items() %}
<tr>
<th>{{currency.name}}</th>
<td>{{currency.count}}</td>
</tr>
{% endfor %}
</table>
</div>
</div>
</body>
</html>

View File

@@ -0,0 +1,16 @@
<input type="checkbox" id="item_{{tabloop.index0}}_{{itemloop.index0}}" name="item_{{tabloop.index0}}_{{itemloop.index0}}" value="remove" />
<label class="item" for="item_{{tabloop.index0}}_{{itemloop.index0}}">
<ul>
<li class="name color-{{item.color}}">{{item.name}}</li>
{% if item.quality and item.quality >= 5 %}
<li class="name color-{{item.color}}">{{item.basename}}</li>
{% endif %}
<li>(in storage: {{storage_count(item)}})</li>
{% if item.stats %}
{% for stat in item.stats %}
<li>{{stat}}</li>
{% endfor %}
{% endif %}
<li><input class="raw-item" type="text" name="raw item" value="{{item.raw().hex()}}" onfocus="this.select()" readonly></li>
</ul>
</label>

View File

@@ -0,0 +1,24 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Shared Stash</title>
<link rel="stylesheet" href="/static/style.css" />
<head>
<body>
<form action="/stash/{{stash_name}}/store" method="POST">
{% for tab in stash.tabs %}
{% set tabloop = loop %}
<h2>Tab {{tabloop.index}}</h2>
<div class="stash-tab">
{% for item in tab.items %}
{% set itemloop = loop %}
{% include "item.html" %}
{% endfor %}
</div>
{% endfor %}
<input type="submit" value="Store items">
<input type="hidden" name="stash_hash" value="{{stash_hash}}" />
</form>
</body>
</html>

View File

@@ -0,0 +1,23 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Storage</title>
<link rel="stylesheet" href="/static/style.css" />
<head>
<body>
<form action="/storage/{{stash_name}}/take" method="POST">
<div>
<!-- TODO: Include item.html -->
{% for db_id, item in storage_items.items() %}
<div>
<input type="checkbox" name="item_{{db_id}}" value="take" />
{{item.name}}
({{db_id}})
</div>
{% endfor %}
</div>
<input type="submit" value="Take items">
</form>
</body>
</html>

File diff suppressed because it is too large Load Diff

View File

@@ -273,6 +273,14 @@
"save_add": 0,
"save_param_bits": null
},
"49": {
"text": "+# to Maximum Fire Damage",
"save_bits": [
9
],
"save_add": 0,
"save_param_bits": null
},
"50": {
"text": "Adds #-# Lightning Damage",
"save_bits": [

View File

@@ -32,6 +32,13 @@ def close_db() -> None:
_db = None
def init_db() -> None:
db = get_db()
count = db.execute("SELECT count(*) FROM sqlite_master").fetchone()[0]
if count == 0:
create_db()
def create_db() -> None:
db = get_db()
with open(_schema_path, encoding="utf-8") as f:

View File

@@ -17,7 +17,6 @@
import json
import os
import re
from typing import Optional
from bitarray import bitarray
from bitarray.util import int2ba
from dataclasses import dataclass
@@ -31,6 +30,7 @@ _stats_map = None
_unique_map = None
_set_item_map = None
_runeword_map = None
_affix_map = None
class Quality(IntEnum):
@@ -118,10 +118,74 @@ class Item:
defense: int | None = None
durability: int | None = None
max_durability: int | None = None
sockets: list[Optional["Item"]] | None = None
sockets: int | None = None
socketed_items: list["Item"] | None = None
quantity: int | None = None
stats: list[Stat] | None = None
@property
def basename(self) -> str:
return lookup_basetype(self.code)["name"]
@property
def name(self) -> str:
match self.quality:
case Quality.LOW:
return f"{self.low_quality} {self.basename}"
case Quality.NORMAL | None:
return self.basename
case Quality.HIGH:
return f"Superior {self.basename}"
case Quality.MAGIC:
return f"<prefix> {self.basename} <suffix>"
case Quality.SET:
assert self.set_id is not None
return lookup_set_item(self.set_id)["name"]
case Quality.RARE:
# FIXME
return "<rare> <name>"
case Quality.UNIQUE:
assert self.unique_id is not None
return lookup_unique(self.unique_id)["name"]
case Quality.CRAFTED:
# FIXME
return "<crafted> <name>"
case _:
# TODO: In 3.11 replace this with assert_never
assert False, "Should be unreachable"
@property
def color(self) -> str:
if self.is_runeword:
return "runeword"
match self.quality:
case Quality.LOW:
return "low"
case Quality.NORMAL | None:
return "normal"
case Quality.HIGH:
return "normal"
case Quality.MAGIC:
return "magic"
case Quality.SET:
return "set"
case Quality.RARE:
return "rare"
case Quality.UNIQUE:
return "unique"
case Quality.CRAFTED:
return "crafted"
case _:
# TODO: In 3.11 replace this with assert_never
assert False, "Should be unreachable"
def raw(self):
parts = [self.raw_data]
if self.socketed_items:
for item in self.socketed_items:
parts.append(item.raw_data)
return b"".join(parts)
def print(self, indent=5, with_raw=False):
properties = []
base_name = lookup_basetype(self.code)["name"]
@@ -168,12 +232,9 @@ class Item:
f"Durability: {self.durability} out of {self.max_durability}",
)
if self.is_socketed:
print(" " * indent, f"{len(self.sockets)} sockets:")
for socket in self.sockets:
if socket:
socket.print(indent + 4)
else:
print(" " * (indent + 4), "Empty")
print(" " * indent, f"{len(self.socketed_items)}/{self.sockets} sockets:")
for socket in self.socketed_items:
socket.print(indent + 4)
if self.quantity:
print(" " * indent, f"Quantity: {self.quantity}")
if self.stats:
@@ -204,7 +265,42 @@ class Item:
base = lookup_basetype(self.code)
return base["width"], base["height"]
def write_to_db(self, socketed_into=None, commit=True) -> int:
def requirements(self) -> dict:
base = lookup_basetype(self.code)
# TODO: How is class requirements determined on basetypes?
reqs = {
"lvl": base["req_lvl"],
"dex": base["req_dex"],
"str": base["req_str"],
"class": None,
}
# TODO: Can implicit mod end up increasing requirements?
if self.quality == Quality.UNIQUE:
reqs["lvl"] = lookup_unique(self.unique_id)["req_lvl"]
elif self.quality == Quality.SET:
reqs["lvl"] = lookup_set_item(self.set_id)["req_lvl"]
elif self.is_runeword:
# TODO: What affects runeword level? Only the sockets?
pass
elif self.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]:
for m, id in [(False, id) for id in self.suffixes] + [
(True, id) for id in self.prefixes
]:
if id == 0:
continue
affix = lookup_affix(id, m)
reqs["lvl"] = max(reqs["lvl"], affix["req_lvl"])
if affix["req_class"]:
reqs["class"] = affix["req_class"]
if self.socketed_items:
for socket_item in self.socketed_items:
socket_reqs = socket_item.requirements()
reqs["lvl"] = max(reqs["lvl"], socket_reqs["lvl"])
return reqs
def write_to_db(self, socketed_into=None, commit=True, db=None) -> int:
if db is None:
db = get_db()
name = lookup_basetype(self.code)["name"]
# FIXME: handle magic & rare names
if self.is_runeword:
@@ -218,13 +314,13 @@ class Item:
lookup_set_item(self.set_id)["set"] if self.quality == Quality.SET else None
)
db = get_db()
req = self.requirements()
cur = db.cursor()
cur.execute(
"""INSERT INTO item (itembase_name, socketed_into, raw_data, raw_version,
is_identified, is_socketed, is_beginner, is_simple, is_ethereal,
is_personalized, is_runeword, code)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
is_personalized, is_runeword, code, req_lvl)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
lookup_basetype(self.code)["name"],
socketed_into,
@@ -238,38 +334,46 @@ class Item:
self.is_personalized,
self.is_runeword,
self.code,
req["lvl"],
),
)
item_id = cur.lastrowid
cur.execute(
"""INSERT INTO item_extra (item_id, item_name,
set_name, uid, lvl, quality, graphic, implicit, low_quality, set_id,
unique_id, nameword1, nameword2, runeword_id, personal_name,
defense, durability, max_durability, quantity)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
item_id,
name,
set_name,
self.uid,
self.lvl,
int(self.quality) if self.quality else None,
self.graphic,
self.implicit,
int(self.low_quality) if self.low_quality else None,
self.set_id,
self.unique_id,
self.nameword1,
self.nameword2,
self.runeword_id,
self.personal_name,
self.defense,
self.durability,
self.max_durability,
self.quantity,
),
)
if not self.is_simple:
cur.execute(
"""INSERT INTO item_extra (item_id, item_name,
set_name, uid, lvl, quality, graphic, implicit, low_quality, set_id,
unique_id, nameword1, nameword2, runeword_id, personal_name,
defense, durability, max_durability, sockets, quantity, req_str,
req_dex, req_class)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?)""",
(
item_id,
name,
set_name,
self.uid,
self.lvl,
int(self.quality) if self.quality else None,
self.graphic,
self.implicit,
int(self.low_quality) if self.low_quality else None,
self.set_id,
self.unique_id,
self.nameword1,
self.nameword2,
self.runeword_id,
self.personal_name,
self.defense,
self.durability,
self.max_durability,
self.sockets,
self.quantity,
req["str"],
req["dex"],
req["class"],
),
)
if self.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]:
for prefix, id in [(False, id) for id in self.suffixes] + [
@@ -296,24 +400,25 @@ class Item:
),
)
if self.sockets:
for socket in self.sockets:
socket.write_to_db(socketed_into=item_id, commit=False)
if self.socketed_items:
for socket_item in self.socketed_items:
socket_item.write_to_db(socketed_into=item_id, commit=False, db=db)
if commit:
db.commit()
return item_id
def load_from_db(id: int) -> "Item":
db = get_db()
def load_from_db(id: int, db=None) -> "Item":
if db is None:
db = get_db()
row = db.execute(
"""SELECT raw_data, raw_version, is_identified, is_socketed,
is_beginner, is_simple, is_ethereal, is_personalized, is_runeword, code,
uid, lvl, quality, graphic, implicit, low_quality, set_id, unique_id,
nameword1, nameword2, runeword_id, personal_name, defense, durability,
max_durability, quantity
FROM item INNER JOIN item_extra ON id = item_id WHERE id = ?""",
max_durability, sockets, quantity
FROM item LEFT JOIN item_extra ON id = item_id WHERE id = ?""",
(id,),
).fetchone()
if row["raw_version"] != STASH_TAB_VERSION:
@@ -349,6 +454,7 @@ class Item:
defense=row["defense"],
durability=row["durability"],
max_durability=row["max_durability"],
sockets=row["sockets"],
quantity=row["quantity"],
)
@@ -364,15 +470,16 @@ class Item:
else:
item.suffixes.append(row["affix_id"])
rows = db.execute(
"SELECT id FROM item WHERE socketed_into = ?", (id,)
).fetchall()
if len(rows) > 0:
item.sockets = []
for row in rows:
socket = Item.load_from_db(row["id"])
socket.pos_x = len(item.sockets)
item.sockets.append(socket)
if item.is_socketed:
item.socketed_items = []
rows = db.execute(
"SELECT id FROM item WHERE socketed_into = ?", (id,)
).fetchall()
if len(rows) > 0:
for row in rows:
socket_item = Item.load_from_db(row["id"], db=db)
socket_item.pos_x = len(item.socketed_items)
item.socketed_items.append(socket_item)
rows = db.execute(
"SELECT stat, value1, value2, value3, parameter FROM item_stat WHERE item_id = ?",
@@ -431,3 +538,11 @@ def lookup_runeword(id: int) -> dict:
with open(os.path.join(_data_path, "runewords.json")) as f:
_runeword_map = json.load(f)
return _runeword_map[str(id)]
def lookup_affix(id: int, prefix: bool) -> dict:
global _affix_map
if _affix_map is None:
with open(os.path.join(_data_path, "affixes.json")) as f:
_affix_map = json.load(f)
return _affix_map["prefixes" if prefix else "suffixes"][str(id)]

View File

@@ -179,7 +179,8 @@ def parse_item(data: bytes) -> tuple[bytes, Item]:
itembase_end += personalized_end
if item.is_socketed:
sockets_count = ba2int(bits[itembase_end : itembase_end + 4])
item.sockets = ba2int(bits[itembase_end : itembase_end + 4])
item.socketed_items = []
sockets_end = itembase_end + 4
else:
sockets_end = itembase_end
@@ -194,10 +195,9 @@ def parse_item(data: bytes) -> tuple[bytes, Item]:
# Parse out sockets if any exist on the item
if item.is_socketed:
item.sockets = [None] * sockets_count
for i in range(0, filled_sockets):
remaining_data, socket = parse_item(remaining_data)
item.sockets[i] = socket
remaining_data, socket_item = parse_item(remaining_data)
item.socketed_items.append(socket_item)
return remaining_data, item
@@ -268,14 +268,20 @@ def parse_set_data(bits: bitarray, item: Item) -> tuple[Item, int]:
def parse_rare_data(bits: bitarray, item: Item) -> tuple[Item, int]:
item.nameword1 = ba2int(bits[0:8])
item.nameword2 = ba2int(bits[8:16])
affixes = []
item.prefixes = []
item.suffixes = []
ptr = 16
for _ in range(0, 6):
for _ in range(0, 3):
# Prefix
(affix, sz) = parse_affix(bits[ptr:])
ptr += sz
affixes.append(affix)
item.prefixes = [affix for affix in affixes[0:3] if affix is not None]
item.suffixes = [affix for affix in affixes[3:6] if affix is not None]
if affix:
item.prefixes.append(affix)
# Suffix
(affix, sz) = parse_affix(bits[ptr:])
ptr += sz
if affix:
item.suffixes.append(affix)
return item, ptr

View File

@@ -1,7 +1,7 @@
DROP TABLE IF EXISTS item_stat;
DROP TABLE IF EXISTS item_affix;
DROP TABLE IF EXISTS item_extra;
DROP TABLE IF EXISTS item;
--DROP TABLE IF EXISTS item_stat;
--DROP TABLE IF EXISTS item_affix;
--DROP TABLE IF EXISTS item_extra;
--DROP TABLE IF EXISTS item;
CREATE TABLE item (
id INTEGER PRIMARY KEY,
@@ -14,6 +14,7 @@ CREATE TABLE item (
-- items have two names: the base name and item_extra.item_name.
itembase_name TEXT NOT NULL,
socketed_into INTEGER DEFAULT NULL,
req_lvl INTEGER DEFAULT 0,
-- The following fields match the fields of the item object in item.py
raw_data BLOB NOT NULL,
@@ -31,13 +32,14 @@ CREATE TABLE item (
);
-- Add an index for "... WHERE deletion IS NULL"
CREATE INDEX item_deletion_partial ON item (deleted) WHERE deleted IS NULL;
-- * nuked: if the item has been removed from storage & user indicated he does not
-- want to count it as potentially in his possession any longer
CREATE TABLE item_extra (
item_id INTEGER PRIMARY KEY,
item_name TEXT DEFAULT NULL,
set_name TEXT DEFAULT NULL,
req_str INTEGER DEFAULT 0,
req_dex INTEGER DEFAULT 0,
req_class TEXT DEFAULT NULL,
-- The following fields match the fields of the item object in item.py
uid INTEGER DEFAULT NULL,
@@ -55,12 +57,13 @@ CREATE TABLE item_extra (
defense INTEGER DEFAULT NULL,
durability INTEGER DEFAULT NULL,
max_durability INTEGER DEFAULT NULL,
-- sockets: list[Optional["Item"]] | None = None => see item.socketed_into
sockets INTEGER DEFAULT NULL, -- number of sockets; see item.socketed_into
quantity INTEGER DEFAULT NULL,
-- stats: list[Stat] | None = None => see table 'item_stat'
FOREIGN KEY (item_id) REFERENCES item (id)
);
CREATE INDEX item_extra_item_id ON item_extra (item_id);
CREATE TABLE item_stat (
id INTEGER PRIMARY KEY,
@@ -73,6 +76,7 @@ CREATE TABLE item_stat (
FOREIGN KEY (item_id) REFERENCES item (id)
);
CREATE INDEX item_stat_item_id ON item_stat (item_id);
CREATE INDEX item_stat_stat ON item_stat (stat);
CREATE TABLE item_affix (
@@ -83,3 +87,4 @@ CREATE TABLE item_affix (
FOREIGN KEY (item_id) REFERENCES item (id)
);
CREATE INDEX item_affix_item_id ON item_affix (item_id);

View File

@@ -34,6 +34,17 @@ class Stash:
def raw(self) -> bytes:
return b"".join(tab.raw() for tab in self.tabs)
def add(self, item: Item) -> None:
for tab in self.tabs:
try:
tab.add(item)
return
except StashFullError:
pass
raise StashFullError(
"Could not locate an open spot in the stash to add the item"
)
class StashTab:
def __init__(self) -> None:
@@ -44,7 +55,7 @@ class StashTab:
def raw(self) -> bytes:
"""Get the computed raw representation of the stash"""
item_raw = b"".join(item.raw_data for item in self.items)
item_raw = b"".join(item.raw() for item in self.items)
raw_length = len(item_raw) + 0x44
return (
STASH_TAB_MAGIC

View File

@@ -2,7 +2,7 @@ import os
import tempfile
import unittest
from d2warehouse.db import close_db, create_db, set_db_path
from d2warehouse.db import close_db, create_db, get_db, set_db_path
from d2warehouse.item import Item
from d2warehouse.parser import parse_item
@@ -26,6 +26,30 @@ class DbTest(unittest.TestCase):
)
_, item = parse_item(data)
db_id = item.write_to_db()
loaded_item = Item.load_from_db(db_id)
db = get_db()
db_id = item.write_to_db(db=db)
loaded_item = Item.load_from_db(db_id, db=db)
self.assertEqual(item, loaded_item)
# Check that requirement was written properly
reqs = db.execute(
"SELECT req_lvl, req_str, req_dex, req_class FROM item JOIN item_extra ON id = item_id WHERE id = ?",
(db_id,),
).fetchone()
expected_reqs = item.requirements()
self.assertEqual(reqs["req_lvl"], expected_reqs["lvl"])
self.assertEqual(reqs["req_str"], expected_reqs["str"])
self.assertEqual(reqs["req_dex"], expected_reqs["dex"])
self.assertEqual(reqs["req_class"], expected_reqs["class"])
def test_empty_sockets(self):
# superior armor with empty sockets
data = bytes.fromhex("10088000050014df175043b1b90cc38d80e3834070b004f41f")
_, item = parse_item(data)
db = get_db()
db_id = item.write_to_db(db=db)
loaded_item = Item.load_from_db(db_id, db=db)
self.assertEqual(loaded_item.sockets, 2)
self.assertEqual(len(loaded_item.socketed_items), 0)
self.assertEqual(item, loaded_item)

View File

@@ -1,6 +1,6 @@
import unittest
from d2warehouse.parser import parse_item
from d2warehouse.item import Quality, lookup_runeword
from d2warehouse.item import Quality, lookup_affix, lookup_runeword
class ParseItemTest(unittest.TestCase):
@@ -68,7 +68,8 @@ class ParseItemTest(unittest.TestCase):
self.assertEqual(data, b"")
self.assertEqual(item.quality, Quality.HIGH)
self.assertEqual(len(item.stats), 2)
self.assertEqual(len(item.sockets), 2)
self.assertEqual(item.sockets, 2)
self.assertEqual(len(item.socketed_items), 0)
def test_ed_max(self):
# test bugfix for https://gitlab.com/omicron-oss/d2warehouse/-/issues/1
@@ -89,11 +90,9 @@ class ParseItemTest(unittest.TestCase):
data, item = parse_item(data)
self.assertEqual(data, b"")
self.assertTrue(item.is_runeword)
self.assertEqual(len(item.sockets), 2)
item.sockets[0].print()
item.sockets[1].print()
self.assertEqual(item.sockets[0].code, "r09")
self.assertEqual(item.sockets[1].code, "r12")
self.assertEqual(item.sockets, 2)
self.assertEqual(item.socketed_items[0].code, "r09")
self.assertEqual(item.socketed_items[1].code, "r12")
rw = lookup_runeword(item.runeword_id)
self.assertEqual(rw["name"], "Lore")
self.assertEqual(str(item.stats[4]), "+1 to All Skills") # runeword stat
@@ -214,3 +213,38 @@ class ParseItemTest(unittest.TestCase):
data, _ = parse_item(data)
self.assertEqual(data, b"")
def test_affixes(self):
data = bytes.fromhex(
"1000800005d0f4aa09173a36bf0723542d351ae4236acbd27000c30201a1052810208cf1241b4c50fc07"
)
data, item = parse_item(data)
self.assertEqual(data, b"")
req = item.requirements()
self.assertEqual(req["lvl"], 4)
# +8 stamina
self.assertAlmostEqual(lookup_affix(item.prefixes[0], True)["name"], "Rugged")
# 16% ed
self.assertAlmostEqual(lookup_affix(item.prefixes[1], True)["name"], "Sturdy")
# 10% fhr
self.assertAlmostEqual(
lookup_affix(item.suffixes[0], False)["name"], "of Balance"
)
# +1 dex
self.assertAlmostEqual(
lookup_affix(item.suffixes[1], False)["name"], "of Dexterity"
)
# Lightning bolt on hit
self.assertAlmostEqual(
lookup_affix(item.suffixes[2], False)["name"], "of Charged Shield"
)
def test_cathans_rule(self):
# Cathan's Rule: Only source of fire-max stat
data = bytes.fromhex(
"10008000050054c90448ad74276f910150448c180afc24ff1348fd3f212d85b415d25a48fb0f"
)
data, item = parse_item(data)
self.assertEqual(data, b"")
self.assertEqual(str(item.stats[0]), "+10 to Maximum Fire Damage")

View File

@@ -84,3 +84,18 @@ class StashTest(unittest.TestCase):
self.assertEqual(len(new_stash.tabs[0].items), 3)
self.assertEqual(len(new_stash.tabs[1].items), 1)
self.assertEqual(len(new_stash.tabs[2].items), 25)
def test_gemmed_raw(self):
data = bytes.fromhex(
"55aa55aa0100000063000000a40100006200000000000000000000000000"
"000000000000000000000000000000000000000000000000000000000000"
"000000004a4d010010088000050094a459a496629918020a484890ff1000"
"a0003500e07c6f0355aa55aa0100000063000000391b0000440000000000"
"000000000000000000000000000000000000000000000000000000000000"
"0000000000000000000000004a4d000055aa55aa01000000630000003905"
"000044000000000000000000000000000000000000000000000000000000"
"00000000000000000000000000000000000000004a4d0000"
)
stash = parse_stash(data)
rebuilt = stash.raw()
self.assertEqual(data, rebuilt)

View File

@@ -18,6 +18,8 @@ requires-python = ">=3.10"
license = {text = "GPLv3 License"}
dependencies = [
"bitarray",
"flask",
"psutil",
]
dynamic = ["version"]