diff --git a/db_manage/clear_table_record.py b/db_manage/clear_table_record.py
index c9d4b64..fe9be28 100644
--- a/db_manage/clear_table_record.py
+++ b/db_manage/clear_table_record.py
@@ -23,7 +23,12 @@ def ClearTableRecordsWithReset(db_path, table_name):
 
 if __name__ == '__main__':
-    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_path')
-    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_device')
-    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_config')
+    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_path')
     ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_node')
+    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_device')
+    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_config')
+    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_user')
+    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_group')
+    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_extend_extent')
+    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_extend_name')
+
diff --git a/ntfs_utils/__init__.py b/ntfs_utils/__init__.py
index 7b97b73..612e6a1 100644
--- a/ntfs_utils/__init__.py
+++ b/ntfs_utils/__init__.py
@@ -4,6 +4,7 @@ from db_extend_name import InsertExtensionsToDB
 from db_group import InsertGroupToDB
 from db_path import GenerateHash, ShouldSkipPath, ScanVolume, InsertPathDataToDB
 from db_user import InsertUserToDB
+from db_node import InsertNodeDataToDB
 
 
 def main():
@@ -40,6 +41,8 @@ def main():
     count = InsertExtensionsToDB(common_extensions)
     print(f"Inserted {count} new extensions.")
 
+    InsertNodeDataToDB()
+
 
 if __name__ == '__main__':
     main()
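The ClearTableRecordsWithReset body itself is outside this diff. For reference, a minimal sketch of what a clear-and-reset helper like this is assumed to do: delete every row, then reset the table's AUTOINCREMENT counter in sqlite_sequence so new inserts start from ID 1 again. The implementation below is my assumption, not code from the repository:

import sqlite3

def ClearTableRecordsWithReset(db_path, table_name):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute(f"DELETE FROM {table_name}")  # remove all rows
    # reset the AUTOINCREMENT counter (row only exists if the table uses AUTOINCREMENT)
    cursor.execute("DELETE FROM sqlite_sequence WHERE name = ?", (table_name,))
    conn.commit()
    conn.close()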
diff --git a/ntfs_utils/db_node.py b/ntfs_utils/db_node.py
index 877bdeb..6a75722 100644
--- a/ntfs_utils/db_node.py
+++ b/ntfs_utils/db_node.py
@@ -1,10 +1,10 @@
 import hashlib
 import os
-import random
 import sqlite3
 from datetime import datetime
 
-from mft_analyze import GetFile80hPattern
+# Import the MFT analysis helpers
+from mft_analyze import GetFile80hPattern, GetFragmentData, ExtractSequenceHexValues, hex_list_to_int
 
 
 # Utility function: get the file extension
@@ -52,7 +52,6 @@ def GetFilesTime(file_path):
     st_atime: last access time (FileAccessTime)
     st_mtime: last content-modification time (FileModifyTime)
     st_ctime: metadata change time; on Windows, the file creation time (FileCreateTime)
-    Note: Windows and Linux define these fields slightly differently; on Linux, st_ctime is the metadata change time, not the creation time.
 
     Parameters:
         file_path (str): absolute path to the file
@@ -71,7 +70,6 @@ def GetFilesTime(file_path):
     try:
         stat_info = os.stat(file_path)
 
-        # Convert the timestamps to readable ISO 8601-style strings
        def ts_to_str(timestamp):
            return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
@@ -79,7 +77,7 @@ def GetFilesTime(file_path):
        modify_time = ts_to_str(stat_info.st_mtime)
        access_time = ts_to_str(stat_info.st_atime)
 
-        # Permission-change time; on Linux this is the metadata change time, may not apply on Windows
+        # Permission-change time; may not apply on Windows
        try:
            auth_time = ts_to_str(getattr(stat_info, 'st_birthtime', stat_info.st_ctime))
        except Exception:
@@ -110,8 +108,13 @@ def GetDeviceId(cursor: sqlite3.Cursor) -> int:
 
 
-# Get the file size (dummy data)
-def GetFileSize(full_path: str) -> int:
-    return random.randint(100, 999)
+# Get the file size from the file's $80 ($DATA) attribute
+def GetFileSize(file80h_pattern):
+    if file80h_pattern[0].get('is_resident'):
+        return GetFragmentData(file80h_pattern)[0].get('byte_length')
+    else:
+        # Real (used) size of the attribute content: little-endian QWORD at offset 0x30
+        size_list = ExtractSequenceHexValues(file80h_pattern)[48:56]
+        size = hex_list_to_int(size_list)
+        return size
 
 
 # Get the file-content hash (dummy data)
@@ -119,85 +122,16 @@ def GetFileHash(full_path: str) -> str:
     return hashlib.sha256(full_path.encode()).hexdigest()
 
 
-# Get the fragment count (1-4)
-def GetExtentCount(data):
-    """
-    Analyze the $80 ($DATA) attribute in an NTFS structure and return the file's fragment count
-
-    Parameters:
-        data (list): list of dicts, each of which must have a 'sequence' key
-
-    Returns:
-        int: fragment count (1 for a resident attribute; the number of data runs otherwise)
-
-    Raises:
-        ValueError: if the input data is invalid
-    """
-    # Step 1: extract and convert the sequence data
-    hex_bytes = []
-    for entry in data:
-        if 'sequence' in entry:
-            for hex_str in entry['sequence']:
-                hex_bytes.extend(hex_str.split())
-
-    # Convert the hex strings to a list of integers
-    try:
-        attribute_data = [int(x, 16) for x in hex_bytes]
-    except ValueError:
-        raise ValueError("Invalid hexadecimal data")
-
-    # Step 2: analyze the attribute structure
-    if len(attribute_data) < 24:
-        raise ValueError("Attribute data too short to parse the header")
-
-    # Check the attribute type (0x80)
-    if attribute_data[0] != 0x80:
-        raise ValueError("Not an $80 ($DATA) attribute")
-
-    # Check the resident flag (offset 0x08)
-    is_resident = attribute_data[8] == 0
-
-    if is_resident:
-        return 1
-    else:
-        # Parse the data run list of the non-resident attribute
-        data_run_offset = attribute_data[0x20] | (attribute_data[0x21] << 8)
-
-        if data_run_offset >= len(attribute_data):
-            raise ValueError("Data run offset beyond the attribute length")
-
-        data_runs = attribute_data[data_run_offset:]
-        fragment_count = 0
-        pos = 0
-
-        while pos < len(data_runs):
-            header_byte = data_runs[pos]
-            if header_byte == 0x00:
-                break
-
-            len_len = (header_byte >> 4) & 0x0F
-            offset_len = header_byte & 0x0F
-
-            if len_len == 0 or offset_len == 0:
-                break
-
-            pos += 1 + len_len + offset_len
-            fragment_count += 1
-
-        return fragment_count
+# New: get a fragment's location and length
+def GetFragmentLocation(fragment):
+    return fragment.get('starting_byte', 0)
 
 
-# Get a random location
-def GetRandomLocation() -> int:
-    return random.randint(1000, 9999)
-
-
-# Get a random length
-def GetRandomLength() -> int:
-    return random.randint(1000, 9999)
+def GetFragmentLength(fragment):
+    return fragment.get('byte_length', 0)
 
 
 # Main function: import db_path data into db_node
 def InsertNodeDataToDB(db_path='../src/db_ntfs_info.db', table_name='db_node'):
@@ -220,11 +154,30 @@ def InsertNodeDataToDB(db_path='../src/db_ntfs_info.db', table_name='db_node'):
             print(f"⚠️ PathID {path_id} already exists, skipping insert")
             continue
 
+        # Get the file's $80 attribute data
+        try:
+            file80h_pattern = GetFile80hPattern(full_path)
+            fragments = GetFragmentData(file80h_pattern)
+            extent_count = min(len(fragments), 4)  # at most 4 fragments are supported
+            print(f"✅ Fragment count: {extent_count}")
+
+        except Exception as e:
+            print(f"⚠️ Failed to get ExtentCount, using default 0: {e}")
+            fragments = []
+            extent_count = 0
+
         # Compute the fields
         name_hash = hashlib.sha256(name.encode()).hexdigest()
         dir_layer = GetDirLayer(full_path)
         extend_name_id = GetExtendNameId(name, cursor)
-        file_size = GetFileSize(full_path)
+
+        # GetFileSize(file80h_pattern) can now be called safely
+        try:
+            file_size = GetFileSize(file80h_pattern)
+        except Exception as e:
+            print(f"⚠️ Failed to get the file size, using default 0: {e}")
+            file_size = 0
+
         file_hash = GetFileHash(full_path)
 
         # Get the file's time attributes
@@ -234,20 +187,10 @@ def InsertNodeDataToDB(db_path='../src/db_ntfs_info.db', table_name='db_node'):
         access_time = file_times["FileAccessTime"]
         auth_time = file_times["FileAuthTime"]
 
-        # New: get a more accurate ExtentCount from the $80 attribute
-        try:
-            attribute_80_data = GetFile80hPattern(full_path)
-
-            if not attribute_80_data or not isinstance(attribute_80_data, list):
-                raise ValueError("Invalid 80h attribute data")
-
-            extent_count = GetExtentCount(attribute_80_data)
-
-            print(f"✅ Fragment count: {extent_count}")
-
-        except Exception as e:
-            print(f"⚠️ Failed to get ExtentCount, using default 0: {e}")
-            extent_count = 0
+        # Look up the PathHash
+        cursor.execute("SELECT PathHash FROM db_path WHERE ID = ?", (path_id,))
+        path_hash_result = cursor.fetchone()
+        path_hash = path_hash_result[0] if path_hash_result else ""
 
         # Build the insert-statement fields and parameters (unchanged)
         fields = [
             'PathID', 'ParentID', 'NameHash', 'PathHash',
             'ExtendNameID', 'DirLayer', 'GroupID', 'UserID',
             'CreateTime', 'ModifyTime', 'AccessTime', 'AuthTime',
@@ -257,24 +200,19 @@ def InsertNodeDataToDB(db_path='../src/db_ntfs_info.db', table_name='db_node'):
             'FileSize', 'FileMode', 'FileHash', 'ExtentCount'
         ]
         values = [
-            path_id, parent_id, name_hash, '',  # PathHash to be filled in later
+            path_id, parent_id, name_hash, path_hash,
             extend_name_id, dir_layer, group_id, user_id,
             create_time, modify_time, access_time, auth_time,
             file_size, 'default', file_hash, extent_count
         ]
 
-        # Look up the PathHash (identical to db_path.PathHash)
-        cursor.execute("SELECT PathHash FROM db_path WHERE ID = ?", (path_id,))
-        path_hash_result = cursor.fetchone()
-        path_hash = path_hash_result[0] if path_hash_result else ""
-        values[3] = path_hash  # replace PathHash
-
         # Populate the Extent fragment fields
         extent_data = []
-        for i in range(1, 5):
-            if i <= extent_count:
-                location = GetRandomLocation()
-                length = GetRandomLength()
+        for i in range(4):  # at most 4 extents
+            if i < len(fragments):
+                frag = fragments[i]
+                location = GetFragmentLocation(frag)
+                length = GetFragmentLength(frag)
                 extent_data.extend([device_id, location, length])
             else:
                 extent_data.extend([None, None, None])
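A quick sanity check of the size field the new GetFileSize reads. In a non-resident attribute header the real (used) data size is the little-endian QWORD at offset 0x30, which is why the slice covers bytes 48-55 of the flattened sequence. The snippet uses the sample $80 attribute from test/get_extent_counts.py further down; it is my verification, not part of the diff:

# the 72-byte non-resident $80 attribute from the test data, flattened
sequence = ('80 00 00 00 48 00 00 00 01 00 00 00 00 00 01 00 '
            '00 00 00 00 00 00 00 00 79 00 00 00 00 00 00 00 '
            '40 00 00 00 00 00 00 00 00 a0 07 00 00 00 00 00 '
            '0b 93 07 00 00 00 00 00 0b 93 07 00 00 00 00 00 '
            '31 7a 00 ee 0b 00 00 00').split()
real_size = int(''.join(reversed(sequence[48:56])), 16)  # QWORD at 0x30, little-endian
assert real_size == 0x07930b == 496395  # bytes; the allocated size at 0x28 is 499712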
"" # 构建插入语句字段和参数(保持原样) fields = [ @@ -257,24 +200,19 @@ def InsertNodeDataToDB(db_path='../src/db_ntfs_info.db', table_name='db_node'): 'FileSize', 'FileMode', 'FileHash', 'ExtentCount' ] values = [ - path_id, parent_id, name_hash, '', # PathHash 待填 + path_id, parent_id, name_hash, path_hash, extend_name_id, dir_layer, group_id, user_id, create_time, modify_time, access_time, auth_time, file_size, 'default', file_hash, extent_count ] - # 查询 PathHash(与 db_path.PathHash 一致) - cursor.execute("SELECT PathHash FROM db_path WHERE ID = ?", (path_id,)) - path_hash_result = cursor.fetchone() - path_hash = path_hash_result[0] if path_hash_result else "" - values[3] = path_hash # 替换 PathHash - # 处理 Extent 片段字段 extent_data = [] - for i in range(1, 5): - if i <= extent_count: - location = GetRandomLocation() - length = GetRandomLength() + for i in range(4): # 最多4个 extent + if i < len(fragments): + frag = fragments[i] + location = GetFragmentLocation(frag) + length = GetFragmentLength(frag) extent_data.extend([device_id, location, length]) else: extent_data.extend([None, None, None]) diff --git a/ntfs_utils/mft_analyze.py b/ntfs_utils/mft_analyze.py index 2476f32..9438d82 100644 --- a/ntfs_utils/mft_analyze.py +++ b/ntfs_utils/mft_analyze.py @@ -227,96 +227,197 @@ def GetFile80hPattern(file_path): # if __name__ == '__main__': -# GetFile80hPattern(r"Z:\demo.jpg") +# data = GetFile80hPattern(r"Z:\hello.txt") +# print(data) -def analyze_ntfs_data_attribute(data): +def ExtractSequenceHexValues(file80h_pattern): """ - 分析 NTFS 数据结构中的80属性($DATA),返回文件分片数量 + 从给定的数据结构中提取所有 sequence 的十六进制字符串,并合并成一个标准列表 参数: - data (list): 包含字典的列表,每个字典需有'sequence'键 - (示例结构见问题描述) + data (list): 包含字典的列表,每个字典有 'sequence' 键 返回: - int: 分片数量(常驻属性返回1,非常驻属性返回数据运行的分片数) - - 异常: - ValueError: 当输入数据无效时抛出 + list: 包含所有 sequence 值的合并列表 """ - # 第一步:提取并转换sequence数据 - hex_bytes = [] - for entry in data: + sequence_list = [] + for entry in file80h_pattern: if 'sequence' in entry: + # 将每个十六进制字符串按空格分割,然后合并到结果列表 for hex_str in entry['sequence']: - hex_bytes.extend(hex_str.split()) + # 分割字符串并添加到结果 + sequence_list.extend(hex_str.split()) + return sequence_list - # 将十六进制字符串转换为整数列表 - try: - attribute_data = [int(x, 16) for x in hex_bytes] - except ValueError: - raise ValueError("无效的十六进制数据") - # 第二步:分析属性结构 - if len(attribute_data) < 24: - raise ValueError("属性数据过短,无法解析头部信息") +def ExportDataRunList(data_run): + """ + 将 data_run 中的多个 Data Run 提取为独立的 list 片段。 - # 检查属性类型(0x80) - if attribute_data[0] != 0x80: - raise ValueError("不是80属性($DATA属性)") + 参数: + data_run (list): 十六进制字符串组成的列表,表示 Data Run 内容 - # 检查是否常驻(偏移0x08) - is_resident = attribute_data[8] == 0 + 返回: + list: 每个元素是一个代表单个 Data Run 的 list + """ + result = [] + pos = 0 - if is_resident: - return 1 - else: - # 解析非常驻属性的数据运行列表 - data_run_offset = attribute_data[0x20] | (attribute_data[0x21] << 8) + while pos < len(data_run): + current_byte = data_run[pos] - if data_run_offset >= len(attribute_data): - raise ValueError("数据运行偏移超出属性长度") + if current_byte == '00': + # 遇到空运行块,停止解析 + break - data_runs = attribute_data[data_run_offset:] - fragment_count = 0 - pos = 0 + try: + header = int(current_byte, 16) + len_bytes = (header >> 4) & 0x0F + offset_bytes = header & 0x0F - while pos < len(data_runs): - header_byte = data_runs[pos] - if header_byte == 0x00: + if len_bytes == 0 or offset_bytes == 0: + print(f"⚠️ 无效的字段长度,跳过位置 {pos}") break - len_len = (header_byte >> 4) & 0x0F - offset_len = header_byte & 0x0F + # 计算当前 Data Run 总长度 + run_length = 1 + offset_bytes + len_bytes - if len_len == 0 or offset_len == 0: - break 
diff --git a/test/export_useful_fragments.py b/test/export_useful_fragments.py
new file mode 100644
index 0000000..b6de818
--- /dev/null
+++ b/test/export_useful_fragments.py
@@ -0,0 +1,139 @@
+def extract_data_run_fragments(data_run):
+    """
+    Split the runlist area into one list per Data Run.
+
+    Parameters:
+        data_run (list): list of hex strings holding the Data Run area
+
+    Returns:
+        list: one list per individual Data Run
+    """
+    result = []
+    pos = 0
+
+    while pos < len(data_run):
+        current_byte = data_run[pos]
+
+        if current_byte == '00':
+            # An empty run header terminates the list
+            break
+
+        try:
+            header = int(current_byte, 16)
+            # Low nibble = size of the length field, high nibble = size of the offset field
+            length_size = header & 0x0F
+            offset_size = (header >> 4) & 0x0F
+
+            if length_size == 0 or offset_size == 0:
+                print(f"⚠️ Invalid field sizes, stopping at position {pos}")
+                break
+
+            # Total size of this Data Run entry
+            run_length = 1 + length_size + offset_size
+
+            # Slice out the current Data Run
+            fragment = data_run[pos: pos + run_length]
+
+            result.append(fragment)
+
+            # Advance the cursor
+            pos += run_length
+
+        except Exception as e:
+            print(f"❌ Parse failure at position {pos}: {e}")
+            break
+
+    return result
+
+
+def hex_list_to_int(lst, byteorder='little'):
+    """
+    Convert a list of hex strings to an integer (little-endian supported)
+    """
+    if byteorder == 'little':
+        lst = list(reversed(lst))
+    return int(''.join(f"{int(b, 16):02x}" for b in lst), 16)
+
+
+def parse_data_run(data_run, previous_cluster=0):
+    """
+    Parse a single NTFS Data Run and return its starting and ending cluster numbers
+
+    Parameters:
+        data_run (list): hex strings making up one Data Run
+        previous_cluster (int): starting cluster of the previous run (offsets are relative)
+
+    Returns:
+        dict: starting cluster, ending cluster and run length
+    """
+    if not data_run or data_run[0] == '00':
+        return None
+
+    header = int(data_run[0], 16)
+    # Low nibble = size of the length field, high nibble = size of the offset field
+    length_size = header & 0x0F
+    offset_size = (header >> 4) & 0x0F
+
+    # The length field comes first, followed by the offset field (both little-endian)
+    length_data = data_run[1:1 + length_size]
+    offset_data = data_run[1 + length_size:1 + length_size + offset_size]
+
+    run_length = hex_list_to_int(length_data, 'little')
+    offset = hex_list_to_int(offset_data, 'little')
+
+    # Starting cluster (absolute for the first run, relative to the previous one otherwise)
+    starting_cluster = previous_cluster + offset
+    ending_cluster = starting_cluster + run_length - 1
+
+    return {
+        "starting_cluster": starting_cluster,
+        "ending_cluster": ending_cluster,
+        "run_length": run_length
+    }
+
+
+def parse_multiple_data_runs(fragments):
+    """
+    Parse multiple Data Run fragments, honouring relative offsets.
+
+    Parameters:
+        fragments (list): list of Data Run hex-string lists, e.g.:
+            [
+                ['31', '7a', '00', 'ee', '0b'],
+                ['22', '29', '06', 'bb', '00'],
+                ...
+            ]
+
+    Returns:
+        list: one dict per fragment with its parse result
+    """
+    results = []
+    previous_starting_cluster = 0
+
+    for fragment in fragments:
+        result = parse_data_run(fragment, previous_starting_cluster)
+
+        if result:
+            results.append(result)
+            # Subsequent offsets are relative to this run's starting cluster
+            previous_starting_cluster = result["starting_cluster"]
+
+    return results
+
+
+data_run = [
+    '31', '7a', '00', 'ee', '0b',
+    '22', '29', '06', 'bb', '00',
+    '32', '7a', '02', 'ee', '00', '00',
+    '00', 'a0', 'f8', 'ff', 'ff', 'ff', 'ff', 'ff'
+]
+
+# Step 1: extract all valid fragments
+fragments = extract_data_run_fragments(data_run)
+print("Extracted fragments:")
+for i, frag in enumerate(fragments):
+    print(f"Fragment {i + 1}: {frag}")
+
+# Step 2: parse the fragments in sequence
+results = parse_multiple_data_runs(fragments)
+print("\nParse results:")
+for i, res in enumerate(results):
+    print(f"Fragment {i + 1}: {res}")
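One caveat none of these parsers handle yet: runlist offsets are signed little-endian values, so a run on a fragmented volume can point backwards relative to the previous run. If that case ever needs support, the conversion could look like the sketch below; signed_hex_list_to_int is my suggested helper, not part of the diff:

def signed_hex_list_to_int(lst):
    # interpret the byte list as a signed little-endian integer
    raw = bytes(int(b, 16) for b in lst)
    return int.from_bytes(raw, 'little', signed=True)

assert signed_hex_list_to_int(['9c', 'ff']) == -100  # 0xff9c as a signed 16-bit value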
diff --git a/test/get_extent_counts.py b/test/get_extent_counts.py
new file mode 100644
index 0000000..1d0309e
--- /dev/null
+++ b/test/get_extent_counts.py
@@ -0,0 +1,92 @@
+def analyze_ntfs_data_attribute(data):
+    """
+    Analyze the $80 ($DATA) attribute in an NTFS structure and return the file's fragment count
+
+    Parameters:
+        data (list): list of dicts, each of which must have a 'sequence' key
+            (see the sample structure below)
+
+    Returns:
+        int: fragment count (1 for a resident attribute; the number of data runs otherwise)
+
+    Raises:
+        ValueError: if the input data is invalid
+    """
+    # Step 1: extract and convert the sequence data
+    hex_bytes = []
+    for entry in data:
+        if 'sequence' in entry:
+            for hex_str in entry['sequence']:
+                hex_bytes.extend(hex_str.split())
+
+    print(hex_bytes)
+
+    # Convert the hex strings to a list of integers
+    try:
+        attribute_data = [int(x, 16) for x in hex_bytes]
+    except ValueError:
+        raise ValueError("Invalid hexadecimal data")
+
+    # Step 2: analyze the attribute structure
+    if len(attribute_data) < 24:
+        raise ValueError("Attribute data too short to parse the header")
+
+    # Check the attribute type (0x80)
+    if attribute_data[0] != 0x80:
+        raise ValueError("Not an $80 ($DATA) attribute")
+
+    # Check the resident flag (offset 0x08)
+    is_resident = attribute_data[8] == 0
+
+    if is_resident:
+        return 1
+    else:
+        # Parse the data run list of the non-resident attribute
+        data_run_offset = attribute_data[0x20] | (attribute_data[0x21] << 8)
+
+        if data_run_offset >= len(attribute_data):
+            raise ValueError("Data run offset beyond the attribute length")
+
+        data_runs = attribute_data[data_run_offset:]
+        fragment_count = 0
+        pos = 0
+
+        while pos < len(data_runs):
+            header_byte = data_runs[pos]
+            if header_byte == 0x00:
+                break
+
+            # Low nibble = length-field size, high nibble = offset-field size
+            length_size = header_byte & 0x0F
+            offset_size = (header_byte >> 4) & 0x0F
+
+            if length_size == 0 or offset_size == 0:
+                break
+
+            pos += 1 + length_size + offset_size
+            fragment_count += 1
+
+        return fragment_count
+
+
+input_data = [
+    {
+        'start_byte': 3221267456,
+        'offset': 264,
+        'sequence': [
+            '80 00 00 00 48 00 00 00',
+            '01 00 00 00 00 00 01 00',
+            '00 00 00 00 00 00 00 00',
+            '79 00 00 00 00 00 00 00',
+            '40 00 00 00 00 00 00 00',
+            '00 a0 07 00 00 00 00 00',
+            '0b 93 07 00 00 00 00 00',
+            '0b 93 07 00 00 00 00 00',
+            '31 7a 00 ee 0b 00 00 00'
+        ],
+        'is_resident': False,
+        'total_groups': 9,
+        'attribute_length': 72
+    }
+]
+
+print(analyze_ntfs_data_attribute(input_data))  # prints the fragment count
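The same header-stride rule also counts the runs in the longer runlist sample from test/export_useful_fragments.py: three runs, then a 0x00 terminator. A small check of my own, not part of the diff:

runlist = ['31', '7a', '00', 'ee', '0b',
           '22', '29', '06', 'bb', '00',
           '32', '7a', '02', 'ee', '00', '00',
           '00']
count, pos = 0, 0
while pos < len(runlist) and runlist[pos] != '00':
    header = int(runlist[pos], 16)
    # stride = header byte + length field + offset field
    pos += 1 + (header & 0x0F) + ((header >> 4) & 0x0F)
    count += 1
assert count == 3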
diff --git a/test/parse_80_attribution.py b/test/parse_80_attribution.py
new file mode 100644
index 0000000..a4a39ab
--- /dev/null
+++ b/test/parse_80_attribution.py
@@ -0,0 +1,105 @@
+def ParseDataRuns(data_bytes: list, cluster_size=4096):
+    """
+    Parse the Data Runs of an NTFS $80 attribute and return each fragment's
+    starting byte and length.
+
+    Parameters:
+        data_bytes (list): hex strings making up the complete $80 attribute.
+        cluster_size (int): cluster size in bytes (default 4096 = 8 sectors x 512 bytes)
+
+    Returns:
+        dict: per-fragment information, in the form:
+            {
+                "is_resident": False,
+                "data_runs": {
+                    "fragment_1": {"start_byte": 3202351104, "byte_length": 499711},
+                    "fragment_2": {...}
+                }
+            }
+    """
+
+    def hex_list_to_int(lst, length, byteorder='little'):
+        """Take the first `length` bytes from the list and convert them to an integer"""
+        bytes_data = bytes([int(x, 16) for x in lst[:length]])
+        return int.from_bytes(bytes_data, byteorder=byteorder)
+
+    result = {
+        "is_resident": True,
+        "data_runs": {}
+    }
+
+    # Check that this is an $80 attribute
+    if data_bytes[0] != '80':
+        raise ValueError("Not an $80 attribute")
+
+    # The resident flag is at offset 0x08
+    is_resident = data_bytes[8] == '00'
+    result["is_resident"] = is_resident
+
+    if is_resident:
+        result["data_runs"]["resident_file"] = {
+            "start_byte": 0,
+            "byte_length": "file is resident, no fragments"
+        }
+        return result
+
+    # Non-resident attribute: the data-run offset is a 2-byte field at offset 0x20
+    data_run_offset = hex_list_to_int(data_bytes[0x20:0x20 + 2], 2)
+    if data_run_offset >= len(data_bytes):
+        raise ValueError("Data run offset out of range")
+
+    # Slice out the data-run area
+    data_run_bytes = data_bytes[data_run_offset:]
+    pos = 0
+    fragment_index = 1
+
+    while pos < len(data_run_bytes):
+        header_byte = int(data_run_bytes[pos], 16)
+        if header_byte == 0x00:
+            break
+
+        # Low nibble: size of the length field; high nibble: size of the offset field
+        length_size = header_byte & 0x0F
+        offset_size = (header_byte >> 4) & 0x0F
+
+        if length_size == 0 or offset_size == 0:
+            break
+
+        pos += 1
+
+        # The length field comes first (little-endian, in clusters)
+        length_bytes = data_run_bytes[pos:pos + length_size]
+        length = hex_list_to_int(length_bytes, length_size, byteorder='little')
+
+        # Then the offset field (little-endian, in clusters)
+        # NOTE: offsets of runs after the first are relative to the previous run;
+        # this simple test treats every offset as absolute.
+        offset_bytes = data_run_bytes[pos + length_size:pos + length_size + offset_size]
+        offset = hex_list_to_int(offset_bytes, offset_size, byteorder='little')
+
+        # Start byte = cluster offset x cluster size
+        start_byte = offset * cluster_size
+        byte_length = length * cluster_size - 1
+
+        result["data_runs"][f"fragment_{fragment_index}"] = {
+            "start_byte": start_byte,
+            "byte_length": byte_length
+        }
+
+        pos += length_size + offset_size
+        fragment_index += 1
+
+    return result
+
+
+input_data = [
+    '80', '00', '00', '00', '48', '00', '00', '00',
+    '01', '00', '00', '00', '00', '00', '01', '00',
+    '00', '00', '00', '00', '00', '00', '00', '00',
+    '79', '00', '00', '00', '00', '00', '00', '00',
+    '40', '00', '00', '00', '00', '00', '00', '00',
+    '00', 'a0', '07', '00', '00', '00', '00', '00',
+    '0b', '93', '07', '00', '00', '00', '00', '00',
+    '0b', '93', '07', '00', '00', '00', '00', '00',
+    '31', '7a', '00', 'ee', '0b', '00', '00', '00'
+]
+
+result = ParseDataRuns(input_data)
+print(result)
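With the corrected field order and the two-byte data-run offset read, the sample input above should decode to the figures already quoted in the ParseDataRuns docstring, roughly: {'is_resident': False, 'data_runs': {'fragment_1': {'start_byte': 3202351104, 'byte_length': 499711}}}. Here 3202351104 is the run's cluster offset (0x0bee00 = 781824) times the assumed 4096-byte cluster, and 499711 is the 122-cluster run length in bytes minus one.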