import os

import pytsk3

from db_config import GetNTFSBootInfo

# Geometry assumed throughout this module. These were hard-coded literals in
# the formulas below; they are named here so the assumptions are visible.
# NOTE(review): 8 sectors/cluster and 2 sectors/MFT entry are assumptions of
# this module, not values read from the volume -- confirm against the boot
# sector data returned by GetNTFSBootInfo.
_SECTOR_SIZE = 512
_SECTORS_PER_CLUSTER = 8
_SECTORS_PER_MFT_ENTRY = 2

# Attributes are scanned in 8-byte groups.
_GROUP_SIZE = 8
# $DATA attribute type code (0x80) as a 4-byte little-endian value.
_DATA_ATTR_TYPE = b"\x80\x00\x00\x00"


def _drive_letter(path):
    """Return the drive letter of ``path`` (made absolute first).

    Raises:
        RuntimeError: If the absolute path does not start with ``X:``.
    """
    drive = os.path.splitdrive(os.path.abspath(path))[0]
    if len(drive) != 2 or drive[1] != ":":
        raise RuntimeError(f"Cannot determine drive letter for path: {path}")
    return drive[0]


def find_file_mft_entry(fs, target_path):
    """Look up the MFT entry number of a path inside an opened file system.

    Args:
        fs: An opened ``pytsk3.FS_Info`` object.
        target_path: Backslash-separated path relative to the volume root.
            Components are compared case-insensitively.

    Returns:
        The ``meta.addr`` of the matching directory entry, or ``None`` when
        no entry matches or a directory on the way cannot be opened.
    """

    def traverse_directory(inode, path_components):
        if not path_components:
            return inode

        wanted = path_components[0].lower()

        try:
            directory = fs.open_dir(inode=inode)
        except Exception as e:
            print(f"Error opening directory with inode {inode}: {e}")
            return None

        for entry in directory:
            if not entry.info or not entry.info.name or not entry.info.meta:
                continue

            name = entry.info.name.name.decode('utf-8', errors='ignore').lower()
            meta = entry.info.meta

            if name != wanted:
                continue

            if len(path_components) == 1:
                # Last component: this is the target file or directory.
                return meta.addr

            if meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
                # Descend; keep scanning siblings if the subtree has no match.
                result = traverse_directory(meta.addr, path_components[1:])
                # Compare against None explicitly: entry number 0 is falsy
                # but is still a valid result.
                if result is not None:
                    return result

        return None

    path_parts = target_path.strip("\\").lower().split("\\")
    root_inode = fs.info.root_inum  # MFT entry of the root directory
    return traverse_directory(root_inode, path_parts)


def GetFileMftEntry(file_path):
    """Return the MFT entry number of an existing file.

    The raw volume device (``\\\\.\\X:``) that holds the file is opened with
    pytsk3 and the directory tree is walked from the root.

    Args:
        file_path: Path of the file; may be relative to the current directory.

    Returns:
        The MFT entry number found by ``find_file_mft_entry``.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        RuntimeError: If the drive letter cannot be determined, the device
            cannot be opened, or no matching entry is found.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Resolve to an absolute path first so that relative paths (which carry
    # no drive prefix) still yield the correct volume.
    abs_path = os.path.abspath(file_path)
    drive_letter = _drive_letter(abs_path)
    device = f"\\\\.\\{drive_letter}:"

    try:
        img = pytsk3.Img_Info(device)
        fs = pytsk3.FS_Info(img)
    except Exception as e:
        raise RuntimeError(f"Failed to open device '{device}': {e}") from e

    # Path relative to the volume root, using backslash separators.
    root_path = f"{drive_letter}:\\"
    rel_path = os.path.relpath(abs_path, root_path).replace("/", "\\")

    mft_entry = find_file_mft_entry(fs, rel_path)
    if mft_entry is None:
        raise RuntimeError("Could not find MFT entry for the specified file.")

    return mft_entry


def CalculateFileMftStartSector(mft_entry, volume_letter="Z"):
    """Compute the starting sector of a file's MFT entry on a volume.

    The result is ``MftPosition * 8 + mft_entry * 2`` where ``MftPosition``
    comes from ``GetNTFSBootInfo(volume_letter)``.
    NOTE(review): this presumes ``MftPosition`` is the starting cluster of
    $MFT, 8 sectors per cluster and 1024-byte MFT entries on 512-byte
    sectors -- confirm against GetNTFSBootInfo.

    Args:
        mft_entry: MFT entry number (inode) of the file; must be >= 0.
        volume_letter: Drive letter of the volume, default ``"Z"``.

    Returns:
        The starting sector number of the file's MFT entry.

    Raises:
        ValueError: If ``mft_entry`` or the computed sector is negative.
    """
    if mft_entry < 0:
        raise ValueError("MFT Entry 编号不能为负数")

    config_data = GetNTFSBootInfo(volume_letter)

    start_sector = (
        config_data["MftPosition"] * _SECTORS_PER_CLUSTER
        + mft_entry * _SECTORS_PER_MFT_ENTRY
    )

    if start_sector < 0:
        raise ValueError("起始扇区号不能为负数")

    return start_sector


def _find_data_attributes(sector_data, sector_number):
    """Scan one sector's bytes for 0x80 ($DATA) attribute records.

    The data is split into 8-byte groups; a group whose first four bytes are
    the little-endian type code ``80 00 00 00`` starts a candidate attribute.
    Its length is the little-endian DWORD in bytes 4-7 of that group.

    Returns:
        A list of result dictionaries (see ``Get80hPattern``).
    """
    results = []
    groups = [
        sector_data[i:i + _GROUP_SIZE]
        for i in range(0, len(sector_data), _GROUP_SIZE)
    ]

    for index, group in enumerate(groups):
        if len(group) < _GROUP_SIZE:
            continue

        # Match the whole 4-byte type code, not just its first byte, so that
        # unrelated data that merely starts with 0x80 is not reported.
        if group[:4] != _DATA_ATTR_TYPE:
            continue

        if index + 1 >= len(groups):
            print(f"警告: 当前组后不足两组,跳过偏移量 {index * _GROUP_SIZE:04X}h")
            continue

        attribute_length = int.from_bytes(group[4:8], byteorder='little')

        # Number of 8-byte groups covering the attribute (rounded up).
        total_groups = (attribute_length + _GROUP_SIZE - 1) // _GROUP_SIZE
        end_idx = index + total_groups

        if end_idx > len(groups):
            print(f"警告: 属性越界,跳过偏移量 {index * _GROUP_SIZE:04X}h")
            continue

        raw_sequence = groups[index:end_idx]

        # Render each group as "31 7a 00 ee 0b 00 00 00".
        formatted_sequence = [
            ' '.join(f"{byte:02x}" for byte in chunk) for chunk in raw_sequence
        ]

        # Byte 8 of the attribute (first byte of the second group) holds the
        # non-resident flag in its lowest bit.
        is_resident = (
            len(raw_sequence) >= 2 and (raw_sequence[1][0] & 0x01) == 0x00
        )

        results.append({
            'start_byte': sector_number * _SECTOR_SIZE,
            'offset': index * _GROUP_SIZE,
            'sequence': formatted_sequence,
            'is_resident': is_resident,
            'total_groups': total_groups,
            'attribute_length': attribute_length,
        })

    return results


def Get80hPattern(sector_number, volume_letter="Z"):
    """Read one sector from a volume and return the 0x80 attributes found.

    NOTE(review): only a single 512-byte sector is read, while
    ``CalculateFileMftStartSector`` treats an MFT entry as two sectors; an
    attribute that lies in, or extends into, the second sector is skipped.

    Args:
        sector_number: Sector to read from the raw volume device.
        volume_letter: Drive letter of the volume, default ``"Z"``.

    Returns:
        A list with one dictionary per attribute found::

            {
                'start_byte': sector_number * 512,
                'offset': offset of the attribute inside the sector,
                'sequence': list of "xx xx xx ..." strings, 8 bytes each,
                'is_resident': whether the attribute is resident,
                'total_groups': number of 8-byte groups captured,
                'attribute_length': attribute length in bytes,
            }

        The list is empty when the device cannot be read; the error is
        printed rather than raised.
    """
    drive_path = f"\\\\.\\{volume_letter}:"
    results = []

    try:
        with open(drive_path, 'rb') as disk:
            disk.seek(sector_number * _SECTOR_SIZE)
            sector_data = disk.read(_SECTOR_SIZE)

        if not sector_data or len(sector_data) < _GROUP_SIZE:
            print(f"错误: 无法读取扇区 {sector_number}")
            return results

        results = _find_data_attributes(sector_data, sector_number)
        return results

    except PermissionError:
        print("错误: 需要管理员权限访问磁盘设备")
    except Exception as e:
        print(f"发生错误: {str(e)}")

    return results


def GetFile80hPattern(file_path):
    """Return the 0x80 attribute records found for ``file_path``.

    Chains ``GetFileMftEntry``, ``CalculateFileMftStartSector`` and
    ``Get80hPattern`` for the volume that holds the file.

    Returns:
        The list produced by ``Get80hPattern``, or ``None`` if any step
        raised (the error is printed).
    """
    try:
        mft_entry_value = GetFileMftEntry(file_path)
        # Derive the letter from the absolute path so that paths without a
        # drive prefix resolve to the right volume.
        volume_letter = _drive_letter(file_path)
        start_sector = CalculateFileMftStartSector(mft_entry_value, volume_letter)
        return Get80hPattern(start_sector, volume_letter)
    except Exception as e:
        print(f"❌ Error: {e}")
        return None


def analyze_ntfs_data_attribute(data):
    """Count the fragments described by a 0x80 ($DATA) attribute.

    All ``'sequence'`` strings of all entries in ``data`` are concatenated
    into one byte list, which is then parsed as a single attribute record.

    Args:
        data: List of dictionaries as returned by ``Get80hPattern``; entries
            without a ``'sequence'`` key are ignored.

    Returns:
        ``1`` for a resident attribute, otherwise the number of data runs
        counted in the run list.

    Raises:
        ValueError: If the hex text is invalid, the data is too short to hold
            the header fields that are read, the type byte is not 0x80, or
            the data-run offset lies beyond the data.
    """
    # Step 1: flatten the hex strings into a list of byte values.
    hex_bytes = []
    for entry in data:
        if 'sequence' in entry:
            for hex_str in entry['sequence']:
                hex_bytes.extend(hex_str.split())

    try:
        attribute_data = [int(x, 16) for x in hex_bytes]
    except ValueError as exc:
        raise ValueError("无效的十六进制数据") from exc

    # Step 2: validate the header.
    if len(attribute_data) < 24:
        raise ValueError("属性数据过短,无法解析头部信息")

    if attribute_data[0] != 0x80:
        raise ValueError("不是80属性($DATA属性)")

    # Offset 0x08: non-resident flag (0 means resident).
    if attribute_data[8] == 0:
        return 1

    # The run-list offset is a 16-bit little-endian value at 0x20..0x21, so
    # the data must reach at least index 0x21; report a ValueError instead of
    # letting an IndexError escape.
    if len(attribute_data) < 0x22:
        raise ValueError("属性数据过短,无法解析头部信息")

    data_run_offset = attribute_data[0x20] | (attribute_data[0x21] << 8)
    if data_run_offset >= len(attribute_data):
        raise ValueError("数据运行偏移超出属性长度")

    data_runs = attribute_data[data_run_offset:]
    fragment_count = 0
    pos = 0

    while pos < len(data_runs):
        header_byte = data_runs[pos]
        if header_byte == 0x00:
            # End-of-list marker.
            break

        # Run header: the high nibble is the byte size of the offset field,
        # the low nibble the byte size of the length field.
        offset_size = (header_byte >> 4) & 0x0F
        length_size = header_byte & 0x0F

        # Counting stops at the first run whose header has a zero nibble.
        if offset_size == 0 or length_size == 0:
            break

        pos += 1 + length_size + offset_size
        fragment_count += 1

    return fragment_count


# Sample attribute record kept at module level for reference and demo use.
input_data = [
    {
        'start_byte': 3221267456,
        'offset': 264,
        'sequence': [
            '80 00 00 00 48 00 00 00',
            '01 00 00 00 00 00 01 00',
            '00 00 00 00 00 00 00 00',
            '79 00 00 00 00 00 00 00',
            '40 00 00 00 00 00 00 00',
            '00 a0 07 00 00 00 00 00',
            '0b 93 07 00 00 00 00 00',
            '0b 93 07 00 00 00 00 00',
            '31 7a 00 ee 0b 00 00 00'
        ],
        'is_resident': False,
        'total_groups': 9,
        'attribute_length': 72
    }
]


if __name__ == '__main__':
    # Run the demo only when executed as a script, so that importing this
    # module has no output. Example for a real file:
    #     GetFile80hPattern(r"Z:\demo.jpg")
    print(analyze_ntfs_data_attribute(input_data))  # prints the fragment count