diff --git a/app/DataBase/hard_link.py b/app/DataBase/hard_link.py index f802bc5..177c497 100644 --- a/app/DataBase/hard_link.py +++ b/app/DataBase/hard_link.py @@ -61,18 +61,18 @@ def decodeExtraBuf(extra_buf_content: bytes): off = extra_buf_content.index(key) + 4 except: pass - char = extra_buf_content[off : off + 1] + char = extra_buf_content[off: off + 1] off += 1 if char == b"\x04": # 四个字节的int,小端序 - intContent = extra_buf_content[off : off + 4] + intContent = extra_buf_content[off: off + 4] off += 4 intContent = int.from_bytes(intContent, "little") res[trunk_head] = intContent elif char == b"\x18": # utf-16字符串 - lengthContent = extra_buf_content[off : off + 4] + lengthContent = extra_buf_content[off: off + 4] off += 4 lengthContent = int.from_bytes(lengthContent, "little") - strContent = extra_buf_content[off : off + lengthContent] + strContent = extra_buf_content[off: off + lengthContent] off += lengthContent res[trunk_head] = strContent.decode("utf-16").rstrip("\x00") return { @@ -91,7 +91,6 @@ def decodeExtraBuf(extra_buf_content: bytes): } - def singleton(cls): _instance = {} @@ -179,27 +178,64 @@ class HardLink: finally: video_db_lock.release() - def get_image(self, content, bytesExtra, thumb=False): + def get_image_original(self, content, bytesExtra) -> str: msg_bytes = MessageBytesExtra() msg_bytes.ParseFromString(bytesExtra) + result = '' for tmp in msg_bytes.message2: - if tmp.field1 != (3 if thumb else 4): + if tmp.field1 != 4: continue pathh = tmp.field2 # wxid\FileStorage\... pathh = "\\".join(pathh.split("\\")[1:]) return pathh md5 = get_md5_from_xml(content) if not md5: - return None - result = self.get_image_by_md5(binascii.unhexlify(md5)) + pass + else: + result = self.get_image_by_md5(binascii.unhexlify(md5)) + if result: + dir1 = result[3] + dir2 = result[4] + data_image = result[2] + dir0 = "Image" + dat_image = os.path.join(root_path, dir1, dir0, dir2, data_image) + result = dat_image + return result - if result: - dir1 = result[3] - dir2 = result[4] - data_image = result[2] - dir0 = "Thumb" if thumb else "Image" - dat_image = os.path.join(root_path, dir1, dir0, dir2, data_image) - return dat_image + def get_image_thumb(self, content, bytesExtra) -> str: + msg_bytes = MessageBytesExtra() + msg_bytes.ParseFromString(bytesExtra) + result = '' + for tmp in msg_bytes.message2: + if tmp.field1 != 3: + continue + pathh = tmp.field2 # wxid\FileStorage\... + pathh = "\\".join(pathh.split("\\")[1:]) + return pathh + md5 = get_md5_from_xml(content) + if not md5: + pass + else: + result = self.get_image_by_md5(binascii.unhexlify(md5)) + if result: + dir1 = result[3] + dir2 = result[4] + data_image = result[2] + dir0 = "Thumb" + dat_image = os.path.join(root_path, dir1, dir0, dir2, data_image) + result = dat_image + return result + + def get_image(self, content, bytesExtra, thumb=False) -> str: + msg_bytes = MessageBytesExtra() + msg_bytes.ParseFromString(bytesExtra) + if thumb: + result = self.get_image_thumb(content, bytesExtra) + else: + result = self.get_image_original(content, bytesExtra) + if not (result and os.path.exists(result)): + result = self.get_image_thumb(content, bytesExtra) + return result def get_video(self, content, bytesExtra, thumb=False): msg_bytes = MessageBytesExtra() @@ -212,7 +248,7 @@ class HardLink: return pathh md5 = get_md5_from_xml(content, type_="video") if not md5: - return None + return '' result = self.get_video_by_md5(binascii.unhexlify(md5)) if result: dir2 = result[3] diff --git a/app/util/image.py b/app/util/image.py index 47c09c8..0857ee7 100644 --- a/app/util/image.py +++ b/app/util/image.py @@ -13,7 +13,7 @@ pic_head = [0xff, 0xd8, 0x89, 0x50, 0x47, 0x49] decode_code = 0 -def get_code(dat_read): +def get_code(dat_read) -> tuple[int, int]: """ 自动判断文件类型,并获取dat文件解密码 :param file_path: dat文件路径 @@ -39,7 +39,7 @@ def get_code(dat_read): return -1, -1 -def decode_dat(file_path, out_path): +def decode_dat(file_path, out_path) -> str: """ 解密文件,并生成图片 :param file_path: dat文件路径 @@ -49,10 +49,10 @@ def decode_dat(file_path, out_path): return None with open(file_path, 'rb') as file_in: data = file_in.read() - data = get_code(data[:2]) - file_type, decode_code = data + + file_type, decode_code = get_code(data[:2]) if decode_code == -1: - return + return '' filename = os.path.basename(file_path) if file_type == 1: @@ -74,21 +74,19 @@ def decode_dat(file_path, out_path): return file_outpath -def decode_dat_path(file_path, out_path): +def decode_dat_path(file_path, out_path) -> str: """ 解密文件,并生成图片 :param file_path: dat文件路径 :return: 无 """ if not os.path.exists(file_path): - return None + return '' with open(file_path, 'rb') as file_in: data = file_in.read(2) file_type, decode_code = get_code(data) - if decode_code == -1: - return - + return '' filename = os.path.basename(file_path) if file_type == 1: pic_name = os.path.basename(file_path)[:-4] + ".jpg" @@ -102,18 +100,6 @@ def decode_dat_path(file_path, out_path): return file_outpath -def find_datfile(dir_path, out_path): - """ - 获取dat文件目录下所有的文件 - :param dir_path: dat文件目录 - :return: 无 - """ - files_list = os.listdir(dir_path) - for file_name in files_list: - file_path = dir_path + "\\" + file_name - decode_dat(file_path, out_path) - - def get_image(path, base_path) -> str: if path: base_path = os.getcwd() + base_path