Compare commits

5 Commits

Author SHA1 Message Date
f9e72de564 删除 src/db_ntfs_info.db 2025-05-19 17:47:48 +08:00
Burgess Leo
542e334987 modify .gitignore 2025-05-19 17:45:57 +08:00
Burgess Leo
975f7f3fbc modify FOREIGN KEY db_path 2025-05-19 10:35:37 +08:00
Burgess Leo
ae777f75d9 analyze db_node table 2025-05-16 17:45:35 +08:00
Burgess Leo
7d21842287 reanalyze db_path schema 2025-05-15 18:04:59 +08:00
10 changed files with 593 additions and 51 deletions

3
.gitignore vendored
View File

@@ -175,4 +175,5 @@ cython_debug/
.pypirc
# Custom stuff
.idea/
.idea/
src/*.db

View File

@@ -0,0 +1,29 @@
import sqlite3
def ClearTableRecordsWithReset(db_path, table_name):
    """
    Delete every row of *table_name* and reset its AUTOINCREMENT counter.

    :param db_path: str, path to the SQLite database file
    :param table_name: str, table to clear (must be a plain identifier)
    :raises ValueError: if table_name is not a valid identifier — table names
        cannot be bound as SQL parameters, so this guards the f-string below
        against SQL injection
    """
    if not table_name.isidentifier():
        raise ValueError(f"Invalid table name: {table_name!r}")

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    try:
        cursor.execute(f'DELETE FROM "{table_name}";')
        # sqlite_sequence holds AUTOINCREMENT counters; here the table name is
        # a data value, so a regular placeholder is safe.
        cursor.execute("DELETE FROM sqlite_sequence WHERE name=?;", (table_name,))
        conn.commit()
        print(f"表 [{table_name}] 已清空并重置自增ID")
    except sqlite3.Error as e:
        print(f"❌ 操作失败: {e}")
    finally:
        conn.close()


if __name__ == '__main__':
    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_path')
    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_device')
    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_config')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_node')

View File

@@ -86,9 +86,9 @@ def CreateDBDeviceTable(db_path='../src/db_ntfs_info.db', table_name='db_device'
def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
"""
在指定路径下创建 SQLite 数据库,并在其中创建节点信息
创建 NewDBNode 表,用于存储文件的具体属性和物理分布信息。
:param db_path: str, 数据库文件路径
:param db_path: str, 数据库文件路径
:param table_name: str, 要创建的表名
:return: None
"""
@@ -100,28 +100,18 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
# 连接到SQLite数据库如果文件不存在会自动创建
conn = sqlite3.connect(db_path)
# 创建一个游标对象
conn.execute("PRAGMA foreign_keys = ON;") # 开启外键支持
cursor = conn.cursor()
# 动态构建创建表的SQL语句
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
PathID INTEGER,
ParentID INTEGER,
NameHash TEXT,
PathHash TEXT,
PathID INTEGER NOT NULL,
ExtendNameID INTEGER,
DirLayer INTEGER,
GroupID INTEGER,
UserID INTEGER,
FileCreateTime TEXT,
FileModifyTime TEXT,
FileAccessTime TEXT,
FileAuthTime TEXT,
FileSize INTEGER,
FileMode INTEGER,
FileHash TEXT,
ExtentCount INTEGER,
extent1_DeviceID INTEGER,
@@ -137,21 +127,17 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
extent4_Location INTEGER,
extent4_Length INTEGER,
-- 外键约束(可选)
FOREIGN KEY(PathID) REFERENCES path_table(ID),
FOREIGN KEY(ExtendNameID) REFERENCES extname_table(ID),
FOREIGN KEY(GroupID) REFERENCES groups(ID),
FOREIGN KEY(UserID) REFERENCES users(ID)
-- 外键约束
FOREIGN KEY(PathID) REFERENCES NewDBPath(ID),
FOREIGN KEY(ExtendNameID) REFERENCES db_extend(ID),
FOREIGN KEY(GroupID) REFERENCES db_group(ID),
FOREIGN KEY(UserID) REFERENCES db_user(ID)
);
"""
# 执行SQL语句
cursor.execute(create_table_sql)
# 提交更改
conn.commit()
# 关闭连接
conn.close()
print(f"表 [{table_name}] 已在数据库 [{db_path}] 中创建成功")
@@ -280,12 +266,11 @@ def CreateDBExtendSnippetTable(db_path='../src/db_ntfs_info.db', table_name='db_
print(f"表 [{table_name}] 已在数据库 [{db_path}] 中创建成功")
def CreateDBPathTable(db_path='../src/db_path.db', table_name='db_path'):
def CreateDBPathTable(db_path='../src/db_ntfs_info.db', table_name='db_path'):
"""
在指定路径下创建 SQLite 数据库,并在其中创建路径信息表,
包含 DeviceID 字段,用于标记文件所属设备(磁盘)。
创建 NewDBPath 表,用于存储文件/目录的路径信息
:param db_path: str, 数据库文件路径
:param db_path: str, 数据库文件路径
:param table_name: str, 要创建的表名
:return: None
"""
@@ -295,39 +280,42 @@ def CreateDBPathTable(db_path='../src/db_path.db', table_name='db_path'):
if directory and not os.path.exists(directory):
os.makedirs(directory)
# 连接到SQLite数据库如果文件不存在会自动创建)
# 连接到SQLite数据库如果不存在会自动创建
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA foreign_keys = ON;") # 开启外键支持
cursor = conn.cursor()
# 动态构建创建表的SQL语句(包含 DeviceID 外键)
# 动态构建创建表的SQL语句
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
-- DeviceID TEXT NOT NULL,
Path TEXT NOT NULL,
Name TEXT NOT NULL,
DirLayer INTEGER NOT NULL,
PathHash TEXT UNIQUE NOT NULL,
IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
ParentID INTEGER,
ContentSize INTEGER,
FileCreateTime TEXT,
FileModifyTime TEXT,
FileAccessTime TEXT,
FileAuthTime TEXT,
FileMode TEXT,
-- 外键约束
-- FOREIGN KEY(DeviceID) REFERENCES db_device(ID),
-- 外键约束(可选)
FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
);
"""
# 执行SQL语句
cursor.execute(create_table_sql)
# 提交更改
conn.commit()
conn.close()
print(f"表 [{table_name}] 已在数据库 [{db_path}] 中创建成功")
def CreateDBExtendNameTable(db_path='../src/db_extend_name.db', table_name='db_extend_name'):
def CreateDBExtendNameTable(db_path='../src/db_ntfs_info.db', table_name='db_extend_name'):
"""
在指定路径下创建 SQLite 数据库,并在其中创建扩展名表。

View File

@@ -8,9 +8,10 @@ def GetNTFSBootInfo(volume_letter):
- Bytes per sector
- Sectors per cluster
- Cluster size (bytes)
- $MFT 起始簇号
参数:
volume_letter: 卷标字符串,例如 'C'
volume_letter: str卷标字符串,例如 'C'
返回:
dict 包含上述信息
@@ -62,10 +63,15 @@ def GetNTFSBootInfo(volume_letter):
# 计算簇大小
cluster_size = bytes_per_sector * sectors_per_cluster
# 解析 $MFT 起始簇号LCN偏移 0x30QWORD8 字节)
mft_lcn_bytes = buffer[0x30:0x38]
mft_lcn = int.from_bytes(mft_lcn_bytes, byteorder='little', signed=False)
return {
"BytesPerSector": bytes_per_sector,
"SectorsPerCluster": sectors_per_cluster,
"ClusterSize": cluster_size
"ClusterSize": cluster_size,
"MftPosition": mft_lcn
}

187
ntfs_utils/db_node.py Normal file
View File

@@ -0,0 +1,187 @@
import hashlib
import random
import sqlite3
from mft_analyze import GetFile80hPattern
# ✅ 工具函数:获取文件扩展名
def GetFileExtension(name: str) -> str:
    """Return the lowercased extension of *name*, or "" if it has none."""
    _, dot, ext = name.rpartition('.')
    return ext.lower() if dot else ""
# ✅ 函数:获取 ExtendNameID基于文件名后缀
def GetExtendNameId(name: str, cursor: sqlite3.Cursor) -> int:
    """
    Look up the db_extend_name ID for *name*'s file extension.

    Returns 0 when the name has no extension or the extension is unknown.
    """
    _, dot, ext = name.rpartition('.')
    suffix = ext.lower() if dot else ""
    if not suffix:
        return 0
    cursor.execute("SELECT ID FROM db_extend_name WHERE ExtendName = ?", (suffix,))
    row = cursor.fetchone()
    return 0 if row is None else row[0]
# ✅ 函数:获取 GroupID默认第一个
def GetFirstGroupId(cursor: sqlite3.Cursor) -> int:
    """Return the smallest ID in db_group, or 0 when the table is empty."""
    row = cursor.execute("SELECT ID FROM db_group ORDER BY ID LIMIT 1").fetchone()
    return 0 if row is None else row[0]
# ✅ 函数:获取 UserID默认第一个
def GetFirstUserId(cursor: sqlite3.Cursor) -> int:
    """Return the smallest ID in db_user, or 0 when the table is empty."""
    row = cursor.execute("SELECT ID FROM db_user ORDER BY ID LIMIT 1").fetchone()
    return 0 if row is None else row[0]
# ✅ 【伪代码】获取文件大小(字节)
def GetFileSize(full_path: str) -> int:
    """Mock implementation: report a constant 10-byte size for every path."""
    return 10
# ✅ 【伪代码】获取文件内容哈希
def GetFileHash(full_path: str) -> str:
    """Mock implementation: hash a fixed payload instead of the file content."""
    mock_payload = b"mocked_file_content"
    return hashlib.sha256(mock_payload).hexdigest()
# ✅ 【伪代码】获取分片数
def GetExtentCount(full_path: str) -> int:
    """Mock implementation: every file is treated as a single extent."""
    return 1
# ✅ 【伪代码】获取设备IDdb_device第一条记录
def GetDeviceId(cursor: sqlite3.Cursor) -> int:
    """Return the smallest ID in db_device, or 0 when the table is empty."""
    row = cursor.execute("SELECT ID FROM db_device ORDER BY ID LIMIT 1").fetchone()
    return 0 if row is None else row[0]
# ✅ 【伪代码】获取随机 Location
def GetRandomLocation() -> int:
    """Mock implementation: a random extent location in [1000, 9999]."""
    return random.randrange(1000, 10000)
# ✅ 【伪代码】获取随机 Length
def GetRandomLength() -> int:
    """Mock implementation: a random extent length in [1000, 9999]."""
    return random.randrange(1000, 10000)
# ✅ 主函数:遍历 NewDBPath 插入 NewDBNode或自定义表名
def InsertNodeDataToDb(db_path='../src/filesystem.db', table_name='db_node'):
    """
    Walk every row of db_path and insert a matching node row into *table_name*.

    The target table is created on first use. File size / hash / extent data
    come from the mock helpers in this module. At most four extents fit the
    schema: longer lists are truncated, shorter ones padded with (0, 0, 0).

    :param db_path: str, SQLite database path
    :param table_name: str, target table name
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        # 动态创建表(如果不存在)
        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            ID INTEGER PRIMARY KEY AUTOINCREMENT,
            PathID INTEGER NOT NULL,
            ExtendNameID INTEGER,
            GroupID INTEGER,
            UserID INTEGER,
            FileSize INTEGER,
            FileHash TEXT,
            ExtentCount INTEGER,
            extent1_DeviceID INTEGER,
            extent1_Location INTEGER,
            extent1_Length INTEGER,
            extent2_DeviceID INTEGER,
            extent2_Location INTEGER,
            extent2_Length INTEGER,
            extent3_DeviceID INTEGER,
            extent3_Location INTEGER,
            extent3_Length INTEGER,
            extent4_DeviceID INTEGER,
            extent4_Location INTEGER,
            extent4_Length INTEGER,
            -- 外键约束
            FOREIGN KEY(PathID) REFERENCES db_path(ID),
            FOREIGN KEY(ExtendNameID) REFERENCES db_extend(ID),
            FOREIGN KEY(GroupID) REFERENCES db_group(ID),
            FOREIGN KEY(UserID) REFERENCES db_user(ID)
        );
        """
        cursor.execute(create_table_sql)

        cursor.execute("SELECT ID, Name, Path, IsDir FROM db_path")
        path_records = cursor.fetchall()

        # These lookups do not depend on the row being processed; fetch them
        # once instead of re-querying inside the loop.
        device_id = GetDeviceId(cursor)
        group_id = GetFirstGroupId(cursor)
        user_id = GetFirstUserId(cursor)

        batch = []
        for path_id, name, full_path, is_dir in path_records:
            # Directories carry no extension.
            extend_name_id = 0 if is_dir == 1 else GetExtendNameId(name, cursor)

            file_size = GetFileSize(full_path)
            file_hash = GetFileHash(full_path)
            extent_count = GetExtentCount(full_path)

            # Build at most 4 extents (the schema has 4 slots), then pad.
            # Clamping avoids silently generating extents that would be dropped.
            extent_data = [(device_id, GetRandomLocation(), GetRandomLength())
                           for _ in range(min(extent_count, 4))]
            extent_data.extend([(0, 0, 0)] * (4 - len(extent_data)))

            batch.append((
                path_id, extend_name_id, group_id, user_id,
                file_size, file_hash, extent_count,
                *extent_data[0], *extent_data[1], *extent_data[2], *extent_data[3]
            ))

        insert_sql = f"""
        INSERT OR IGNORE INTO {table_name} (
            PathID, ExtendNameID, GroupID, UserID, FileSize, FileHash, ExtentCount,
            extent1_DeviceID, extent1_Location, extent1_Length,
            extent2_DeviceID, extent2_Location, extent2_Length,
            extent3_DeviceID, extent3_Location, extent3_Length,
            extent4_DeviceID, extent4_Location, extent4_Length
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        cursor.executemany(insert_sql, batch)
        conn.commit()
        print(f"✅ 成功插入 {cursor.rowcount} 条 {table_name} 记录")
    except Exception as e:
        print(f"❌ 插入失败: {e}")
        conn.rollback()
    finally:
        conn.close()


# 示例调用
if __name__ == "__main__":
    InsertNodeDataToDb(db_path='../src/db_ntfs_info.db', table_name='db_node')

View File

@@ -1,6 +1,49 @@
import hashlib
import os
import sqlite3
import time
def get_file_times(full_path):
    """
    Read the create / modify / access times of *full_path* as strings.

    On Windows st_ctime is the creation time; it is also reused as an
    approximation of the "auth" (permission-change) time.

    :param full_path: str, path to stat
    :return: tuple of four "YYYY-MM-DD HH:MM:SS" strings
        (create, modify, access, auth), or four "unknown" on failure
    """
    fmt = '%Y-%m-%d %H:%M:%S'
    try:
        st = os.stat(full_path)
        create_time, modify_time, access_time, auth_time = (
            time.strftime(fmt, time.localtime(ts))
            for ts in (st.st_ctime, st.st_mtime, st.st_atime, st.st_ctime)
        )
        return create_time, modify_time, access_time, auth_time
    except Exception as e:
        print(f"⚠️ 获取时间失败: {e}")
        return "unknown", "unknown", "unknown", "unknown"
def get_file_mode(full_path):
    """
    Return a mock permission mode for *full_path* (Windows has no POSIX mode).

    :return: "default" when the path can be stat-ed, "unknown" otherwise
    """
    try:
        os.stat(full_path)
    except Exception:
        return "unknown"
    return "default"
def GenerateHash(s: str) -> str:
@@ -23,10 +66,37 @@ def ShouldSkipPath(path: str) -> bool:
return False
def GetDirLayer(full_path: str, volume_letter: str) -> int:
    """
    Compute the directory depth of a Windows-style path below the volume root.

    Examples:
        Z:\\demo.txt -> 0
        Z:\\folder\\test.txt -> 1
        Z:\\folder\\subfolder\\file.txt -> 2

    :param full_path: str, full path, e.g. "Z:\\folder\\test.txt"
    :param volume_letter: str, drive letter, e.g. "Z" (case-insensitive)
    :return: int depth; -1 if full_path is not under the given volume;
        0 for the volume root itself
    """
    root_prefix = f"{volume_letter.upper()}:\\"
    if not full_path.startswith(root_prefix):
        return -1  # path does not belong to this volume
    relative_path = full_path[len(root_prefix):]
    if not relative_path:
        return 0  # the volume root itself
    # The scanner produces backslash-separated Windows paths; split on the
    # literal "\\" instead of os.sep so the result matches the documented
    # examples on every platform (os.sep is "/" on POSIX).
    return len(relative_path.split("\\")) - 1
def ScanVolume(volume_letter: str):
"""
完整扫描指定磁盘的所有文件和目录,忽略 NTFS 元文件和系统文件夹,
并为每个节点分配 ParentID。
并为每个节点分配 ParentID 和 DirLayer
返回:
list of dict包含文件/目录信息的字典列表
@@ -60,8 +130,6 @@ def ScanVolume(volume_letter: str):
continue
name = entry
# ✅ 修正点:对 Path 字段进行哈希
path_hash = GenerateHash(full_path)
# 计算 ContentSizeKB小文件至少显示为 1 KB
@@ -69,9 +137,13 @@ def ScanVolume(volume_letter: str):
if content_size == 0 and bytes_size > 0:
content_size = 1
# 获取父目录路径
parent_path = os.path.dirname(full_path)
parent_id = path_to_id.get(parent_path, 0) # 默认为 0根目录可能未录入
parent_id = path_to_id.get(parent_path, 0)
dir_layer = GetDirLayer(full_path, volume_letter)
# ✅ 获取文件时间属性
ctime, mtime, atime, chgtime = get_file_times(full_path)
mode = get_file_mode(full_path)
item = {
"ID": counter,
@@ -80,7 +152,13 @@ def ScanVolume(volume_letter: str):
"PathHash": path_hash,
"IsDir": is_dir,
"ParentID": parent_id,
"ContentSize": content_size
"ContentSize": content_size,
"DirLayer": dir_layer,
"FileCreateTime": ctime,
"FileModifyTime": mtime,
"FileAccessTime": atime,
"FileAuthTime": chgtime,
"FileMode": mode
}
result.append(item)
@@ -95,7 +173,13 @@ def ScanVolume(volume_letter: str):
def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
"""
批量将扫描结果写入数据库
批量将扫描结果写入 NewDBPath 表中,支持新字段
参数:
data: list of dict扫描结果数据
db_path: strSQLite 数据库路径
table_name: str目标表名
batch_size: int每多少条提交一次
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
@@ -111,6 +195,12 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
ParentID INTEGER,
ContentSize INTEGER,
DirLayer INTEGER NOT NULL,
FileCreateTime TEXT,
FileModifyTime TEXT,
FileAccessTime TEXT,
FileAuthTime TEXT,
FileMode TEXT,
FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
);
@@ -120,8 +210,8 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
# 插入语句(忽略重复 PathHash
insert_sql = f"""
INSERT OR IGNORE INTO {table_name}
(Path, Name, PathHash, IsDir, ParentID, ContentSize)
VALUES (?, ?, ?, ?, ?, ?)
(Path, Name, PathHash, IsDir, ParentID, ContentSize, DirLayer, FileCreateTime, FileModifyTime, FileAccessTime, FileAuthTime, FileMode)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
total_inserted = 0
@@ -134,7 +224,13 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
item['PathHash'],
item['IsDir'],
item['ParentID'] or 0,
item['ContentSize']
item['ContentSize'],
item['DirLayer'],
item['FileCreateTime'],
item['FileModifyTime'],
item['FileAccessTime'],
item['FileAuthTime'],
item['FileMode']
))
if len(batch) >= batch_size:
@@ -151,7 +247,7 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
total_inserted += cursor.rowcount
print(f"✅ 提交最后一批 {len(batch)} 条数据")
print(f"✅ 总共插入 {total_inserted} 条记录到数据库")
print(f"✅ 总共插入 {total_inserted} 条记录到 NewDBPath 表")
except Exception as e:
print(f"❌ 插入失败: {e}")
@@ -171,7 +267,7 @@ def main():
print(f"📊 共扫描到 {len(scanned_data)} 条有效记录,开始入库...")
InsertPathDataToDB(scanned_data)
print("✅ 全盘扫描与入库完成")
print("✅ 全盘扫描与 NewDBPath 表入库完成")
if __name__ == "__main__":

224
ntfs_utils/mft_analyze.py Normal file
View File

@@ -0,0 +1,224 @@
import os
from typing import Any
import pytsk3
from db_config import GetNTFSBootInfo
def find_file_mft_entry(fs, target_path):
    """
    Locate the MFT entry number of *target_path* within an NTFS file system.

    :param fs: opened file system object (pytsk3.FS_Info — TODO confirm)
    :param target_path: str, path relative to the volume root,
        e.g. "dir\\file.txt"; matching is case-insensitive
    :return: int MFT entry (inode) number, or None if not found
    """
    def traverse_directory(inode, path_components):
        # Resolve one path component per recursion level, starting at *inode*.
        if not path_components:
            return inode

        dir_name = path_components[0].lower()
        try:
            directory = fs.open_dir(inode=inode)
        except Exception as e:
            print(f"Error opening directory with inode {inode}: {e}")
            return None

        for entry in directory:
            # Skip entries with incomplete metadata (no name or meta record).
            if not entry.info or not entry.info.name or not entry.info.meta:
                continue

            name = entry.info.name.name.decode('utf-8', errors='ignore').lower()
            meta = entry.info.meta

            # Match the current path component (directory or file name).
            if name == dir_name:
                if len(path_components) == 1:
                    # Last component: this is the target file/directory.
                    return meta.addr
                elif meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
                    # Descend into the matching subdirectory.
                    next_inode = entry.info.meta.addr
                    result = traverse_directory(next_inode, path_components[1:])
                    if result:
                        return result
        return None

    # Split the path into components; lowercased for case-insensitive match.
    path_parts = target_path.strip("\\").lower().split("\\")
    root_inode = fs.info.root_inum  # MFT entry of the root directory
    return traverse_directory(root_inode, path_parts)
def GetFileMftEntry(file_path):
    """
    Resolve the MFT entry (inode) number of *file_path* on its NTFS volume.

    :param file_path: str, absolute path on an NTFS drive, e.g. "Z:\\demo.jpg"
    :return: int MFT entry number
    :raises FileNotFoundError: if the file does not exist
    :raises RuntimeError: if the raw device cannot be opened or the entry
        cannot be located
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Open the volume as a raw device (admin rights required on Windows).
    drive_letter = os.path.splitdrive(file_path)[0][0]
    device = f"\\\\.\\{drive_letter}:"
    print(f"Opening device: {device}")

    try:
        img = pytsk3.Img_Info(device)
        fs = pytsk3.FS_Info(img)
    except Exception as e:
        raise RuntimeError(f"Failed to open device '{device}': {e}")

    # Convert to a backslash-separated path relative to the volume root.
    abs_path = os.path.abspath(file_path)
    rel_path = os.path.relpath(abs_path, f"{drive_letter}:\\").replace("/", "\\")
    print(f"Looking up MFT entry for: {rel_path}")

    entry_number = find_file_mft_entry(fs, rel_path)
    if entry_number is None:
        raise RuntimeError("Could not find MFT entry for the specified file.")
    return entry_number
def CalculateFileMftStartSector(mft_entry, volume_letter="Z"):
    """
    Compute the absolute start sector of a file's MFT entry on a volume.

    :param mft_entry: int, the file's MFT entry number (inode)
    :param volume_letter: str, volume letter, e.g. "Z"
    :return: int, start sector of that MFT entry
    :raises ValueError: if mft_entry is negative
    """
    # NTFS default MFT entry (file record) size; fixed at format time.
    MFT_ENTRY_SIZE = 1024

    if mft_entry < 0:
        raise ValueError("MFT Entry 编号不能为负数")

    # 获取 NTFS 引导信息
    config_data = GetNTFSBootInfo(volume_letter)

    # Previously hard-coded as "* 8" and "* 2", which silently assumed
    # 512-byte sectors and 8 sectors per cluster. Derive both factors from
    # the boot sector so non-default geometries are handled too.
    sectors_per_cluster = config_data["SectorsPerCluster"]
    sectors_per_entry = MFT_ENTRY_SIZE // config_data["BytesPerSector"]

    return config_data["MftPosition"] * sectors_per_cluster + mft_entry * sectors_per_entry
def Get80hPattern(sector_number, volume_letter="Z"):
    """
    Read one sector from the raw NTFS volume and scan it for records whose
    first byte is 0x80 (the NTFS $DATA attribute type id).

    :param sector_number: int, absolute sector number to read
    :param volume_letter: str, volume letter opened as a raw device (default "Z")
    :return: list of dicts, one per match:
        {
            'start_byte': byte offset of the MFT entry (sector_number * 512),
            'offset': offset of the 0x80 attribute within the sector,
            'sequence': list of 8-byte groups formatted as "xx xx xx ...",
            'is_resident': whether the attribute is resident,
            'total_groups': number of 8-byte groups actually read,
            'attribute_length': total attribute length in bytes
        }
        Empty on read failure or insufficient permissions.
    """
    drive_path = fr"\\.\{volume_letter}:"

    SECTOR_SIZE = 512
    GROUP_SIZE = 8  # bytes per display group
    MATCH_BYTE = 0x80  # attribute type byte to match

    results = []

    try:
        with open(drive_path, 'rb') as disk:
            disk.seek(sector_number * SECTOR_SIZE)
            sector_data = disk.read(SECTOR_SIZE)

            if not sector_data or len(sector_data) < GROUP_SIZE:
                print(f"错误: 无法读取扇区 {sector_number}")
                return results

            groups = [sector_data[i:i + GROUP_SIZE] for i in range(0, len(sector_data), GROUP_SIZE)]

            for i in range(len(groups)):
                current_group = groups[i]
                if len(current_group) < GROUP_SIZE:
                    continue

                if current_group[0] == MATCH_BYTE:
                    # Bytes 4..7 hold the attribute length (little-endian DWORD).
                    if i + 1 >= len(groups):
                        print(f"警告: 当前组后不足两组,跳过偏移量 {i * GROUP_SIZE:04X}h")
                        continue

                    attribute_length_bytes = b''.join([
                        groups[i][4:8],  # bytes 4..7 of the first group
                        groups[i + 1][0:4] if i + 1 < len(groups) else b'\x00\x00\x00\x00'
                    ])
                    attribute_length = int.from_bytes(attribute_length_bytes[:4], byteorder='little')

                    # Groups needed to cover the attribute, rounded up to 8 bytes.
                    total_groups = (attribute_length + GROUP_SIZE - 1) // GROUP_SIZE
                    end_idx = i + total_groups

                    if end_idx > len(groups):
                        print(f"警告: 属性越界,跳过偏移量 {i * GROUP_SIZE:04X}h")
                        continue

                    raw_sequence = groups[i:end_idx]
                    # Format each group like "31 7a 00 ee 0b 00 00 00".
                    formatted_sequence = [' '.join(f"{byte:02x}" for byte in group) for group in raw_sequence]

                    # Resident flag: lowest bit of the first byte of the second
                    # group (byte 8 of the attribute) — 0 means resident.
                    is_resident = False
                    if len(raw_sequence) >= 2:
                        second_group = raw_sequence[1]
                        is_resident = (second_group[0] & 0x01) == 0x00

                    result_entry = {
                        'start_byte': sector_number * SECTOR_SIZE,  # byte position of the MFT entry
                        'offset': i * GROUP_SIZE,
                        'sequence': formatted_sequence,
                        'is_resident': is_resident,
                        'total_groups': total_groups,
                        'attribute_length': attribute_length
                    }
                    results.append(result_entry)

                    # resident_str = "常驻" if is_resident else "非常驻"
                    # print(f"\n在偏移量 {i * GROUP_SIZE:04X}h 处找到{resident_str} 80 属性:")
                    # print(f"属性总长度: {attribute_length} 字节 -> 需读取 {total_groups} 组数据:")
                    # for j, group in enumerate(formatted_sequence):
                    #     print(f"组 {j + 1}: {group}")
                    #
                    # print(f"\n共找到 {len(results)} 个匹配序列")

            return results

    except PermissionError:
        print("错误: 需要管理员权限访问磁盘设备")
    except Exception as e:
        print(f"发生错误: {str(e)}")

    return results
def GetFile80hPattern(file_path):
    """
    Print the 0x80 ($DATA) attribute records found in *file_path*'s MFT entry.

    Errors (missing file, no admin rights, entry not found) are caught and
    reported rather than raised.
    """
    volume_letter = file_path.split(':')[0]
    try:
        entry_number = GetFileMftEntry(file_path)
        start_sector = CalculateFileMftStartSector(entry_number, volume_letter)
        print(Get80hPattern(start_sector, volume_letter))
    except Exception as e:
        print(f"❌ Error: {e}")


if __name__ == '__main__':
    GetFile80hPattern(r"Z:\demo.jpg")

View File

@@ -6,4 +6,5 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"psutil>=7.0.0",
"pytsk3>=20250312",
]

Binary file not shown.

12
uv.lock generated
View File

@@ -8,10 +8,14 @@ version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "psutil" },
{ name = "pytsk3" },
]
[package.metadata]
requires-dist = [{ name = "psutil", specifier = ">=7.0.0" }]
requires-dist = [
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pytsk3", specifier = ">=20250312" },
]
[[package]]
name = "psutil"
@@ -27,3 +31,9 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053, upload-time = "2025-02-13T21:54:34.31Z" },
{ url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885, upload-time = "2025-02-13T21:54:37.486Z" },
]
[[package]]
name = "pytsk3"
version = "20250312"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/59/3f/2d440547eecca1786c2178a3e010e7fb61da1f0468d9809ff2b5b8fbb39b/pytsk3-20250312.tar.gz", hash = "sha256:bb47d4aa5976adbc8d4350bed719b771c548139bc8efe761e1d081aa99074c1b", size = 5274913, upload-time = "2025-03-12T05:49:14.937Z" }