228 lines
8.1 KiB
Python
228 lines
8.1 KiB
Python
import os
|
||
|
||
import pytsk3
|
||
|
||
from db_config import GetNTFSBootInfo
|
||
|
||
|
||
def find_file_mft_entry(fs, target_path):
|
||
"""
|
||
在 NTFS 文件系统中根据路径查找文件的 MFT Entry 编号
|
||
"""
|
||
|
||
def traverse_directory(inode, path_components):
|
||
if not path_components:
|
||
return inode
|
||
|
||
dir_name = path_components[0].lower()
|
||
try:
|
||
directory = fs.open_dir(inode=inode)
|
||
except Exception as e:
|
||
print(f"Error opening directory with inode {inode}: {e}")
|
||
return None
|
||
|
||
for entry in directory:
|
||
if not entry.info or not entry.info.name or not entry.info.meta:
|
||
continue
|
||
|
||
name = entry.info.name.name.decode('utf-8', errors='ignore').lower()
|
||
meta = entry.info.meta
|
||
|
||
# 匹配当前层级目录或文件名
|
||
if name == dir_name:
|
||
if len(path_components) == 1:
|
||
# 是目标文件/目录
|
||
return meta.addr
|
||
|
||
elif meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
|
||
# 继续深入查找子目录
|
||
next_inode = entry.info.meta.addr
|
||
result = traverse_directory(next_inode, path_components[1:])
|
||
if result:
|
||
return result
|
||
return None
|
||
|
||
# 拆分路径
|
||
path_parts = target_path.strip("\\").lower().split("\\")
|
||
root_inode = fs.info.root_inum # 根目录 MFT Entry
|
||
return traverse_directory(root_inode, path_parts)
|
||
|
||
|
||
def GetFileMftEntry(file_path):
|
||
"""
|
||
获取指定文件在 NTFS 中的 MFT Entry 编号
|
||
"""
|
||
|
||
if not os.path.exists(file_path):
|
||
raise FileNotFoundError(f"File not found: {file_path}")
|
||
|
||
# 获取驱动器字母
|
||
drive_letter = os.path.splitdrive(file_path)[0][0]
|
||
device = f"\\\\.\\{drive_letter}:"
|
||
|
||
print(f"Opening device: {device}")
|
||
|
||
try:
|
||
img = pytsk3.Img_Info(device)
|
||
fs = pytsk3.FS_Info(img)
|
||
except Exception as e:
|
||
raise RuntimeError(f"Failed to open device '{device}': {e}")
|
||
|
||
# 构建相对路径
|
||
abs_path = os.path.abspath(file_path)
|
||
root_path = f"{drive_letter}:\\"
|
||
rel_path = os.path.relpath(abs_path, root_path).replace("/", "\\")
|
||
|
||
print(f"Looking up MFT entry for: {rel_path}")
|
||
|
||
mft_entry = find_file_mft_entry(fs, rel_path)
|
||
print(f"MFT Entry: {mft_entry}")
|
||
if mft_entry is None:
|
||
raise RuntimeError("Could not find MFT entry for the specified file.")
|
||
|
||
return mft_entry
|
||
|
||
|
||
def CalculateFileMftStartSector(mft_entry, volume_letter="Z"):
|
||
"""
|
||
根据 MFT Entry 编号计算该文件 MFT Entry 的起始扇区号
|
||
|
||
参数:
|
||
mft_entry (int): 文件的 MFT Entry 编号(即 inode)
|
||
mft_start_sector (int): $MFT 的起始扇区号,默认 6291456
|
||
mft_entry_size (int): 每个 MFT Entry 的大小(字节),默认 1024
|
||
bytes_per_sector (int): 每扇区字节数,默认 512
|
||
|
||
返回:
|
||
int: 文件 MFT Entry 的起始扇区号
|
||
"""
|
||
if mft_entry < 0:
|
||
raise ValueError("MFT Entry 编号不能为负数")
|
||
|
||
# 获取 NTFS 引导信息
|
||
config_data = GetNTFSBootInfo(volume_letter)
|
||
# 计算文件 MFT Entry 的起始扇区号
|
||
start_sector = config_data["MftPosition"] * 8 + mft_entry * 2
|
||
if start_sector < 0:
|
||
raise ValueError("起始扇区号不能为负数")
|
||
print(f"文件 MFT Entry 的起始扇区号: {start_sector}")
|
||
return start_sector
|
||
|
||
|
||
def Get80hPattern(sector_number, volume_letter="Z"):
|
||
"""
|
||
读取NTFS扇区并查找特定模式的数据
|
||
|
||
参数:
|
||
sector_number (int): 要读取的扇区号
|
||
drive_path (str): 磁盘设备路径,默认为Z盘
|
||
|
||
返回:
|
||
list: 包含所有匹配信息的列表,每个元素为:
|
||
{
|
||
'start_byte': 文件MFT Entry的起始字节位置(StartSector * 512),
|
||
'offset': 当前80属性在扇区内的偏移位置,
|
||
'sequence': 原始数据组列表(每组字符串格式:"xx xx xx ..."),
|
||
'is_resident': 是否为常驻属性,
|
||
'total_groups': 实际读取的组数,
|
||
'attribute_length': 属性总长度(字节)
|
||
}
|
||
"""
|
||
drive_path = fr"\\.\{volume_letter}:"
|
||
SECTOR_SIZE = 512
|
||
GROUP_SIZE = 8 # 每组8字节
|
||
MATCH_BYTE = 0x80 # 要匹配的起始字节
|
||
results = []
|
||
|
||
try:
|
||
with open(drive_path, 'rb') as disk:
|
||
disk.seek(sector_number * SECTOR_SIZE)
|
||
sector_data = disk.read(SECTOR_SIZE)
|
||
|
||
if not sector_data or len(sector_data) < GROUP_SIZE:
|
||
print(f"错误: 无法读取扇区 {sector_number}")
|
||
return results
|
||
|
||
groups = [sector_data[i:i + GROUP_SIZE] for i in range(0, len(sector_data), GROUP_SIZE)]
|
||
|
||
for i in range(len(groups)):
|
||
current_group = groups[i]
|
||
|
||
if len(current_group) < GROUP_SIZE:
|
||
continue
|
||
|
||
if current_group[0] == MATCH_BYTE:
|
||
# 获取第5~8字节作为属性长度(小端DWORD)
|
||
if i + 1 >= len(groups):
|
||
print(f"警告: 当前组后不足两组,跳过偏移量 {i * GROUP_SIZE:04X}h")
|
||
continue
|
||
|
||
attribute_length_bytes = b''.join([
|
||
groups[i][4:8], # 第一组的4~7字节
|
||
groups[i + 1][0:4] if i + 1 < len(groups) else b'\x00\x00\x00\x00'
|
||
])
|
||
|
||
attribute_length = int.from_bytes(attribute_length_bytes[:4], byteorder='little')
|
||
|
||
# 计算要读取的组数(向上取整到8字节)
|
||
total_groups = (attribute_length + GROUP_SIZE - 1) // GROUP_SIZE
|
||
|
||
end_idx = i + total_groups
|
||
if end_idx > len(groups):
|
||
print(f"警告: 属性越界,跳过偏移量 {i * GROUP_SIZE:04X}h")
|
||
continue
|
||
|
||
raw_sequence = groups[i:end_idx]
|
||
|
||
# 将 bytes 转换为字符串格式 "31 7a 00 ee 0b 00 00 00"
|
||
formatted_sequence = [' '.join(f"{byte:02x}" for byte in group) for group in raw_sequence]
|
||
|
||
# 判断是否为常驻属性(查看第2个组第一个字节最低位)
|
||
is_resident = False
|
||
if len(raw_sequence) >= 2:
|
||
second_group = raw_sequence[1]
|
||
is_resident = (second_group[0] & 0x01) == 0x00
|
||
|
||
result_entry = {
|
||
'start_byte': sector_number * SECTOR_SIZE, # 新增字段:文件MFT Entry的起始字节位置
|
||
'offset': i * GROUP_SIZE,
|
||
'sequence': formatted_sequence,
|
||
'is_resident': is_resident,
|
||
'total_groups': total_groups,
|
||
'attribute_length': attribute_length
|
||
}
|
||
|
||
results.append(result_entry)
|
||
|
||
# resident_str = "常驻" if is_resident else "非常驻"
|
||
# print(f"\n在偏移量 {i * GROUP_SIZE:04X}h 处找到{resident_str} 80 属性:")
|
||
# print(f"属性总长度: {attribute_length} 字节 -> 需读取 {total_groups} 组数据:")
|
||
# for j, group in enumerate(formatted_sequence):
|
||
# print(f"组 {j + 1}: {group}")
|
||
#
|
||
# print(f"\n共找到 {len(results)} 个匹配序列")
|
||
|
||
return results
|
||
|
||
except PermissionError:
|
||
print("错误: 需要管理员权限访问磁盘设备")
|
||
except Exception as e:
|
||
print(f"发生错误: {str(e)}")
|
||
|
||
return results
|
||
|
||
|
||
def GetFile80hPattern(file_path):
|
||
volume_letter = file_path.split(':')[0]
|
||
try:
|
||
mft_entry_value = GetFileMftEntry(file_path)
|
||
StartSector = CalculateFileMftStartSector(mft_entry_value, volume_letter)
|
||
print(f"\n文件的相关信息以及80属性内容:")
|
||
print(Get80hPattern(StartSector, volume_letter))
|
||
except Exception as e:
|
||
print(f"❌ Error: {e}")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
GetFile80hPattern(r"Z:\hello.txt")
|