Files
d2warehouse/d2warehouse/item.py

427 lines
14 KiB
Python

# Copyright 2023 <omicron.me@protonmail.com>
# Copyright 2023 <andreasruden91@gmail.com>
#
# This file is part of d2warehouse.
#
# d2warehouse is free software: you can redistribute it and/or modify it under the
# terms of the GNU General Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your option) any later
# version.
#
# d2warehouse is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with
# d2warehouse. If not, see <https://www.gnu.org/licenses/>.
import json
import os
import re
from typing import Optional
from bitarray import bitarray
from bitarray.util import int2ba
from dataclasses import dataclass
from enum import IntEnum
from d2warehouse.fileformat import STASH_TAB_VERSION
from d2warehouse.db import get_db
# Directory named "data" that sits next to this module; the lookup_* helpers
# read their JSON tables from here.
_data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")

# Cache slots for the lookup_* helpers. Each stays None until the matching
# helper loads its table for the first time.
_basetype_map = _stats_map = _unique_map = _set_item_map = _runeword_map = None
class Quality(IntEnum):
    """Quality tier of an item, as an integer-valued enumeration."""

    LOW = 1
    NORMAL = 2
    HIGH = 3
    MAGIC = 4
    SET = 5
    RARE = 6
    UNIQUE = 7
    CRAFTED = 8

    def __str__(self) -> str:
        """Return the member name with only its first letter upper-cased."""
        member_name = self.name
        return member_name.capitalize()
class LowQualityType(IntEnum):
    """Sub-type of a low quality item, as an integer-valued enumeration."""

    CRUDE = 0
    CRACKED = 1
    DAMAGED = 2
    LOW_QUALITY = 3

    def __str__(self) -> str:
        """Return a display label for the member, e.g. ``"Low quality"``.

        Underscores in the member name are turned into spaces so that the
        identifier separator does not leak into user-facing text.
        """
        return self.name.replace("_", " ").capitalize()
@dataclass
class Stat:
id: int | None = None # TODO: These 3 should probably not be optional
values: list[int] | None = None
parameter: int | None = None
text: str | None = None # TODO: Make this a property
def print(self, indent=5):
print(" " * indent, str(self))
def __str__(self):
param = self.parameter
if isinstance(self.text, list):
subst_text = next(filter(lambda v: v["param"] == param, self.text))["text"]
param = None
else:
subst_text = self.text
for val in self.values:
subst_text = subst_text.replace("#", str(val), 1)
if param:
subst_text = re.sub(r"\[[^\]]*\]", str(param), subst_text, 1)
return subst_text
def txtbits(bits: bitarray) -> str:
    """Render ``bits`` as text, split into space-separated groups of eight.

    Every element of ``bits`` is converted with ``str()`` and the results are
    concatenated; the final group is shorter when the total length is not a
    multiple of eight.
    """
    digits = "".join(map(str, bits))
    octets = []
    for start in range(0, len(digits), 8):
        octets.append(digits[start : start + 8])
    return " ".join(octets)
@dataclass
class Item:
    """An item: its raw serialized bytes plus the fields decoded from them.

    The first block of fields is required; every other field is optional and
    defaults to ``None``.  ``sockets`` holds one entry per socket, where an
    empty socket is represented by ``None``.
    """

    raw_data: bytes
    raw_version: int
    is_identified: bool
    is_socketed: bool
    is_beginner: bool
    is_simple: bool
    is_ethereal: bool
    is_personalized: bool
    is_runeword: bool
    pos_x: int
    pos_y: int
    code: str
    uid: int | None = None
    lvl: int | None = None
    quality: Quality | None = None
    graphic: int | None = None
    implicit: int | None = None
    low_quality: LowQualityType | None = None
    prefixes: list[int] | None = None
    suffixes: list[int] | None = None
    set_id: int | None = None
    unique_id: int | None = None
    nameword1: int | None = None
    nameword2: int | None = None
    runeword_id: int | None = None
    personal_name: str | None = None
    defense: int | None = None
    durability: int | None = None
    max_durability: int | None = None
    sockets: list[Optional["Item"]] | None = None
    quantity: int | None = None
    stats: list[Stat] | None = None

    def print(self, indent=5, with_raw=False):
        """Print a human-readable description of the item to stdout.

        Args:
            indent: number of spaces put in front of every printed line;
                socketed items and stats are printed with ``indent + 4``.
            with_raw: when true, also dump ``raw_data`` as grouped bits and
                as a hex string.
        """
        pad = " " * indent
        properties = []
        base_name = lookup_basetype(self.code)["name"]
        print(pad, f"{base_name} ({self.code})")
        if self.lvl:
            print(pad, f"ilvl {self.lvl}")
        if self.is_simple:
            properties.append("Simple")
        else:
            properties.append("Extended")
        if self.is_ethereal:
            properties.append("Ethereal")
        if not self.is_identified:
            properties.append("Unidentified")
        if self.is_socketed:
            properties.append("Socketed")
        if self.is_runeword:
            properties.append("Runeword")
        if properties:
            print(pad, ", ".join(properties))
        print(pad, f"at {self.pos_x}, {self.pos_y}")
        if self.quality:
            print(pad, self.quality)
        if self.prefixes:
            print(pad, "Prefixes:", self.prefixes)
        if self.suffixes:
            print(pad, "Suffixes:", self.suffixes)
        # Compare against None rather than relying on truthiness so that an
        # id of 0 is not silently skipped.
        if self.set_id is not None:
            itm = lookup_set_item(self.set_id)
            print(pad, f"{itm['name']} ({self.set_id}), part of {itm['set']}")
        if self.unique_id is not None:
            itm = lookup_unique(self.unique_id)
            print(pad, f"{itm['name']} ({self.unique_id})")
        if self.runeword_id:
            rw = lookup_runeword(self.runeword_id)
            print(pad, f"{rw['name']} runeword")
        if self.personal_name:
            print(pad, f"Personal name: {self.personal_name}")
        if self.defense:
            print(pad, f"Defense: {self.defense}")
        if self.durability is not None:
            print(
                pad,
                f"Durability: {self.durability} out of {self.max_durability}",
            )
        # ``sockets`` can be None even when the socketed flag is set (for
        # example load_from_db leaves it None when no child rows exist).
        if self.is_socketed and self.sockets is not None:
            print(pad, f"{len(self.sockets)} sockets:")
            for socket in self.sockets:
                if socket:
                    socket.print(indent + 4)
                else:
                    print(" " * (indent + 4), "Empty")
        if self.quantity:
            print(pad, f"Quantity: {self.quantity}")
        if self.stats:
            print(pad, "Stats:")
            for stat in self.stats:
                stat.print(indent + 4)
        if with_raw:
            print(pad, "Raw Item Data:")
            bits = bitarray(endian="little")
            bits.frombytes(self.raw_data)
            print(pad, txtbits(bits))
            print(pad, self.raw_data.hex())
            # NOTE(review): the two blank lines are assumed to belong to the
            # raw dump; confirm against the intended output layout.
            print("")
            print("")

    def set_position(self, x: int, y: int) -> None:
        """Move the item to ``(x, y)`` and patch the position into ``raw_data``.

        ``x`` is written to bits 42-45 and ``y`` to bits 46-49 of the raw
        data, both as 4-bit little-endian values.

        Raises:
            ValueError: if ``x`` or ``y`` is outside ``0..15``.
        """
        if not (0 <= x <= 15 and 0 <= y <= 15):
            raise ValueError("position needs to be in range [0, 16)")
        self.pos_x = x
        self.pos_y = y
        bits = bitarray(endian="little")
        bits.frombytes(self.raw_data)
        bits[42:46] = int2ba(x, 4, endian="little")
        bits[46:50] = int2ba(y, 4, endian="little")
        self.raw_data = bits.tobytes()

    def size(self) -> tuple[int, int]:
        """Return ``(width, height)`` as recorded for the item's base type."""
        base = lookup_basetype(self.code)
        return base["width"], base["height"]

    def write_to_db(self, socketed_into=None, commit=True) -> int:
        """Insert the item, its affixes, stats and socketed items into the db.

        Args:
            socketed_into: row id of the item this one is socketed into, or
                ``None`` for a top-level item.
            commit: commit the transaction at the end.  Socketed children are
                written with ``commit=False`` so that everything is committed
                together by the outermost call.

        Returns:
            The row id of the inserted ``item`` row.
        """
        base_name = lookup_basetype(self.code)["name"]
        name = base_name
        # FIXME: handle magic & rare names
        if self.is_runeword:
            name = lookup_runeword(self.runeword_id)["name"]
        elif self.quality == Quality.SET:
            name = lookup_set_item(self.set_id)["name"]
        elif self.quality == Quality.UNIQUE:
            name = lookup_unique(self.unique_id)["name"]
        set_name = (
            lookup_set_item(self.set_id)["set"] if self.quality == Quality.SET else None
        )

        db = get_db()
        cur = db.cursor()
        cur.execute(
            """INSERT INTO item (itembase_name, item_name,
            set_name, socketed_into, raw_data, raw_version, is_identified, is_socketed,
            is_beginner, is_simple, is_ethereal, is_personalized, is_runeword,
            code, uid, lvl, quality, graphic, implicit, low_quality, set_id,
            unique_id, nameword1, nameword2, runeword_id, personal_name,
            defense, durability, max_durability, quantity)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
            ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                base_name,
                name,
                set_name,
                socketed_into,
                self.raw_data,
                self.raw_version,
                self.is_identified,
                self.is_socketed,
                self.is_beginner,
                self.is_simple,
                self.is_ethereal,
                self.is_personalized,
                self.is_runeword,
                self.code,
                self.uid,
                self.lvl,
                int(self.quality) if self.quality is not None else None,
                self.graphic,
                self.implicit,
                # LowQualityType.CRUDE is 0, so a truthiness test would store
                # NULL for it; only None means "no low quality type".
                int(self.low_quality) if self.low_quality is not None else None,
                self.set_id,
                self.unique_id,
                self.nameword1,
                self.nameword2,
                self.runeword_id,
                self.personal_name,
                self.defense,
                self.durability,
                self.max_durability,
                self.quantity,
            ),
        )
        item_id = cur.lastrowid

        if self.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]:
            # Either affix list may still be None; treat that as empty.
            affixes = [(False, affix_id) for affix_id in self.suffixes or []] + [
                (True, affix_id) for affix_id in self.prefixes or []
            ]
            for is_prefix, affix_id in affixes:
                db.execute(
                    "INSERT INTO item_affix (item_id, prefix, affix_id) VALUES (?, ?, ?)",
                    (item_id, is_prefix, affix_id),
                )

        if self.stats:
            for stat in self.stats:
                # The table has three value columns; missing ones become NULL.
                values = stat.values or []
                db.execute(
                    """INSERT INTO item_stat (item_id, stat, value1, value2, value3, parameter)
                    VALUES (?, ?, ?, ?, ?, ?)
                    """,
                    (
                        item_id,
                        stat.id,
                        values[0] if len(values) > 0 else None,
                        values[1] if len(values) > 1 else None,
                        values[2] if len(values) > 2 else None,
                        stat.parameter,
                    ),
                )

        if self.sockets:
            for socket in self.sockets:
                # Empty sockets are None and have nothing to store.
                if socket is not None:
                    socket.write_to_db(socketed_into=item_id, commit=False)

        if commit:
            db.commit()
        return item_id

    @staticmethod
    def load_from_db(id: int) -> "Item":
        """Load the item stored under row ``id``, including affixes, socketed
        items and stats.

        Position is not read from the database: ``pos_x`` and ``pos_y`` are
        set to 0, and each socketed item gets its index within ``sockets``
        as ``pos_x``.

        Raises:
            LookupError: if there is no ``item`` row with this id.
            RuntimeError: if the stored ``raw_version`` differs from
                ``STASH_TAB_VERSION``.
        """
        db = get_db()
        row = db.execute(
            """SELECT raw_data, raw_version, is_identified, is_socketed,
            is_beginner, is_simple, is_ethereal, is_personalized, is_runeword, code,
            uid, lvl, quality, graphic, implicit, low_quality, set_id, unique_id,
            nameword1, nameword2, runeword_id, personal_name, defense, durability,
            max_durability, quantity
            FROM item WHERE id = ?""",
            (id,),
        ).fetchone()
        if row is None:
            raise LookupError(f"Can not load item, no item with id {id}")
        if row["raw_version"] != STASH_TAB_VERSION:
            raise RuntimeError("Can not load item, the raw version is not supported")

        item = Item(
            raw_data=row["raw_data"],
            raw_version=row["raw_version"],
            is_identified=bool(row["is_identified"]),
            is_socketed=bool(row["is_socketed"]),
            is_beginner=bool(row["is_beginner"]),
            is_simple=bool(row["is_simple"]),
            is_ethereal=bool(row["is_ethereal"]),
            is_personalized=bool(row["is_personalized"]),
            is_runeword=bool(row["is_runeword"]),
            pos_x=0,
            pos_y=0,
            code=row["code"],
            uid=row["uid"],
            lvl=row["lvl"],
            quality=Quality(row["quality"]) if row["quality"] is not None else None,
            graphic=row["graphic"],
            implicit=row["implicit"],
            # 0 is a valid LowQualityType (CRUDE); only NULL maps to None.
            low_quality=LowQualityType(row["low_quality"])
            if row["low_quality"] is not None
            else None,
            set_id=row["set_id"],
            unique_id=row["unique_id"],
            nameword1=row["nameword1"],
            nameword2=row["nameword2"],
            runeword_id=row["runeword_id"],
            personal_name=row["personal_name"],
            defense=row["defense"],
            durability=row["durability"],
            max_durability=row["max_durability"],
            quantity=row["quantity"],
        )

        if item.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]:
            affix_rows = db.execute(
                "SELECT prefix, affix_id FROM item_affix WHERE item_id = ?", (id,)
            )
            item.prefixes = []
            item.suffixes = []
            for affix_row in affix_rows:
                if affix_row["prefix"]:
                    item.prefixes.append(affix_row["affix_id"])
                else:
                    item.suffixes.append(affix_row["affix_id"])

        # write_to_db inserts socketed items in socket order, so ordering by
        # row id restores that order instead of leaving it unspecified.
        socket_rows = db.execute(
            "SELECT id FROM item WHERE socketed_into = ? ORDER BY id", (id,)
        ).fetchall()
        if len(socket_rows) > 0:
            item.sockets = []
            for socket_row in socket_rows:
                socket = Item.load_from_db(socket_row["id"])
                socket.pos_x = len(item.sockets)
                item.sockets.append(socket)

        stat_rows = db.execute(
            "SELECT stat, value1, value2, value3, parameter FROM item_stat WHERE item_id = ?",
            (id,),
        ).fetchall()
        if len(stat_rows) > 0:
            item.stats = []
            for stat_row in stat_rows:
                # NULL value columns are dropped.
                values = [
                    stat_row[f"value{i}"]
                    for i in range(1, 4)
                    if stat_row[f"value{i}"] is not None
                ]
                stat = Stat(
                    id=stat_row["stat"], values=values, parameter=stat_row["parameter"]
                )
                stat_data = lookup_stat(stat.id)
                stat.text = stat_data["text"]
                item.stats.append(stat)

        return item
def lookup_basetype(code: str) -> dict:
    """Return the base type record stored under the item ``code``.

    The table is read from ``items.json`` in the data directory on first use
    and kept in the module-level ``_basetype_map`` afterwards.

    Raises:
        KeyError: if ``code`` is not a key of the table (assuming the file
            holds a JSON object).
    """
    global _basetype_map
    if _basetype_map is None:
        # Read explicitly as UTF-8 instead of the locale's default encoding.
        with open(os.path.join(_data_path, "items.json"), encoding="utf-8") as f:
            _basetype_map = json.load(f)
    return _basetype_map[code]
def lookup_stat(id: int) -> dict:
    """Return the stat record stored under ``id``.

    The table is read from ``stats.json`` in the data directory on first use
    and kept in the module-level ``_stats_map`` afterwards.  Keys in the
    table are strings, so ``id`` is converted with ``str()`` for the lookup.

    Raises:
        KeyError: if ``str(id)`` is not a key of the table (assuming the file
            holds a JSON object).
    """
    global _stats_map
    if _stats_map is None:
        # Read explicitly as UTF-8 instead of the locale's default encoding.
        with open(os.path.join(_data_path, "stats.json"), encoding="utf-8") as f:
            _stats_map = json.load(f)
    return _stats_map[str(id)]
def lookup_unique(id: int) -> dict:
    """Return the unique item record stored under ``id``.

    The table is read from ``uniques.json`` in the data directory on first
    use and kept in the module-level ``_unique_map`` afterwards.  Keys in the
    table are strings, so ``id`` is converted with ``str()`` for the lookup.

    Raises:
        KeyError: if ``str(id)`` is not a key of the table (assuming the file
            holds a JSON object).
    """
    global _unique_map
    if _unique_map is None:
        # Read explicitly as UTF-8 instead of the locale's default encoding.
        with open(os.path.join(_data_path, "uniques.json"), encoding="utf-8") as f:
            _unique_map = json.load(f)
    return _unique_map[str(id)]
def lookup_set_item(id: int) -> dict:
    """Return the set item record stored under ``id``.

    The table is read from ``sets.json`` in the data directory on first use
    and kept in the module-level ``_set_item_map`` afterwards.  Keys in the
    table are strings, so ``id`` is converted with ``str()`` for the lookup.

    Raises:
        KeyError: if ``str(id)`` is not a key of the table (assuming the file
            holds a JSON object).
    """
    global _set_item_map
    if _set_item_map is None:
        # Read explicitly as UTF-8 instead of the locale's default encoding.
        with open(os.path.join(_data_path, "sets.json"), encoding="utf-8") as f:
            _set_item_map = json.load(f)
    return _set_item_map[str(id)]
def lookup_runeword(id: int) -> dict:
    """Return the runeword record stored under ``id``.

    The table is read from ``runewords.json`` in the data directory on first
    use and kept in the module-level ``_runeword_map`` afterwards.  Keys in
    the table are strings, so ``id`` is converted with ``str()`` for the
    lookup.

    Raises:
        KeyError: if ``str(id)`` is not a key of the table (assuming the file
            holds a JSON object).
    """
    global _runeword_map
    if _runeword_map is None:
        # Read explicitly as UTF-8 instead of the locale's default encoding.
        with open(os.path.join(_data_path, "runewords.json"), encoding="utf-8") as f:
            _runeword_map = json.load(f)
    return _runeword_map[str(id)]