# Copyright 2023 # Copyright 2023 # # This file is part of d2warehouse. # # d2warehouse is free software: you can redistribute it and/or modify it under the # terms of the GNU General Public License as published by the Free Software # Foundation, either version 3 of the License, or (at your option) any later # version. # # d2warehouse is distributed in the hope that it will be useful, but WITHOUT ANY # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR # A PARTICULAR PURPOSE. See the GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along with # Mercator. If not, see . import json import os import re from bitarray import bitarray from bitarray.util import int2ba from dataclasses import dataclass from enum import IntEnum from d2warehouse.fileformat import STASH_TAB_VERSION from d2warehouse.db import get_db _data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") _basetype_map = None _stats_map = None _unique_map = None _set_item_map = None _runeword_map = None _affix_map = None _skills_map = None class Quality(IntEnum): LOW = 1 NORMAL = 2 HIGH = 3 MAGIC = 4 SET = 5 RARE = 6 UNIQUE = 7 CRAFTED = 8 def __str__(self) -> str: return self.name.capitalize() class LowQualityType(IntEnum): CRUDE = 0 CRACKED = 1 DAMAGED = 2 LOW_QUALITY = 3 def __str__(self) -> str: return self.name.capitalize() @dataclass class Stat: id: int | None = None # TODO: These 3 should probably not be optional values: list[int] | None = None parameter: int | None = None text: str | None = None # TODO: Make this a property def print(self, indent=5): print(" " * indent, str(self)) def __str__(self): param = self.parameter if isinstance(self.text, list): subst_text = next(filter(lambda v: v["param"] == param, self.text))["text"] param = None else: subst_text = self.text for val in self.values: subst_text = subst_text.replace("#", str(val), 1) if param: subst_text = self.try_add_skill_text(subst_text) return subst_text def try_add_skill_text(self, subst_text: str) -> str | None: if self.id == 107: # +X to [Skill] ([Class] only) return re.sub(r"\[[^\]]*\]", lookup_skill_name(self.parameter), subst_text, 1) elif self.id == 188: # +X to [Skill] return re.sub(r"\[[^\]]*\]", lookup_talent_tree(self.parameter) + f"({self.parameter})", subst_text, 1) else: return re.sub(r"\[[^\]]*\]", str(self.parameter), subst_text, 1) def txtbits(bits: bitarray) -> str: txt = "".join(str(b) for b in bits) grouped = [txt[i : i + 8] for i in range(0, len(txt), 8)] return " ".join(grouped) @dataclass class Item: raw_data: bytes raw_version: int is_identified: bool is_socketed: bool is_beginner: bool is_simple: bool is_ethereal: bool is_personalized: bool is_runeword: bool pos_x: int pos_y: int code: str uid: int | None = None lvl: int | None = None quality: Quality | None = None graphic: int | None = None implicit: int | None = None low_quality: LowQualityType | None = None prefixes: list[int] | None = None suffixes: list[int] | None = None set_id: int | None = None unique_id: int | None = None nameword1: int | None = None nameword2: int | None = None runeword_id: int | None = None personal_name: str | None = None defense: int | None = None durability: int | None = None max_durability: int | None = None sockets: int | None = None socketed_items: list["Item"] | None = None quantity: int | None = None stats: list[Stat] | None = None @property def basename(self) -> str: return lookup_basetype(self.code)["name"] @property def name(self) -> str: match self.quality: case Quality.LOW: return f"{self.low_quality} {self.basename}" case Quality.NORMAL | None: return self.basename case Quality.HIGH: return f"Superior {self.basename}" case Quality.MAGIC: return f" {self.basename} " case Quality.SET: assert self.set_id is not None return lookup_set_item(self.set_id)["name"] case Quality.RARE: # FIXME return " " case Quality.UNIQUE: assert self.unique_id is not None return lookup_unique(self.unique_id)["name"] case Quality.CRAFTED: # FIXME return " " case _: # TODO: In 3.11 replace this with assert_never assert False, "Should be unreachable" @property def color(self) -> str: if self.is_runeword: return "runeword" match self.quality: case Quality.LOW: return "low" case Quality.NORMAL | None: return "normal" case Quality.HIGH: return "normal" case Quality.MAGIC: return "magic" case Quality.SET: return "set" case Quality.RARE: return "rare" case Quality.UNIQUE: return "unique" case Quality.CRAFTED: return "crafted" case _: # TODO: In 3.11 replace this with assert_never assert False, "Should be unreachable" def raw(self): parts = [self.raw_data] if self.socketed_items: for item in self.socketed_items: parts.append(item.raw_data) return b"".join(parts) def print(self, indent=5, with_raw=False): properties = [] base_name = lookup_basetype(self.code)["name"] print(" " * indent, f"{base_name} ({self.code})") if self.lvl: print(" " * indent, f"ilvl {self.lvl}") if self.is_simple: properties.append("Simple") else: properties.append("Extended") if self.is_ethereal: properties.append("Ethereal") if not self.is_identified: properties.append("Unidentified") if self.is_socketed: properties.append("Socketed") if self.is_runeword: properties.append("Runeword") if properties: print(" " * indent, ", ".join(properties)) print(" " * indent, f"at {self.pos_x}, {self.pos_y}") if self.quality: print(" " * indent, self.quality) if self.prefixes: print(" " * indent, "Prefixes:", self.prefixes) if self.suffixes: print(" " * indent, "Suffixes:", self.suffixes) if self.set_id: itm = lookup_set_item(self.set_id) print(" " * indent, f"{itm['name']} ({self.set_id}), part of {itm['set']}") if self.unique_id: itm = lookup_unique(self.unique_id) print(" " * indent, f"{itm['name']} ({self.unique_id})") if self.runeword_id: rw = lookup_runeword(self.runeword_id) print(" " * indent, f"{rw['name']} runeword") if self.personal_name: print(" " * indent, f"Personal name: {self.personal_name}") if self.defense: print(" " * indent, f"Defense: {self.defense}") if self.durability is not None: print( " " * indent, f"Durability: {self.durability} out of {self.max_durability}", ) if self.is_socketed: print(" " * indent, f"{len(self.socketed_items)}/{self.sockets} sockets:") for socket in self.socketed_items: socket.print(indent + 4) if self.quantity: print(" " * indent, f"Quantity: {self.quantity}") if self.stats: print(" " * indent, "Stats:") for stat in self.stats: stat.print(indent + 4) if with_raw: print(" " * indent, "Raw Item Data:") bits = bitarray(endian="little") bits.frombytes(self.raw_data) print(" " * indent, txtbits(bits)) print(" " * indent, self.raw_data.hex()) print("") print("") def set_position(self, x: int, y: int) -> None: if x < 0 or x > 15 or y < 0 or y > 15: raise ValueError("position needs to be in range [0, 16)") self.pos_x = x self.pos_y = y bits = bitarray(endian="little") bits.frombytes(self.raw_data) bits[42:46] = int2ba(x, 4, endian="little") bits[46:50] = int2ba(y, 4, endian="little") self.raw_data = bits.tobytes() def size(self) -> tuple[int, int]: base = lookup_basetype(self.code) return base["width"], base["height"] def requirements(self) -> dict: base = lookup_basetype(self.code) # TODO: How is class requirements determined on basetypes? reqs = { "lvl": base["req_lvl"], "dex": base["req_dex"], "str": base["req_str"], "class": None, } # TODO: Can implicit mod end up increasing requirements? if self.quality == Quality.UNIQUE: reqs["lvl"] = lookup_unique(self.unique_id)["req_lvl"] elif self.quality == Quality.SET: reqs["lvl"] = lookup_set_item(self.set_id)["req_lvl"] elif self.is_runeword: # TODO: What affects runeword level? Only the sockets? pass elif self.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]: for m, id in [(False, id) for id in self.suffixes] + [ (True, id) for id in self.prefixes ]: if id == 0: continue affix = lookup_affix(id, m) reqs["lvl"] = max(reqs["lvl"], affix["req_lvl"]) if affix["req_class"]: reqs["class"] = affix["req_class"] if self.socketed_items: for socket_item in self.socketed_items: socket_reqs = socket_item.requirements() reqs["lvl"] = max(reqs["lvl"], socket_reqs["lvl"]) return reqs def write_to_db(self, socketed_into=None, commit=True, db=None) -> int: if db is None: db = get_db() name = lookup_basetype(self.code)["name"] # FIXME: handle magic & rare names if self.is_runeword: name = lookup_runeword(self.runeword_id)["name"] elif self.quality == Quality.SET: name = lookup_set_item(self.set_id)["name"] elif self.quality == Quality.UNIQUE: name = lookup_unique(self.unique_id)["name"] set_name = ( lookup_set_item(self.set_id)["set"] if self.quality == Quality.SET else None ) req = self.requirements() cur = db.cursor() cur.execute( """INSERT INTO item (itembase_name, socketed_into, raw_data, raw_version, is_identified, is_socketed, is_beginner, is_simple, is_ethereal, is_personalized, is_runeword, code, req_lvl) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( lookup_basetype(self.code)["name"], socketed_into, self.raw_data, self.raw_version, self.is_identified, self.is_socketed, self.is_beginner, self.is_simple, self.is_ethereal, self.is_personalized, self.is_runeword, self.code, req["lvl"], ), ) item_id = cur.lastrowid if not self.is_simple: cur.execute( """INSERT INTO item_extra (item_id, item_name, set_name, uid, lvl, quality, graphic, implicit, low_quality, set_id, unique_id, nameword1, nameword2, runeword_id, personal_name, defense, durability, max_durability, sockets, quantity, req_str, req_dex, req_class) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( item_id, name, set_name, self.uid, self.lvl, int(self.quality) if self.quality else None, self.graphic, self.implicit, int(self.low_quality) if self.low_quality else None, self.set_id, self.unique_id, self.nameword1, self.nameword2, self.runeword_id, self.personal_name, self.defense, self.durability, self.max_durability, self.sockets, self.quantity, req["str"], req["dex"], req["class"], ), ) if self.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]: for prefix, id in [(False, id) for id in self.suffixes] + [ (True, id) for id in self.prefixes ]: db.execute( "INSERT INTO item_affix (item_id, prefix, affix_id) VALUES (?, ?, ?)", (item_id, prefix, id), ) if self.stats: for stat in self.stats: db.execute( """INSERT INTO item_stat (item_id, stat, value1, value2, value3, parameter) VALUES (?, ?, ?, ?, ?, ?) """, ( item_id, stat.id, stat.values[0] if len(stat.values) > 0 else None, stat.values[1] if len(stat.values) > 1 else None, stat.values[2] if len(stat.values) > 2 else None, stat.parameter, ), ) if self.socketed_items: for socket_item in self.socketed_items: socket_item.write_to_db(socketed_into=item_id, commit=False, db=db) if commit: db.commit() return item_id def load_from_db(id: int, db=None) -> "Item": if db is None: db = get_db() row = db.execute( """SELECT raw_data, raw_version, is_identified, is_socketed, is_beginner, is_simple, is_ethereal, is_personalized, is_runeword, code, uid, lvl, quality, graphic, implicit, low_quality, set_id, unique_id, nameword1, nameword2, runeword_id, personal_name, defense, durability, max_durability, sockets, quantity FROM item LEFT JOIN item_extra ON id = item_id WHERE id = ?""", (id,), ).fetchone() if row["raw_version"] != STASH_TAB_VERSION: raise RuntimeError("Can not load item, the raw version is not supported") item = Item( raw_data=row["raw_data"], raw_version=row["raw_version"], is_identified=bool(row["is_identified"]), is_socketed=bool(row["is_socketed"]), is_beginner=bool(row["is_beginner"]), is_simple=bool(row["is_simple"]), is_ethereal=bool(row["is_ethereal"]), is_personalized=bool(row["is_personalized"]), is_runeword=bool(row["is_runeword"]), pos_x=0, pos_y=0, code=row["code"], uid=row["uid"], lvl=row["lvl"], quality=Quality(row["quality"]) if row["quality"] else None, graphic=row["graphic"], implicit=row["implicit"], low_quality=LowQualityType(row["low_quality"]) if row["low_quality"] else None, set_id=row["set_id"], unique_id=row["unique_id"], nameword1=row["nameword1"], nameword2=row["nameword2"], runeword_id=row["runeword_id"], personal_name=row["personal_name"], defense=row["defense"], durability=row["durability"], max_durability=row["max_durability"], sockets=row["sockets"], quantity=row["quantity"], ) if item.quality in [Quality.MAGIC, Quality.RARE, Quality.CRAFTED]: rows = db.execute( "SELECT prefix, affix_id FROM item_affix WHERE item_id = ?", (id,) ) item.prefixes = [] item.suffixes = [] for row in rows: if row["prefix"]: item.prefixes.append(row["affix_id"]) else: item.suffixes.append(row["affix_id"]) if item.is_socketed: item.socketed_items = [] rows = db.execute( "SELECT id FROM item WHERE socketed_into = ?", (id,) ).fetchall() if len(rows) > 0: for row in rows: socket_item = Item.load_from_db(row["id"], db=db) socket_item.pos_x = len(item.socketed_items) item.socketed_items.append(socket_item) rows = db.execute( "SELECT stat, value1, value2, value3, parameter FROM item_stat WHERE item_id = ?", (id,), ).fetchall() if len(rows) > 0: item.stats = [] for row in rows: values = [] for i in range(1, 4): if row[f"value{i}"] is not None: values.append(row[f"value{i}"]) stat = Stat(id=row["stat"], values=values, parameter=row["parameter"]) stat_data = lookup_stat(stat.id) stat.text = stat_data["text"] item.stats.append(stat) return item def lookup_basetype(code: str) -> dict: global _basetype_map if _basetype_map is None: with open(os.path.join(_data_path, "items.json")) as f: _basetype_map = json.load(f) return _basetype_map[code] def lookup_stat(id: int) -> dict: global _stats_map if _stats_map is None: with open(os.path.join(_data_path, "stats.json")) as f: _stats_map = json.load(f) return _stats_map[str(id)] def lookup_unique(id: int) -> dict: global _unique_map if _unique_map is None: with open(os.path.join(_data_path, "uniques.json")) as f: _unique_map = json.load(f) return _unique_map[str(id)] def lookup_set_item(id: int) -> dict: global _set_item_map if _set_item_map is None: with open(os.path.join(_data_path, "sets.json")) as f: _set_item_map = json.load(f) return _set_item_map[str(id)] def lookup_runeword(id: int) -> dict: global _runeword_map if _runeword_map is None: with open(os.path.join(_data_path, "runewords.json")) as f: _runeword_map = json.load(f) return _runeword_map[str(id)] def lookup_affix(id: int, prefix: bool) -> dict: global _affix_map if _affix_map is None: with open(os.path.join(_data_path, "affixes.json")) as f: _affix_map = json.load(f) return _affix_map["prefixes" if prefix else "suffixes"][str(id)] def _get_skills_map(): global _skills_map if _skills_map is None: with open(os.path.join(_data_path, "skills.json")) as f: _skills_map = json.load(f) return _skills_map def lookup_skill_name(id: int) -> str: # NOTE: The mapping seems not straight forward # for Powerstrike and Impale the id given is 14 and 19 and the key is "skillname14" and "skillname19" # for Feral Rage id 232 is given but the key is "Skillname233" skills_map = _get_skills_map() try: try: return skills_map[f"Skillname{id + 1}"] except KeyError: return skills_map[f"skillname{id}"] except KeyError: return f"" def lookup_talent_tree(id: int) -> str: # TODO: I do not know how to translate this Id to e.g. SkillCategoryBa1 class_guess = lookup_class(int(id / 10) + 1, id) try: return f"" except AssertionError: return "" def lookup_class(id: int, dbg) -> str: match id: case 0: return "Amazon" case 1: return "Sorceress" case 2: return "Necromancer" case 3: return "Paladin" case 4: return "Barbarian" case 5: return "Druid" case 6: return "Assassin" case _: # TODO: In 3.11 replace this with assert_never assert False, f"{id} Should be unreachable: {dbg}"