"""Locate a file's MFT entry on an NTFS volume and decode its $DATA extents."""

import os

import pytsk3

from ntfs_utils.db_config import GetNTFSBootInfo


def find_file_mft_entry(fs, target_path):
    """Look up the MFT entry number of a path inside an opened file system.

    Args:
        fs: A ``pytsk3.FS_Info`` object for the volume.
        target_path: Path relative to the volume root, using ``\\`` (or ``/``)
            as separator. Matching is case-insensitive.

    Returns:
        The MFT entry number (``meta.addr``) of the matching entry, the root
        inode when the path has no components, or ``None`` when nothing
        matches or a directory cannot be opened.
    """

    def traverse_directory(inode, path_components):
        if not path_components:
            return inode

        wanted = path_components[0].lower()
        is_last_component = len(path_components) == 1

        try:
            directory = fs.open_dir(inode=inode)
        except Exception as e:
            print(f"Error opening directory with inode {inode}: {e}")
            return None

        for entry in directory:
            info = entry.info
            if not info or not info.name or not info.meta:
                continue

            name = info.name.name.decode('utf-8', errors='ignore').lower()
            if name != wanted:
                continue

            meta = info.meta
            if is_last_component:
                # This is the target file/directory itself.
                return meta.addr
            if meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
                # Descend into the matching sub-directory.
                result = traverse_directory(meta.addr, path_components[1:])
                # Compare against None explicitly: inode 0 is a valid entry.
                if result is not None:
                    return result

        return None

    # Split the path, dropping empty components produced by leading,
    # trailing or doubled separators.
    normalized = target_path.replace("/", "\\").lower()
    path_parts = [part for part in normalized.split("\\") if part]
    root_inode = fs.info.root_inum  # MFT entry of the root directory
    return traverse_directory(root_inode, path_parts)


def GetFileMftEntry(file_path):
    """Return the MFT entry number of an existing file on a mounted volume.

    The volume that holds the file is opened as a raw device
    (``\\\\.\\X:``) through pytsk3 and walked from the root directory.

    Args:
        file_path: Path to an existing file; relative paths are resolved
            against the current working directory.

    Returns:
        The MFT entry number of the file.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        RuntimeError: If the drive letter cannot be determined, the device
            cannot be opened, or the entry cannot be found.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Resolve to an absolute path first so that relative paths still yield
    # a drive letter.
    abs_path = os.path.abspath(file_path)
    drive = os.path.splitdrive(abs_path)[0]
    if len(drive) != 2 or drive[1] != ":":
        raise RuntimeError(f"Cannot determine drive letter for: {file_path}")
    drive_letter = drive[0]
    device = f"\\\\.\\{drive_letter}:"

    try:
        img = pytsk3.Img_Info(device)
        fs = pytsk3.FS_Info(img)
    except Exception as e:
        raise RuntimeError(f"Failed to open device '{device}': {e}") from e

    # Build the path relative to the volume root.
    root_path = f"{drive_letter}:\\"
    rel_path = os.path.relpath(abs_path, root_path).replace("/", "\\")

    mft_entry = find_file_mft_entry(fs, rel_path)
    if mft_entry is None:
        raise RuntimeError("Could not find MFT entry for the specified file.")

    return mft_entry


def CalculateFileMftStartSector(mft_entry, volume_letter="Z",
                                sectors_per_cluster=8,
                                sectors_per_mft_entry=2):
    """Compute the first sector of a file's MFT entry on the volume.

    The result is ``MftPosition * sectors_per_cluster +
    mft_entry * sectors_per_mft_entry`` where ``MftPosition`` comes from
    ``GetNTFSBootInfo(volume_letter)``.

    NOTE(review): ``MftPosition`` is presumably the starting cluster of $MFT;
    confirm against ``GetNTFSBootInfo``. The linear formula also assumes the
    requested entry lies in a contiguous $MFT.

    Args:
        mft_entry: MFT entry number (inode) of the file.
        volume_letter: Drive letter of the NTFS volume.
        sectors_per_cluster: Sectors per cluster (default 8, as before).
        sectors_per_mft_entry: Sectors per MFT entry (default 2, as before).

    Returns:
        The starting sector number of the file's MFT entry.

    Raises:
        ValueError: If ``mft_entry`` or the computed sector is negative.
    """
    if mft_entry < 0:
        raise ValueError("MFT Entry 编号不能为负数")

    # Fetch the NTFS boot-sector information for the volume.
    config_data = GetNTFSBootInfo(volume_letter)

    start_sector = (config_data["MftPosition"] * sectors_per_cluster
                    + mft_entry * sectors_per_mft_entry)
    if start_sector < 0:
        raise ValueError("起始扇区号不能为负数")

    return start_sector


def _scan_sector_for_data_attributes(sector_data, sector_number,
                                     sector_size=512, group_size=8):
    """Find 0x80 attribute headers at 8-byte-aligned offsets in one sector.

    Returns a list of dicts in the format documented on ``Get80hPattern``.
    """
    data_attribute_type = 0x80
    # The resident/non-resident flag is byte 8 of the attribute, so at least
    # two 8-byte groups are needed for a usable match.
    min_attribute_length = 2 * group_size

    groups = [sector_data[i:i + group_size]
              for i in range(0, len(sector_data), group_size)]
    results = []

    for index, group in enumerate(groups):
        if len(group) < group_size:
            continue

        # Match the whole little-endian 4-byte type code (80 00 00 00), not
        # just its first byte; a lone 0x80 byte can be ordinary data.
        if int.from_bytes(group[0:4], byteorder='little') != data_attribute_type:
            continue

        if index + 1 >= len(groups):
            print(f"警告: 当前组后不足两组,跳过偏移量 {index * group_size:04X}h")
            continue

        # Bytes 4..7 hold the total attribute length (little-endian DWORD).
        attribute_length = int.from_bytes(group[4:8], byteorder='little')
        if attribute_length < min_attribute_length:
            # Too short to be a real attribute header; treat as a false match.
            continue

        # Number of 8-byte groups to capture (rounded up).
        total_groups = (attribute_length + group_size - 1) // group_size
        end_index = index + total_groups
        if end_index > len(groups):
            print(f"警告: 属性越界,跳过偏移量 {index * group_size:04X}h")
            continue

        raw_sequence = groups[index:end_index]
        # Render each group as "31 7a 00 ee 0b 00 00 00".
        formatted_sequence = [' '.join(f"{byte:02x}" for byte in chunk)
                              for chunk in raw_sequence]

        # Lowest bit of the first byte of the second group: 0 means resident.
        is_resident = (raw_sequence[1][0] & 0x01) == 0x00

        results.append({
            'start_byte': sector_number * sector_size,
            'offset': index * group_size,
            'sequence': formatted_sequence,
            'is_resident': is_resident,
            'total_groups': total_groups,
            'attribute_length': attribute_length
        })

    return results


def Get80hPattern(sector_number, volume_letter="Z"):
    """Read one sector of a volume and extract the 0x80 attributes in it.

    Only the single 512-byte sector at ``sector_number`` is read; attributes
    that extend past the end of that sector are skipped with a warning.

    Args:
        sector_number: Sector to read, counted in 512-byte sectors from the
            start of the volume.
        volume_letter: Drive letter of the volume (opened as ``\\\\.\\X:``).

    Returns:
        A list with one dict per match::

            {
                'start_byte': byte position of the sector (sector * 512),
                'offset': offset of the attribute inside the sector,
                'sequence': list of 8-byte groups as "xx xx xx ..." strings,
                'is_resident': True when the attribute is resident,
                'total_groups': number of groups captured,
                'attribute_length': total attribute length in bytes
            }

        The list is empty when the sector cannot be read; errors are printed
        rather than raised.
    """
    drive_path = fr"\\.\{volume_letter}:"
    SECTOR_SIZE = 512
    GROUP_SIZE = 8  # bytes per group

    results = []

    try:
        with open(drive_path, 'rb') as disk:
            disk.seek(sector_number * SECTOR_SIZE)
            sector_data = disk.read(SECTOR_SIZE)
    except PermissionError:
        print("错误: 需要管理员权限访问磁盘设备")
        return results
    except Exception as e:
        print(f"发生错误: {str(e)}")
        return results

    if not sector_data or len(sector_data) < GROUP_SIZE:
        print(f"错误: 无法读取扇区 {sector_number}")
        return results

    return _scan_sector_for_data_attributes(
        sector_data, sector_number, SECTOR_SIZE, GROUP_SIZE)


def GetFile80hPattern(file_path):
    """Return the 0x80 attribute matches for the MFT entry of ``file_path``.

    Example::

        data = GetFile80hPattern(r"Z:\\hello.txt")

    Args:
        file_path: Path to a file on an NTFS volume.

    Returns:
        The list produced by ``Get80hPattern``, or ``None`` if any step
        fails (the error is printed).
    """
    # Derive the drive letter from the absolute path so relative paths work.
    volume_letter = os.path.splitdrive(os.path.abspath(file_path))[0][:1]
    try:
        mft_entry_value = GetFileMftEntry(file_path)
        start_sector = CalculateFileMftStartSector(mft_entry_value, volume_letter)
        return Get80hPattern(start_sector, volume_letter)
    except Exception as e:
        print(f"❌ Error: {e}")
        return None


def ExtractSequenceHexValues(file80h_pattern):
    """Flatten the ``'sequence'`` groups of every entry into one byte list.

    Args:
        file80h_pattern: List of dicts, each optionally holding a
            ``'sequence'`` list of space-separated hex strings.

    Returns:
        A flat list of two-character hex strings, in order of appearance.
    """
    sequence_list = []
    for entry in file80h_pattern:
        if 'sequence' not in entry:
            continue
        for hex_str in entry['sequence']:
            sequence_list.extend(hex_str.split())
    return sequence_list


def ExportDataRunList(data_run):
    """Split a run list into one hex-string list per Data Run.

    Parsing stops at the ``'00'`` terminator, at a header with a zero-sized
    field, at an unparsable header, or at a run truncated by the end of
    ``data_run``.

    Args:
        data_run: List of hex byte strings that starts at the first run
            header.

    Returns:
        A list of lists; each inner list is the header byte followed by the
        bytes of its two fields.
    """
    result = []
    pos = 0

    while pos < len(data_run):
        current_byte = data_run[pos]

        if current_byte == '00':
            # End-of-list marker.
            break

        try:
            header = int(current_byte, 16)
        except (TypeError, ValueError) as e:
            print(f"❌ 解析失败,位置 {pos}:{e}")
            break

        # High nibble: size of the offset field; low nibble: size of the
        # length field.
        offset_size = (header >> 4) & 0x0F
        length_size = header & 0x0F

        if length_size == 0 or offset_size == 0:
            print(f"⚠️ 无效的字段长度,跳过位置 {pos}")
            break

        # Header byte plus both fields.
        run_size = 1 + length_size + offset_size
        fragment = data_run[pos:pos + run_size]
        if len(fragment) < run_size:
            # The input ends in the middle of this run; do not emit it.
            print(f"⚠️ Data Run 被截断,停止于位置 {pos}")
            break

        result.append(fragment)
        pos += run_size

    return result


def hex_list_to_int(lst, byteorder='little', signed=False):
    """Convert a list of hex byte strings to an integer.

    Args:
        lst: Non-empty list of hex strings, one byte each (e.g. ``['34', '56']``).
        byteorder: ``'little'`` for little-endian; any other value is
            treated as big-endian.
        signed: Interpret the bytes as a two's-complement signed value.

    Returns:
        The decoded integer.

    Raises:
        ValueError: If ``lst`` is empty or an element is not a valid byte.
    """
    if not lst:
        raise ValueError("cannot convert an empty byte list to an integer")
    order = 'little' if byteorder == 'little' else 'big'
    raw = bytes(int(b, 16) for b in lst)
    return int.from_bytes(raw, byteorder=order, signed=signed)


def parse_data_run(data_run, previous_cluster=0, cluster_size=512,
                   sectors_per_cluster=8):
    """Decode a single NTFS Data Run into cluster and byte extents.

    Layout of a run: one header byte (low nibble = size of the length
    field, high nibble = size of the offset field), then the length field,
    then the offset field. The offset is a signed little-endian value
    relative to the starting cluster of the previous run.

    Args:
        data_run: Hex byte strings of one run, header first.
        previous_cluster: Starting cluster of the previous run (0 for the
            first run).
        cluster_size: Despite its name, this is multiplied by
            ``sectors_per_cluster`` to get the bytes per cluster, so it
            acts as bytes per sector (default 512).
        sectors_per_cluster: Sectors per cluster (default 8, as before).

    Returns:
        ``None`` for an empty run or the ``'00'`` terminator, otherwise a
        dict with ``starting_byte``, ``ending_byte`` (inclusive),
        ``byte_length``, ``starting_cluster`` and ``run_length_clusters``.

    Raises:
        ValueError: If the run has a zero-sized field or is truncated.
    """
    if not data_run or data_run[0] == '00':
        return None

    header = int(data_run[0], 16)
    offset_size = (header >> 4) & 0x0F
    length_size = header & 0x0F

    if length_size == 0 or offset_size == 0:
        raise ValueError(f"Unsupported Data Run header: {data_run[0]}")
    if len(data_run) < 1 + length_size + offset_size:
        raise ValueError("Truncated Data Run")

    # The length field comes first, followed by the offset field.
    length_field = data_run[1:1 + length_size]
    offset_field = data_run[1 + length_size:1 + length_size + offset_size]

    run_length = hex_list_to_int(length_field)
    relative_offset = hex_list_to_int(offset_field, signed=True)

    starting_cluster = previous_cluster + relative_offset

    # Convert clusters to byte positions.
    bytes_per_cluster = sectors_per_cluster * cluster_size
    starting_byte = starting_cluster * bytes_per_cluster
    byte_length = run_length * bytes_per_cluster
    ending_byte = starting_byte + byte_length - 1

    return {
        "starting_byte": starting_byte,
        "ending_byte": ending_byte,
        "byte_length": byte_length,
        "starting_cluster": starting_cluster,
        "run_length_clusters": run_length
    }


def ParseMultipleDataRuns(fragments, cluster_size=512, sectors_per_cluster=8):
    """Decode consecutive Data Runs, chaining their relative offsets.

    Args:
        fragments: List of runs as produced by ``ExportDataRunList``.
        cluster_size: Passed through to ``parse_data_run``.
        sectors_per_cluster: Passed through to ``parse_data_run``.

    Returns:
        A list of the dicts returned by ``parse_data_run``, in order.
    """
    results = []
    previous_starting_cluster = 0

    for fragment in fragments:
        result = parse_data_run(fragment, previous_starting_cluster,
                                cluster_size, sectors_per_cluster)
        if result:
            results.append(result)
            # Each run's offset is relative to the previous run's start.
            previous_starting_cluster = result["starting_cluster"]

    return results


def GetFragmentData(file80h_pattern):
    """Return the on-volume byte extents of the first matched 0x80 attribute.

    Example::

        data = GetFragmentData(GetFile80hPattern(r"Z:\\hello.txt"))

    Args:
        file80h_pattern: List returned by ``Get80hPattern`` /
            ``GetFile80hPattern``.

    Returns:
        For a resident attribute, a one-element list
        ``[{'starting_byte': ..., 'byte_length': ...}]`` locating the
        content. For a non-resident attribute, the list produced by
        ``ParseMultipleDataRuns``. An empty list when the input is empty,
        not a list, or too short to contain the needed header fields.
    """
    if not file80h_pattern or not isinstance(file80h_pattern, list):
        return []

    first = file80h_pattern[0]

    if first.get('is_resident'):
        sequence = first.get('sequence') or []
        if len(sequence) < 3:
            return []

        # The third group covers attribute bytes 0x10..0x17: content length
        # (4 bytes, little-endian) followed by content offset (2 bytes).
        header_bytes = sequence[2].split()
        if len(header_bytes) < 6:
            return []
        content_length = hex_list_to_int(header_bytes[0:4])
        content_offset = hex_list_to_int(header_bytes[4:6])

        file_offset = first.get('start_byte') + first.get('offset') + content_offset
        return [{
            'starting_byte': file_offset,
            'byte_length': content_length
        }]

    # The run-list offset is relative to the start of this attribute, so
    # only this entry's bytes are considered.
    sequence_list = ExtractSequenceHexValues([first])
    if len(sequence_list) < 34:
        return []

    # Attribute bytes 0x20..0x21: offset of the run list (little-endian).
    data_run_offset = hex_list_to_int(sequence_list[32:34])
    data_run_list = sequence_list[data_run_offset:]

    fragments = ExportDataRunList(data_run_list)
    return ParseMultipleDataRuns(fragments)