409 lines
14 KiB
Python
409 lines
14 KiB
Python
import os
|
||
|
||
import pytsk3
|
||
|
||
from ntfs_utils.db_config import GetNTFSBootInfo
|
||
|
||
|
||
def find_file_mft_entry(fs, target_path):
    """
    Look up the MFT entry number of a file or directory by its path.

    The path is split on backslashes and matched case-insensitively, one
    component per directory level, starting at the file system root. When a
    matching directory does not contain the rest of the path, the search
    continues with the remaining entries of the current directory.

    Parameters:
        fs: an opened file system object exposing ``open_dir(inode=...)`` and
            ``info.root_inum`` (GetFileMftEntry passes a ``pytsk3.FS_Info``).
        target_path (str): backslash-separated path relative to the volume
            root. Leading/trailing backslashes and empty components are
            ignored.

    Returns:
        int or None: the metadata address of the matching entry, the root
        inode when the path has no components, or None when nothing matches
        or a directory on the way cannot be opened.
    """

    def traverse_directory(inode, path_components):
        if not path_components:
            return inode

        wanted = path_components[0]
        remaining = path_components[1:]
        try:
            directory = fs.open_dir(inode=inode)
        except Exception as e:
            print(f"Error opening directory with inode {inode}: {e}")
            return None

        for entry in directory:
            if not entry.info or not entry.info.name or not entry.info.meta:
                continue

            name = entry.info.name.name.decode('utf-8', errors='ignore').lower()
            if name != wanted:
                continue

            meta = entry.info.meta
            if not remaining:
                # Last component: this entry is the target file/directory
                return meta.addr

            if meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
                # Descend into the sub-directory for the rest of the path
                result = traverse_directory(meta.addr, remaining)
                # Compare with None so a falsy but valid address is not lost
                if result is not None:
                    return result
        return None

    # Split the path; empty components (root path, doubled separators) are dropped
    path_parts = [part for part in target_path.lower().split("\\") if part]
    return traverse_directory(fs.info.root_inum, path_parts)
|
||
|
||
|
||
def GetFileMftEntry(file_path):
    """
    Return the MFT entry number of an existing file on a drive-letter volume.

    The raw volume ``\\\\.\\X:`` of the file's drive is opened with pytsk3 and
    the file is located by its path relative to the drive root.

    Parameters:
        file_path (str): path of an existing file; relative paths are
            resolved against the current working directory.

    Returns:
        int: the MFT entry number reported by find_file_mft_entry.

    Raises:
        FileNotFoundError: file_path does not exist.
        ValueError: the resolved path has no ``X:`` drive component.
        RuntimeError: the raw volume cannot be opened, or no entry matches.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Resolve first so that a relative path still yields a drive letter
    abs_path = os.path.abspath(file_path)
    drive = os.path.splitdrive(abs_path)[0]
    if len(drive) != 2 or drive[1] != ":":
        raise ValueError(f"Path is not on a drive-letter volume: {file_path}")
    drive_letter = drive[0]
    device = f"\\\\.\\{drive_letter}:"

    try:
        img = pytsk3.Img_Info(device)
        fs = pytsk3.FS_Info(img)
    except Exception as e:
        raise RuntimeError(f"Failed to open device '{device}': {e}") from e

    # Build the path relative to the drive root, using backslash separators
    root_path = f"{drive_letter}:\\"
    rel_path = os.path.relpath(abs_path, root_path).replace("/", "\\")

    mft_entry = find_file_mft_entry(fs, rel_path)
    if mft_entry is None:
        raise RuntimeError("Could not find MFT entry for the specified file.")

    return mft_entry
|
||
|
||
|
||
def CalculateFileMftStartSector(mft_entry, volume_letter="Z",
                                sectors_per_cluster=8, sectors_per_entry=2):
    """
    Compute the first sector of a file's MFT entry from its entry number.

    The result is ``MftPosition * sectors_per_cluster
    + mft_entry * sectors_per_entry``, where ``MftPosition`` comes from
    GetNTFSBootInfo(volume_letter).
    NOTE(review): MftPosition is presumably the starting cluster number of
    $MFT, and the formula assumes $MFT is stored contiguously - confirm.

    Parameters:
        mft_entry (int): MFT entry number (inode) of the file.
        volume_letter (str): drive letter passed to GetNTFSBootInfo,
            default "Z".
        sectors_per_cluster (int): sectors in one cluster, default 8.
        sectors_per_entry (int): sectors occupied by one MFT entry, default 2.

    Returns:
        int: the starting sector number of the file's MFT entry.

    Raises:
        ValueError: mft_entry or the computed sector number is negative.
    """
    if mft_entry < 0:
        raise ValueError("MFT Entry 编号不能为负数")

    # Read the NTFS boot information of the volume
    config_data = GetNTFSBootInfo(volume_letter)

    start_sector = (config_data["MftPosition"] * sectors_per_cluster
                    + mft_entry * sectors_per_entry)
    if start_sector < 0:
        raise ValueError("起始扇区号不能为负数")

    return start_sector
|
||
|
||
|
||
def _apply_mft_fixups(record, sector_size):
    """Return a copy of an MFT record with its update-sequence fixups undone."""
    usa_offset = int.from_bytes(record[4:6], byteorder='little')
    usa_count = int.from_bytes(record[6:8], byteorder='little')
    sequence_number = record[usa_offset:usa_offset + 2]
    fixed = bytearray(record)
    for index in range(1, usa_count):
        tail = index * sector_size - 2
        saved = usa_offset + 2 * index
        if tail + 2 > len(record) or saved + 2 > len(record):
            break
        # Only restore sector tails that still carry the sequence number
        if record[tail:tail + 2] == sequence_number:
            fixed[tail:tail + 2] = record[saved:saved + 2]
    return bytes(fixed)


def _collect_data_attributes(record, start_byte, group_size):
    """Walk the attribute chain of an MFT record and describe each 0x80 attribute."""
    data_attribute_type = 0x80
    end_of_attributes = 0xFFFFFFFF
    found = []
    # Offset of the first attribute is stored at 0x14 in the record header
    position = int.from_bytes(record[0x14:0x16], byteorder='little')

    while position + group_size <= len(record):
        attribute_type = int.from_bytes(record[position:position + 4], byteorder='little')
        if attribute_type == end_of_attributes:
            break

        attribute_length = int.from_bytes(record[position + 4:position + 8], byteorder='little')
        # Round the attribute up to whole 8-byte groups
        total_groups = (attribute_length + group_size - 1) // group_size
        if total_groups < 2:
            # Too short to hold a header; stop instead of looping forever
            print(f"Warning: invalid attribute length at offset {position:04X}h")
            break

        end = position + total_groups * group_size
        if end > len(record):
            print(f"警告: 属性越界,跳过偏移量 {position:04X}h")
            break

        if attribute_type == data_attribute_type:
            body = record[position:end]
            groups = [body[i:i + group_size] for i in range(0, len(body), group_size)]
            found.append({
                'start_byte': start_byte,
                'offset': position,
                # Each group rendered as "31 7a 00 ee 0b 00 00 00"
                'sequence': [' '.join(f"{byte:02x}" for byte in group) for group in groups],
                # Byte 8 of the attribute header is the non-resident flag
                'is_resident': (record[position + 8] & 0x01) == 0x00,
                'total_groups': total_groups,
                'attribute_length': attribute_length
            })

        position += attribute_length

    return found


def Get80hPattern(sector_number, volume_letter="Z"):
    """
    Read the MFT entry starting at a sector and return its 0x80 attributes.

    The full 1024-byte entry (two 512-byte sectors) is read from the raw
    volume, the update-sequence fixups are undone, and the attribute chain is
    walked from the offset stored in the record header, so only real
    attributes of type 0x80 are reported. A sector that does not start with
    the ``FILE`` signature yields an empty list.

    Parameters:
        sector_number (int): first sector of the MFT entry.
        volume_letter (str): drive letter of the volume, default "Z".

    Returns:
        list: one dict per 0x80 attribute, in on-disk order:
            {
                'start_byte': byte position of the MFT entry (sector_number * 512),
                'offset': offset of the attribute inside the entry,
                'sequence': attribute bytes as 8-byte groups, each "xx xx xx ...",
                'is_resident': whether the attribute is resident,
                'total_groups': number of 8-byte groups in 'sequence',
                'attribute_length': attribute length in bytes
            }
        An empty list is returned when the device cannot be read; the reason
        is printed rather than raised.
    """
    SECTOR_SIZE = 512
    GROUP_SIZE = 8  # bytes per group
    MFT_ENTRY_SIZE = 1024  # matches the 2 sectors per entry used for the sector math

    drive_path = fr"\\.\{volume_letter}:"

    try:
        start_byte = sector_number * SECTOR_SIZE
        with open(drive_path, 'rb') as disk:
            disk.seek(start_byte)
            record = disk.read(MFT_ENTRY_SIZE)
    except PermissionError:
        print("错误: 需要管理员权限访问磁盘设备")
        return []
    except Exception as e:
        print(f"发生错误: {str(e)}")
        return []

    if len(record) < SECTOR_SIZE:
        print(f"错误: 无法读取扇区 {sector_number}")
        return []

    if record[:4] != b"FILE":
        print(f"Warning: sector {sector_number} does not start an MFT entry")
        return []

    return _collect_data_attributes(
        _apply_mft_fixups(record, SECTOR_SIZE), start_byte, GROUP_SIZE
    )
|
||
|
||
|
||
def GetFile80hPattern(file_path):
    """
    Return the 0x80 attribute information of a file given its path.

    The text before the first ':' of file_path is used as the volume letter.
    The file's MFT entry number is resolved, converted to a start sector, and
    that sector is handed to Get80hPattern.

    Returns:
        The list produced by Get80hPattern, or None when any step raises
        (the error is printed).
    """
    drive, _, _ = file_path.partition(':')
    try:
        entry_number = GetFileMftEntry(file_path)
        first_sector = CalculateFileMftStartSector(entry_number, drive)
        return Get80hPattern(first_sector, drive)
    except Exception as e:
        print(f"❌ Error: {e}")
    return None
|
||
|
||
|
||
# if __name__ == '__main__':
|
||
# data = GetFile80hPattern(r"Z:\hello.txt")
|
||
# print(data)
|
||
|
||
|
||
def ExtractSequenceHexValues(file80h_pattern):
    """
    Flatten the 'sequence' strings of every entry into one list of hex tokens.

    Parameters:
        file80h_pattern (list): dicts that may carry a 'sequence' key holding
            strings of whitespace-separated hex bytes.

    Returns:
        list: every hex token, in entry order then string order; entries
        without a 'sequence' key contribute nothing.
    """
    rows = (
        row
        for item in file80h_pattern
        if 'sequence' in item
        for row in item['sequence']
    )
    return [token for row in rows for token in row.split()]
|
||
|
||
|
||
def ExportDataRunList(data_run_list):
    """
    Split a flat list of hex byte strings into individual data runs.

    Each run starts with a header byte whose two nibbles give the sizes of
    the two fields that follow it, so a run spans 1 + low nibble + high
    nibble items. Splitting stops at a '00' header or when a run would extend
    past the end of the list. A header that cannot be parsed is reported and
    skipped one item at a time.

    Returns:
        list: one sub-list of hex strings per data run, in order.
    """
    runs = []
    cursor = 0
    while cursor < len(data_run_list):
        head = data_run_list[cursor]
        if head == '00':
            break
        try:
            nibbles = int(head, 16)
            span = 1 + (nibbles & 0x0F) + ((nibbles >> 4) & 0x0F)
            stop = cursor + span
            if stop > len(data_run_list):
                print(f"⚠️ 数据越界,停止解析")
                break

            runs.append(data_run_list[cursor:stop])
            cursor = stop
        except Exception as e:
            print(f"❌ 解析 Data Run 失败:位置 {cursor}, 错误: {e}")
            cursor += 1  # skip one item and keep going
    return runs
|
||
|
||
|
||
def hex_list_to_int(lst, byteorder='little'):
    """
    Convert a list of hex byte strings into a single integer.

    With byteorder 'little' (the default) the first item is the least
    significant byte; any other value keeps the list order, i.e. the first
    item is the most significant byte.
    """
    ordered = reversed(lst) if byteorder == 'little' else lst
    digits = ''.join(format(int(token, 16), "02x") for token in ordered)
    return int(digits, 16)
|
||
|
||
|
||
def parse_data_run(data_run, previous_cluster=0, cluster_size=512,
                   sectors_per_cluster=8):
    """
    Decode one NTFS data run into cluster and byte positions.

    The header byte's low nibble is the size of the run-length field and its
    high nibble the size of the cluster-offset field; the length field comes
    first. The offset is a signed little-endian value relative to the
    starting cluster of the previous run.

    Parameters:
        data_run (list): hex byte strings of one data run, header first.
        previous_cluster (int): starting cluster of the previous run
            (0 for the first run).
        cluster_size (int): despite its name this is multiplied by
            sectors_per_cluster, i.e. it acts as bytes per sector
            (default 512).
        sectors_per_cluster (int): sectors in one cluster (default 8).

    Returns:
        dict or None: 'starting_byte', 'ending_byte' (inclusive),
        'byte_length', 'starting_cluster' and 'run_length_clusters'.
        None for an empty run, a '00' terminator, a truncated run, or a run
        without a length or offset field (a sparse run occupies no clusters
        on disk).
    """
    if not data_run or data_run[0] == '00':
        return None

    header = int(data_run[0], 16)
    length_size = header & 0x0F
    offset_size = (header >> 4) & 0x0F

    if len(data_run) < 1 + length_size + offset_size:
        print(f"⚠️ 数据长度不足,无法解析 Data Run")
        return None

    if length_size == 0 or offset_size == 0:
        # No length: malformed. No offset: sparse run with nothing on disk.
        return None

    length_field = data_run[1:1 + length_size]
    offset_field = data_run[1 + length_size:1 + length_size + offset_size]

    run_length = int.from_bytes(
        bytes(int(b, 16) for b in length_field), byteorder='little'
    )
    # The offset is signed: a later run may lie before the previous one
    cluster_offset = int.from_bytes(
        bytes(int(b, 16) for b in offset_field), byteorder='little', signed=True
    )

    starting_cluster = previous_cluster + cluster_offset

    # Convert clusters to byte positions
    bytes_per_cluster = sectors_per_cluster * cluster_size
    starting_byte = starting_cluster * bytes_per_cluster
    byte_length = run_length * bytes_per_cluster
    ending_byte = starting_byte + byte_length - 1

    return {
        "starting_byte": starting_byte,
        "ending_byte": ending_byte,
        "byte_length": byte_length,
        "starting_cluster": starting_cluster,
        "run_length_clusters": run_length
    }
|
||
|
||
|
||
def ParseMultipleDataRuns(fragments, cluster_size=512):
    """
    Parse a sequence of data runs and collect their byte-offset information.

    Every fragment is passed to parse_data_run together with the
    'starting_cluster' of the most recent successfully parsed run (0 before
    the first one). Fragments for which parse_data_run returns nothing are
    skipped.

    Parameters:
        fragments (list): data runs, each a list of hex byte strings.
        cluster_size (int): forwarded to parse_data_run (default 512).

    Returns:
        list: the dicts returned by parse_data_run, in order.
    """
    parsed_runs = []
    base_cluster = 0

    for run in fragments:
        info = parse_data_run(run, base_cluster, cluster_size)
        if not info:
            continue
        parsed_runs.append(info)
        base_cluster = info["starting_cluster"]

    return parsed_runs
|
||
|
||
|
||
def GetFragmentData(file80h_pattern):
    """
    Locate the on-disk bytes of a file from its 0x80 attribute information.

    Only the first entry of file80h_pattern is used.

    Resident attribute: the content length (attribute bytes 16-19) and the
    2-byte content offset (bytes 20-21) are read from the third 8-byte group,
    and a single fragment is returned whose 'starting_byte' is
    start_byte + offset + content offset.

    Non-resident attribute: the 2-byte data-run offset at attribute bytes
    32-33 locates the run list, which is split with ExportDataRunList and
    decoded with ParseMultipleDataRuns.

    Parameters:
        file80h_pattern (list): entries as produced by Get80hPattern.

    Returns:
        list: fragment dicts; empty when the input is empty, not a list, or
        the attribute header is too short to decode.
    """
    if not file80h_pattern or not isinstance(file80h_pattern, list):
        return []

    first = file80h_pattern[0]

    if first.get('is_resident'):
        sequence = first.get('sequence') or []
        if len(sequence) < 3:
            return []
        header_bytes = sequence[2].split()
        if len(header_bytes) < 6:
            return []

        # Both fields are little-endian, so reverse before joining the hex digits
        content_len = int(''.join(reversed(header_bytes[0:4])), 16)
        # The content offset is two bytes; the next two are a flag and padding
        content_offset = int(''.join(reversed(header_bytes[4:6])), 16)

        file_offset = first.get('start_byte') + first.get('offset') + content_offset

        return [{
            'starting_byte': file_offset,
            'byte_length': content_len
        }]

    # Use only the first attribute so bytes of later matches cannot leak in
    sequence_list = ExtractSequenceHexValues(file80h_pattern[:1])
    if len(sequence_list) < 34:
        return []

    data_run_offset = int(''.join(sequence_list[32:34][::-1]), 16)
    data_run_list = sequence_list[data_run_offset:]
    fragments = ExportDataRunList(data_run_list)
    return ParseMultipleDataRuns(fragments)
|
||
|
||
# if __name__ == '__main__':
|
||
# arri80_data = GetFile80hPattern(r"Z:\hello.txt")
|
||
# data = GetFragmentData(arri80_data)
|
||
# print(data)
|