Files
d2warehouse/d2warehouse/item.py

549 lines
19 KiB
Python

# Copyright 2023 <omicron.me@protonmail.com>
# Copyright 2023 <andreasruden91@gmail.com>
#
# This file is part of d2warehouse.
#
# d2warehouse is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# d2warehouse is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# d2warehouse. If not, see <https://www.gnu.org/licenses/>.
import json
import os
import re
from bitarray import bitarray
from bitarray.util import int2ba
from dataclasses import dataclass
from enum import IntEnum
from d2warehouse.fileformat import STASH_TAB_VERSION
from d2warehouse.db import get_db
# Directory holding the JSON lookup tables; it is the "data" folder that sits
# next to this module.
_data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
# Lazily filled caches, one per JSON table. Each stays None until the matching
# lookup_* function is called for the first time, which then parses the file
# and stores the result here.
_basetype_map = None
_stats_map = None
_unique_map = None
_set_item_map = None
_runeword_map = None
_affix_map = None
class Quality(IntEnum):
    """Quality tier of an item, numbered 1 (low) through 8 (crafted)."""

    LOW = 1
    NORMAL = 2
    HIGH = 3
    MAGIC = 4
    SET = 5
    RARE = 6
    UNIQUE = 7
    CRAFTED = 8

    def __str__(self) -> str:
        """Return the member name with only its first letter upper-cased."""
        label = self.name
        return label.capitalize()
class LowQualityType(IntEnum):
    """Sub-type of a low-quality item; the value is the stored numeric id."""

    CRUDE = 0
    CRACKED = 1
    DAMAGED = 2
    LOW_QUALITY = 3

    def __str__(self) -> str:
        """Return a display label such as ``"Crude"`` or ``"Low Quality"``."""
        # Member names use underscores; swap them for spaces so the label can
        # be used directly in an item name ("Low Quality ...", not "Low_quality ...").
        return self.name.replace("_", " ").title()
@dataclass
class Stat:
id: int | None = None # TODO: These 3 should probably not be optional
values: list[int] | None = None
parameter: int | None = None
text: str | None = None # TODO: Make this a property
def print(self, indent=5):
print(" " * indent, str(self))
def __str__(self):
param = self.parameter
if isinstance(self.text, list):
subst_text = next(filter(lambda v: v["param"] == param, self.text))["text"]
param = None
else:
subst_text = self.text
for val in self.values:
subst_text = subst_text.replace("#", str(val), 1)
if param:
subst_text = re.sub(r"\[[^\]]*\]", str(param), subst_text, 1)
return subst_text
def txtbits(bits: bitarray) -> str:
    """Render *bits* as 0/1 digits, separated by a space after every 8 digits."""
    digits = "".join(map(str, bits))
    octets = []
    for start in range(0, len(digits), 8):
        octets.append(digits[start : start + 8])
    return " ".join(octets)
@dataclass
class Item:
raw_data: bytes
raw_version: int
is_identified: bool
is_socketed: bool
is_beginner: bool
is_simple: bool
is_ethereal: bool
is_personalized: bool
is_runeword: bool
pos_x: int
pos_y: int
code: str
uid: int | None = None
lvl: int | None = None
quality: Quality | None = None
graphic: int | None = None
implicit: int | None = None
low_quality: LowQualityType | None = None
prefixes: list[int] | None = None
suffixes: list[int] | None = None
set_id: int | None = None
unique_id: int | None = None
nameword1: int | None = None
nameword2: int | None = None
runeword_id: int | None = None
personal_name: str | None = None
defense: int | None = None
durability: int | None = None
max_durability: int | None = None
sockets: int | None = None
socketed_items: list["Item"] | None = None
quantity: int | None = None
stats: list[Stat] | None = None
@property
def basename(self) -> str:
return lookup_basetype(self.code)["name"]
@property
def name(self) -> str:
match self.quality:
case Quality.LOW:
return f"{self.low_quality} {self.basename}"
case Quality.NORMAL | None:
return self.basename
case Quality.HIGH:
return f"Superior {self.basename}"
case Quality.MAGIC:
return f"<prefix> {self.basename} <suffix>"
case Quality.SET:
assert self.set_id is not None
return lookup_set_item(self.set_id)["name"]
case Quality.RARE:
# FIXME
return "<rare> <name>"
case Quality.UNIQUE:
assert self.unique_id is not None
return lookup_unique(self.unique_id)["name"]
case Quality.CRAFTED:
# FIXME
return "<crafted> <name>"
case _:
# TODO: In 3.11 replace this with assert_never
assert False, "Should be unreachable"
@property
def color(self) -> str:
if self.is_runeword:
return "runeword"
match self.quality:
case Quality.LOW:
return "low"
case Quality.NORMAL | None:
return "normal"
case Quality.HIGH:
return "normal"
case Quality.MAGIC:
return "magic"
case Quality.SET:
return "set"
case Quality.RARE:
return "rare"
case Quality.UNIQUE:
return "unique"
case Quality.CRAFTED:
return "crafted"
case _:
# TODO: In 3.11 replace this with assert_never
assert False, "Should be unreachable"
def raw(self):
parts = [self.raw_data]
if self.socketed_items:
for item in self.socketed_items:
parts.append(item.raw_data)
return b"".join(parts)
def print(self, indent=5, with_raw=False):
properties = []
base_name = lookup_basetype(self.code)["name"]
print(" " * indent, f"{base_name} ({self.code})")
if self.lvl:
print(" " * indent, f"ilvl {self.lvl}")
if self.is_simple:
properties.append("Simple")
else:
properties.append("Extended")
if self.is_ethereal:
properties.append("Ethereal")
if not self.is_identified:
properties.append("Unidentified")
if self.is_socketed:
properties.append("Socketed")
if self.is_runeword:
properties.append("Runeword")
if properties:
print(" " * indent, ", ".join(properties))
print(" " * indent, f"at {self.pos_x}, {self.pos_y}")
if self.quality:
print(" " * indent, self.quality)
if self.prefixes:
print(" " * indent, "Prefixes:", self.prefixes)
if self.suffixes:
print(" " * indent, "Suffixes:", self.suffixes)
if self.set_id:
itm = lookup_set_item(self.set_id)
print(" " * indent, f"{itm['name']} ({self.set_id}), part of {itm['set']}")
if self.unique_id:
itm = lookup_unique(self.unique_id)
print(" " * indent, f"{itm['name']} ({self.unique_id})")
if self.runeword_id:
rw = lookup_runeword(self.runeword_id)
print(" " * indent, f"{rw['name']} runeword")
if self.personal_name:
print(" " * indent, f"Personal name: {self.personal_name}")
if self.defense:
print(" " * indent, f"Defense: {self.defense}")
if self.durability is not None:
print(
" " * indent,
f"Durability: {self.durability} out of {self.max_durability}",
)
if self.is_socketed:
print(" " * indent, f"{len(self.socketed_items)}/{self.sockets} sockets:")
for socket in self.socketed_items:
socket.print(indent + 4)
if self.quantity:
print(" " * indent, f"Quantity: {self.quantity}")
if self.stats:
print(" " * indent, "Stats:")
for stat in self.stats:
stat.print(indent + 4)
if with_raw:
print(" " * indent, "Raw Item Data:")
bits = bitarray(endian="little")
bits.frombytes(self.raw_data)
print(" " * indent, txtbits(bits))
print(" " * indent, self.raw_data.hex())
print("")
print("")
def set_position(self, x: int, y: int) -> None:
if x < 0 or x > 15 or y < 0 or y > 15:
raise ValueError("position needs to be in range [0, 16)")
self.pos_x = x
self.pos_y = y
bits = bitarray(endian="little")
bits.frombytes(self.raw_data)
bits[42:46] = int2ba(x, 4, endian="little")
bits[46:50] = int2ba(y, 4, endian="little")
self.raw_data = bits.tobytes()
def size(self) -> tuple[int, int]:
base = lookup_basetype(self.code)
return base["width"], base["height"]
def requirements(self) -> dict:
base = lookup_basetype(self.code)
# TODO: How is class requirements determined on basetypes?
reqs = {
"lvl": base["req_lvl"],
"dex": base["req_dex"],
"str": base["req_str"],
"class": None,
}
# TODO: Can implicit mod end up increasing requirements?
if self.quality == Quality.UNIQUE:
reqs["lvl"] = lookup_unique(self.unique_id)["req_lvl"]
elif self.quality == Quality.SET:
reqs["lvl"] = lookup_set_item(self.set_id)["req_lvl"]
elif self.is_runeword:
# TODO: What affects runeword level? Only the sockets?
pass
elif self.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]:
for m, id in [(False, id) for id in self.suffixes] + [
(True, id) for id in self.prefixes
]:
if id == 0:
continue
affix = lookup_affix(id, m)
reqs["lvl"] = max(reqs["lvl"], affix["req_lvl"])
if affix["req_class"]:
reqs["class"] = affix["req_class"]
if self.socketed_items:
for socket_item in self.socketed_items:
socket_reqs = socket_item.requirements()
reqs["lvl"] = max(reqs["lvl"], socket_reqs["lvl"])
return reqs
def write_to_db(self, socketed_into=None, commit=True, db=None) -> int:
if db is None:
db = get_db()
name = lookup_basetype(self.code)["name"]
# FIXME: handle magic & rare names
if self.is_runeword:
name = lookup_runeword(self.runeword_id)["name"]
elif self.quality == Quality.SET:
name = lookup_set_item(self.set_id)["name"]
elif self.quality == Quality.UNIQUE:
name = lookup_unique(self.unique_id)["name"]
set_name = (
lookup_set_item(self.set_id)["set"] if self.quality == Quality.SET else None
)
req = self.requirements()
cur = db.cursor()
cur.execute(
"""INSERT INTO item (itembase_name, socketed_into, raw_data, raw_version,
is_identified, is_socketed, is_beginner, is_simple, is_ethereal,
is_personalized, is_runeword, code, req_lvl)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
lookup_basetype(self.code)["name"],
socketed_into,
self.raw_data,
self.raw_version,
self.is_identified,
self.is_socketed,
self.is_beginner,
self.is_simple,
self.is_ethereal,
self.is_personalized,
self.is_runeword,
self.code,
req["lvl"],
),
)
item_id = cur.lastrowid
if not self.is_simple:
cur.execute(
"""INSERT INTO item_extra (item_id, item_name,
set_name, uid, lvl, quality, graphic, implicit, low_quality, set_id,
unique_id, nameword1, nameword2, runeword_id, personal_name,
defense, durability, max_durability, sockets, quantity, req_str,
req_dex, req_class)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?)""",
(
item_id,
name,
set_name,
self.uid,
self.lvl,
int(self.quality) if self.quality else None,
self.graphic,
self.implicit,
int(self.low_quality) if self.low_quality else None,
self.set_id,
self.unique_id,
self.nameword1,
self.nameword2,
self.runeword_id,
self.personal_name,
self.defense,
self.durability,
self.max_durability,
self.sockets,
self.quantity,
req["str"],
req["dex"],
req["class"],
),
)
if self.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]:
for prefix, id in [(False, id) for id in self.suffixes] + [
(True, id) for id in self.prefixes
]:
db.execute(
"INSERT INTO item_affix (item_id, prefix, affix_id) VALUES (?, ?, ?)",
(item_id, prefix, id),
)
if self.stats:
for stat in self.stats:
db.execute(
"""INSERT INTO item_stat (item_id, stat, value1, value2, value3, parameter)
VALUES (?, ?, ?, ?, ?, ?)
""",
(
item_id,
stat.id,
stat.values[0] if len(stat.values) > 0 else None,
stat.values[1] if len(stat.values) > 1 else None,
stat.values[2] if len(stat.values) > 2 else None,
stat.parameter,
),
)
if self.socketed_items:
for socket_item in self.socketed_items:
socket_item.write_to_db(socketed_into=item_id, commit=False, db=db)
if commit:
db.commit()
return item_id
def load_from_db(id: int, db=None) -> "Item":
if db is None:
db = get_db()
row = db.execute(
"""SELECT raw_data, raw_version, is_identified, is_socketed,
is_beginner, is_simple, is_ethereal, is_personalized, is_runeword, code,
uid, lvl, quality, graphic, implicit, low_quality, set_id, unique_id,
nameword1, nameword2, runeword_id, personal_name, defense, durability,
max_durability, sockets, quantity
FROM item LEFT JOIN item_extra ON id = item_id WHERE id = ?""",
(id,),
).fetchone()
if row["raw_version"] != STASH_TAB_VERSION:
raise RuntimeError("Can not load item, the raw version is not supported")
item = Item(
raw_data=row["raw_data"],
raw_version=row["raw_version"],
is_identified=bool(row["is_identified"]),
is_socketed=bool(row["is_socketed"]),
is_beginner=bool(row["is_beginner"]),
is_simple=bool(row["is_simple"]),
is_ethereal=bool(row["is_ethereal"]),
is_personalized=bool(row["is_personalized"]),
is_runeword=bool(row["is_runeword"]),
pos_x=0,
pos_y=0,
code=row["code"],
uid=row["uid"],
lvl=row["lvl"],
quality=Quality(row["quality"]) if row["quality"] else None,
graphic=row["graphic"],
implicit=row["implicit"],
low_quality=LowQualityType(row["low_quality"])
if row["low_quality"]
else None,
set_id=row["set_id"],
unique_id=row["unique_id"],
nameword1=row["nameword1"],
nameword2=row["nameword2"],
runeword_id=row["runeword_id"],
personal_name=row["personal_name"],
defense=row["defense"],
durability=row["durability"],
max_durability=row["max_durability"],
sockets=row["sockets"],
quantity=row["quantity"],
)
if item.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]:
rows = db.execute(
"SELECT prefix, affix_id FROM item_affix WHERE item_id = ?", (id,)
)
item.prefixes = []
item.suffixes = []
for row in rows:
if row["prefix"]:
item.prefixes.append(row["affix_id"])
else:
item.suffixes.append(row["affix_id"])
if item.is_socketed:
item.socketed_items = []
rows = db.execute(
"SELECT id FROM item WHERE socketed_into = ?", (id,)
).fetchall()
if len(rows) > 0:
for row in rows:
socket_item = Item.load_from_db(row["id"], db=db)
socket_item.pos_x = len(item.socketed_items)
item.socketed_items.append(socket_item)
rows = db.execute(
"SELECT stat, value1, value2, value3, parameter FROM item_stat WHERE item_id = ?",
(id,),
).fetchall()
if len(rows) > 0:
item.stats = []
for row in rows:
values = []
for i in range(1, 4):
if row[f"value{i}"] is not None:
values.append(row[f"value{i}"])
stat = Stat(id=row["stat"], values=values, parameter=row["parameter"])
stat_data = lookup_stat(stat.id)
stat.text = stat_data["text"]
item.stats.append(stat)
return item
def lookup_basetype(code: str) -> dict:
    """Return the base-type record for item *code* from ``items.json``.

    The file is parsed on the first call and cached in ``_basetype_map``.

    Raises:
        KeyError: if *code* is not a key of the table.
    """
    global _basetype_map
    if _basetype_map is None:
        # Read as UTF-8 explicitly instead of relying on the locale's default
        # encoding, which differs between platforms.
        with open(os.path.join(_data_path, "items.json"), encoding="utf-8") as f:
            _basetype_map = json.load(f)
    return _basetype_map[code]
def lookup_stat(id: int) -> dict:
    """Return the stat record for stat *id* from ``stats.json``.

    The file is parsed on the first call and cached in ``_stats_map``. The
    table is keyed by the decimal string form of the id.

    Raises:
        KeyError: if the id is not a key of the table.
    """
    global _stats_map
    if _stats_map is None:
        # Read as UTF-8 explicitly instead of relying on the locale's default
        # encoding, which differs between platforms.
        with open(os.path.join(_data_path, "stats.json"), encoding="utf-8") as f:
            _stats_map = json.load(f)
    return _stats_map[str(id)]
def lookup_unique(id: int) -> dict:
    """Return the unique-item record for *id* from ``uniques.json``.

    The file is parsed on the first call and cached in ``_unique_map``. The
    table is keyed by the decimal string form of the id.

    Raises:
        KeyError: if the id is not a key of the table.
    """
    global _unique_map
    if _unique_map is None:
        # Read as UTF-8 explicitly instead of relying on the locale's default
        # encoding, which differs between platforms.
        with open(os.path.join(_data_path, "uniques.json"), encoding="utf-8") as f:
            _unique_map = json.load(f)
    return _unique_map[str(id)]
def lookup_set_item(id: int) -> dict:
    """Return the set-item record for *id* from ``sets.json``.

    The file is parsed on the first call and cached in ``_set_item_map``. The
    table is keyed by the decimal string form of the id.

    Raises:
        KeyError: if the id is not a key of the table.
    """
    global _set_item_map
    if _set_item_map is None:
        # Read as UTF-8 explicitly instead of relying on the locale's default
        # encoding, which differs between platforms.
        with open(os.path.join(_data_path, "sets.json"), encoding="utf-8") as f:
            _set_item_map = json.load(f)
    return _set_item_map[str(id)]
def lookup_runeword(id: int) -> dict:
    """Return the runeword record for *id* from ``runewords.json``.

    The file is parsed on the first call and cached in ``_runeword_map``. The
    table is keyed by the decimal string form of the id.

    Raises:
        KeyError: if the id is not a key of the table.
    """
    global _runeword_map
    if _runeword_map is None:
        # Read as UTF-8 explicitly instead of relying on the locale's default
        # encoding, which differs between platforms.
        with open(os.path.join(_data_path, "runewords.json"), encoding="utf-8") as f:
            _runeword_map = json.load(f)
    return _runeword_map[str(id)]
def lookup_affix(id: int, prefix: bool) -> dict:
    """Return the affix record for *id* from ``affixes.json``.

    The file is parsed on the first call and cached in ``_affix_map``. The
    record is taken from the ``"prefixes"`` table when *prefix* is true and
    from the ``"suffixes"`` table otherwise; both are keyed by the decimal
    string form of the id.

    Raises:
        KeyError: if the id is not a key of the selected table.
    """
    global _affix_map
    if _affix_map is None:
        # Read as UTF-8 explicitly instead of relying on the locale's default
        # encoding, which differs between platforms.
        with open(os.path.join(_data_path, "affixes.json"), encoding="utf-8") as f:
            _affix_map = json.load(f)
    return _affix_map["prefixes" if prefix else "suffixes"][str(id)]