def GetDirLayer(path: str) -> int:
    r"""Return the directory depth (DirLayer) of a backslash-separated path.

    Entries that sit directly in the drive root are layer 0, entries inside
    a first-level sub-directory are layer 1, and so on:

        "Z:\demo.jpg"          -> 0  (file in the root directory)
        "Z:\pictures\RHCE.jpg" -> 1  (file in a first-level sub-directory)

    Args:
        path: Path using backslashes as separators. Surrounding whitespace
            and trailing backslashes are ignored.

    Returns:
        The non-negative depth. Empty paths, a bare root ("\" or "Z:\") and
        paths without any separator all yield 0.
    """
    # A trailing separator carries no depth information, so "Z:\pictures\"
    # must get the same layer as "Z:\pictures".
    normalized = path.strip().rstrip("\\")
    if not normalized:
        return 0
    # One backslash belongs to the drive root ("Z:\"), so it is not a layer.
    # Clamp at 0 so that a path without any separator never yields -1.
    return max(normalized.count("\\") - 1, 0)
import os

import pytsk3

from db_config import GetNTFSBootInfo

# Geometry used when reading a raw sector and slicing it into 8-byte groups.
_SECTOR_SIZE = 512
_GROUP_SIZE = 8
# Type code of the NTFS $DATA attribute (0x80) as a little-endian DWORD.
_DATA_ATTRIBUTE_TYPE = b"\x80\x00\x00\x00"
# The fields read below (type, length, non-resident flag) span the first two
# 8-byte groups of an attribute, so anything shorter cannot be a real header.
_MIN_ATTRIBUTE_BYTES = 2 * _GROUP_SIZE


def _raw_volume_path(volume_letter):
    """Return the Windows raw-device path (``\\\\.\\X:``) for a drive letter."""
    return f"\\\\.\\{volume_letter}:"


def find_file_mft_entry(fs, target_path):
    """Find the MFT entry number of a file or directory by its path.

    Args:
        fs: File-system object exposing ``open_dir(inode=...)`` and
            ``info.root_inum`` (a ``pytsk3.FS_Info`` when called from
            ``GetFileMftEntry``).
        target_path: Backslash-separated path relative to the volume root.
            Matching is case-insensitive. Empty and ``.`` components are
            ignored, so an empty path resolves to the root directory.

    Returns:
        The ``meta.addr`` of the matching entry, or ``None`` when no entry
        matches or a directory on the way cannot be opened.
    """

    def traverse_directory(inode, path_components):
        if not path_components:
            return inode

        wanted = path_components[0]
        remaining = path_components[1:]
        try:
            directory = fs.open_dir(inode=inode)
        except Exception as e:
            # Best effort: an unreadable directory ends this branch only.
            print(f"Error opening directory with inode {inode}: {e}")
            return None

        for entry in directory:
            info = entry.info
            if not info or not info.name or not info.meta:
                continue

            name = info.name.name.decode('utf-8', errors='ignore').lower()
            if name != wanted:
                continue

            meta = info.meta
            if not remaining:
                # Last component: this is the requested file or directory.
                return meta.addr
            if meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
                # Keep scanning siblings when this branch fails; compare with
                # None so that a legitimate address of 0 is not discarded.
                result = traverse_directory(meta.addr, remaining)
                if result is not None:
                    return result
        return None

    # Lower-case once and drop empty / "." components, which can never match
    # a directory entry (they come from doubled separators or a root path).
    path_parts = [
        part for part in target_path.lower().split("\\")
        if part and part != "."
    ]
    return traverse_directory(fs.info.root_inum, path_parts)


def GetFileMftEntry(file_path):
    """Return the MFT entry number of an existing file on a lettered drive.

    Args:
        file_path: Absolute or relative path of the file.

    Returns:
        The MFT entry number found by ``find_file_mft_entry``.

    Raises:
        FileNotFoundError: ``file_path`` does not exist.
        ValueError: the resolved path has no ``X:`` drive (for example a UNC
            path), so no raw volume device can be derived from it.
        RuntimeError: the raw volume cannot be opened, or the entry cannot
            be found in the file system.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Resolve first so that relative paths also yield a drive letter.
    abs_path = os.path.abspath(file_path)
    drive, tail = os.path.splitdrive(abs_path)
    if len(drive) != 2 or drive[1] != ":":
        raise ValueError(f"Path is not on a lettered drive: {file_path}")

    device = _raw_volume_path(drive[0])
    print(f"Opening device: {device}")

    try:
        img = pytsk3.Img_Info(device)
        fs = pytsk3.FS_Info(img)
    except Exception as e:
        raise RuntimeError(f"Failed to open device '{device}': {e}") from e

    # Path relative to the volume root, using backslashes only.
    rel_path = tail.replace("/", "\\").strip("\\")
    print(f"Looking up MFT entry for: {rel_path}")

    mft_entry = find_file_mft_entry(fs, rel_path)
    if mft_entry is None:
        raise RuntimeError("Could not find MFT entry for the specified file.")

    return mft_entry


def CalculateFileMftStartSector(mft_entry, volume_letter="Z", *,
                                sectors_per_cluster=8,
                                mft_entry_size=1024,
                                bytes_per_sector=512):
    """Compute the start sector of a file's MFT entry.

    The result is ``MftPosition * sectors_per_cluster`` plus the sector
    offset of the entry, where ``MftPosition`` comes from
    ``GetNTFSBootInfo(volume_letter)``.

    NOTE(review): ``MftPosition`` is presumably the starting cluster of
    $MFT; confirm against ``GetNTFSBootInfo``. The calculation is linear, so
    it assumes the entry lies in one contiguous run from that position.

    Args:
        mft_entry: MFT entry number (inode) of the file; must be >= 0.
        volume_letter: Drive letter passed to ``GetNTFSBootInfo``.
        sectors_per_cluster: Sectors per cluster (previously hard-coded 8).
        mft_entry_size: Size of one MFT entry in bytes (default 1024).
        bytes_per_sector: Bytes per sector (default 512).

    Returns:
        The number of the sector that contains the start of the MFT entry.

    Raises:
        ValueError: ``mft_entry`` is negative or a geometry value is not
            positive.
    """
    if mft_entry < 0:
        raise ValueError("MFT Entry 编号不能为负数")
    if sectors_per_cluster <= 0 or mft_entry_size <= 0 or bytes_per_sector <= 0:
        raise ValueError(
            "sectors_per_cluster, mft_entry_size and bytes_per_sector "
            "must be positive"
        )

    config_data = GetNTFSBootInfo(volume_letter)
    mft_start_sector = config_data["MftPosition"] * sectors_per_cluster
    # With the defaults this is mft_entry * 2, as in the original formula.
    return mft_start_sector + mft_entry * mft_entry_size // bytes_per_sector


def _parse_80h_attributes(sector_data, sector_number):
    """Scan one sector for 0x80 ($DATA) attribute headers on 8-byte bounds.

    Returns a list of dicts in the format documented on ``Get80hPattern``.
    """
    results = []
    data_len = len(sector_data)

    for offset in range(0, data_len - _GROUP_SIZE + 1, _GROUP_SIZE):
        # Compare the whole 4-byte type code: matching only the first byte
        # would also hit unrelated values that happen to start with 0x80.
        if sector_data[offset:offset + 4] != _DATA_ATTRIBUTE_TYPE:
            continue

        if offset + 2 * _GROUP_SIZE > data_len:
            print(f"警告: 当前组后不足两组,跳过偏移量 {offset:04X}h")
            continue

        # Bytes 4..7 of the header hold the attribute length (LE DWORD).
        attribute_length = int.from_bytes(
            sector_data[offset + 4:offset + 8], byteorder='little'
        )
        # Round up to whole 8-byte groups.
        total_groups = (attribute_length + _GROUP_SIZE - 1) // _GROUP_SIZE
        end = offset + total_groups * _GROUP_SIZE
        if end > data_len:
            print(f"警告: 属性越界,跳过偏移量 {offset:04X}h")
            continue
        if attribute_length < _MIN_ATTRIBUTE_BYTES:
            # Too short to contain the header fields read here: not a
            # real attribute.
            continue

        raw = sector_data[offset:end]
        # Render each group as "31 7a 00 ee 0b 00 00 00".
        formatted_sequence = [
            ' '.join(f"{byte:02x}" for byte in raw[pos:pos + _GROUP_SIZE])
            for pos in range(0, len(raw), _GROUP_SIZE)
        ]
        # Byte 8 of the header is the non-resident flag (0 means resident).
        is_resident = (sector_data[offset + _GROUP_SIZE] & 0x01) == 0x00

        results.append({
            'start_byte': sector_number * _SECTOR_SIZE,
            'offset': offset,
            'sequence': formatted_sequence,
            'is_resident': is_resident,
            'total_groups': total_groups,
            'attribute_length': attribute_length
        })

    return results


def Get80hPattern(sector_number, volume_letter="Z"):
    """Read one sector of a raw volume and extract its 0x80 attributes.

    Args:
        sector_number: Number of the 512-byte sector to read.
        volume_letter: Drive letter of the volume (default "Z").

    Returns:
        A list with one dict per attribute found:
            'start_byte': byte position of the sector (sector_number * 512),
            'offset': offset of the attribute inside the sector,
            'sequence': the attribute as 8-byte groups, each formatted as
                "xx xx xx ...",
            'is_resident': True when the non-resident flag is 0,
            'total_groups': number of 8-byte groups in 'sequence',
            'attribute_length': attribute length in bytes.
        The list is empty when nothing matches or the sector cannot be read;
        read errors are reported on stdout instead of being raised.
    """
    results = []
    drive_path = _raw_volume_path(volume_letter)

    # Only the device I/O is guarded; parsing works on in-memory bytes.
    try:
        with open(drive_path, 'rb') as disk:
            disk.seek(sector_number * _SECTOR_SIZE)
            sector_data = disk.read(_SECTOR_SIZE)
    except PermissionError:
        print("错误: 需要管理员权限访问磁盘设备")
        return results
    except Exception as e:
        print(f"发生错误: {str(e)}")
        return results

    if not sector_data or len(sector_data) < _GROUP_SIZE:
        print(f"错误: 无法读取扇区 {sector_number}")
        return results

    return _parse_80h_attributes(sector_data, sector_number)


def GetFile80hPattern(file_path):
    """Print (and return) the 0x80 attributes of a file's MFT entry.

    Resolves the file's MFT entry, computes the start sector of that entry
    and scans the sector with ``Get80hPattern``.

    Args:
        file_path: Path of the file on a lettered drive.

    Returns:
        The list produced by ``Get80hPattern``, or ``None`` when any step
        fails (the error is printed instead of being raised).
    """
    try:
        mft_entry_value = GetFileMftEntry(file_path)
        # Take the drive from the resolved path so that relative paths and
        # paths without a colon still map to the right volume.
        drive = os.path.splitdrive(os.path.abspath(file_path))[0]
        volume_letter = drive.rstrip(":")
        start_sector = CalculateFileMftStartSector(mft_entry_value, volume_letter)
        results = Get80hPattern(start_sector, volume_letter)
    except Exception as e:
        print(f"❌ Error: {e}")
        return None

    print(results)
    return results


if __name__ == '__main__':
    GetFile80hPattern(r"Z:\demo.jpg")