Compare commits
5 Commits
main
...
new_db_sch
Author | SHA1 | Date | |
---|---|---|---|
f9e72de564 | |||
![]() |
542e334987 | ||
![]() |
975f7f3fbc | ||
![]() |
ae777f75d9 | ||
![]() |
7d21842287 |
3
.gitignore
vendored
3
.gitignore
vendored
@@ -175,4 +175,5 @@ cython_debug/
|
||||
.pypirc
|
||||
|
||||
# Custom stuff
|
||||
.idea/
|
||||
.idea/
|
||||
src/*.db
|
29
db_manage/clear_table_record.py
Normal file
29
db_manage/clear_table_record.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import sqlite3
|
||||
|
||||
|
||||
def ClearTableRecordsWithReset(db_path, table_name):
    """
    Delete every row of a table and reset its AUTOINCREMENT counter.

    Both statements run in one transaction: either the table is emptied and
    its counter reset, or nothing changes.

    :param db_path: str, path to the SQLite database file
    :param table_name: str, name of the table to clear
    """
    # Identifiers cannot be bound as SQL parameters, so quote the name instead
    # of splicing it in raw.
    quoted_table = '"' + table_name.replace('"', '""') + '"'

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(f"DELETE FROM {quoted_table};")

        # sqlite_sequence only exists once some table in the database has been
        # created with AUTOINCREMENT; touching it blindly would fail and throw
        # away the DELETE above.
        cursor.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'sqlite_sequence';"
        )
        if cursor.fetchone() is not None:
            cursor.execute("DELETE FROM sqlite_sequence WHERE name = ?;", (table_name,))

        conn.commit()
        print(f"表 [{table_name}] 已清空并重置自增ID")
    except sqlite3.Error as e:
        conn.rollback()
        print(f"❌ 操作失败: {e}")
    finally:
        conn.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Manual maintenance entry point: clears exactly one table per run.
    # The commented calls are the other tables this script has been pointed at;
    # enable the one that should be cleared.
    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_path')
    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_device')
    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_config')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_node')
|
@@ -86,9 +86,9 @@ def CreateDBDeviceTable(db_path='../src/db_ntfs_info.db', table_name='db_device'
|
||||
|
||||
def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
|
||||
"""
|
||||
在指定路径下创建 SQLite 数据库,并在其中创建节点信息表。
|
||||
创建 NewDBNode 表,用于存储文件的具体属性和物理分布信息。
|
||||
|
||||
:param db_path: str, 数据库文件的路径
|
||||
:param db_path: str, 数据库文件路径
|
||||
:param table_name: str, 要创建的表名
|
||||
:return: None
|
||||
"""
|
||||
@@ -100,28 +100,18 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
|
||||
|
||||
# 连接到SQLite数据库(如果文件不存在会自动创建)
|
||||
conn = sqlite3.connect(db_path)
|
||||
|
||||
# 创建一个游标对象
|
||||
conn.execute("PRAGMA foreign_keys = ON;") # 开启外键支持
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 动态构建创建表的SQL语句
|
||||
create_table_sql = f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
PathID INTEGER,
|
||||
ParentID INTEGER,
|
||||
NameHash TEXT,
|
||||
PathHash TEXT,
|
||||
PathID INTEGER NOT NULL,
|
||||
ExtendNameID INTEGER,
|
||||
DirLayer INTEGER,
|
||||
GroupID INTEGER,
|
||||
UserID INTEGER,
|
||||
FileCreateTime TEXT,
|
||||
FileModifyTime TEXT,
|
||||
FileAccessTime TEXT,
|
||||
FileAuthTime TEXT,
|
||||
FileSize INTEGER,
|
||||
FileMode INTEGER,
|
||||
FileHash TEXT,
|
||||
ExtentCount INTEGER,
|
||||
extent1_DeviceID INTEGER,
|
||||
@@ -137,21 +127,17 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
|
||||
extent4_Location INTEGER,
|
||||
extent4_Length INTEGER,
|
||||
|
||||
-- 外键约束(可选)
|
||||
FOREIGN KEY(PathID) REFERENCES path_table(ID),
|
||||
FOREIGN KEY(ExtendNameID) REFERENCES extname_table(ID),
|
||||
FOREIGN KEY(GroupID) REFERENCES groups(ID),
|
||||
FOREIGN KEY(UserID) REFERENCES users(ID)
|
||||
-- 外键约束
|
||||
FOREIGN KEY(PathID) REFERENCES NewDBPath(ID),
|
||||
FOREIGN KEY(ExtendNameID) REFERENCES db_extend(ID),
|
||||
FOREIGN KEY(GroupID) REFERENCES db_group(ID),
|
||||
FOREIGN KEY(UserID) REFERENCES db_user(ID)
|
||||
);
|
||||
"""
|
||||
|
||||
# 执行SQL语句
|
||||
cursor.execute(create_table_sql)
|
||||
|
||||
# 提交更改
|
||||
conn.commit()
|
||||
|
||||
# 关闭连接
|
||||
conn.close()
|
||||
|
||||
print(f"表 [{table_name}] 已在数据库 [{db_path}] 中创建成功")
|
||||
@@ -280,12 +266,11 @@ def CreateDBExtendSnippetTable(db_path='../src/db_ntfs_info.db', table_name='db_
|
||||
print(f"表 [{table_name}] 已在数据库 [{db_path}] 中创建成功")
|
||||
|
||||
|
||||
def CreateDBPathTable(db_path='../src/db_path.db', table_name='db_path'):
|
||||
def CreateDBPathTable(db_path='../src/db_ntfs_info.db', table_name='db_path'):
|
||||
"""
|
||||
在指定路径下创建 SQLite 数据库,并在其中创建路径信息表,
|
||||
包含 DeviceID 字段,用于标记文件所属设备(磁盘)。
|
||||
创建 NewDBPath 表,用于存储文件/目录的路径信息。
|
||||
|
||||
:param db_path: str, 数据库文件的路径
|
||||
:param db_path: str, 数据库文件路径
|
||||
:param table_name: str, 要创建的表名
|
||||
:return: None
|
||||
"""
|
||||
@@ -295,39 +280,42 @@ def CreateDBPathTable(db_path='../src/db_path.db', table_name='db_path'):
|
||||
if directory and not os.path.exists(directory):
|
||||
os.makedirs(directory)
|
||||
|
||||
# 连接到SQLite数据库(如果文件不存在会自动创建)
|
||||
# 连接到SQLite数据库(如果不存在会自动创建)
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.execute("PRAGMA foreign_keys = ON;") # 开启外键支持
|
||||
cursor = conn.cursor()
|
||||
|
||||
# 动态构建创建表的SQL语句(包含 DeviceID 外键)
|
||||
# 动态构建创建表的SQL语句
|
||||
create_table_sql = f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
-- DeviceID TEXT NOT NULL,
|
||||
Path TEXT NOT NULL,
|
||||
Name TEXT NOT NULL,
|
||||
DirLayer INTEGER NOT NULL,
|
||||
PathHash TEXT UNIQUE NOT NULL,
|
||||
IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
|
||||
ParentID INTEGER,
|
||||
ContentSize INTEGER,
|
||||
FileCreateTime TEXT,
|
||||
FileModifyTime TEXT,
|
||||
FileAccessTime TEXT,
|
||||
FileAuthTime TEXT,
|
||||
FileMode TEXT,
|
||||
|
||||
-- 外键约束
|
||||
-- FOREIGN KEY(DeviceID) REFERENCES db_device(ID),
|
||||
-- 外键约束(可选)
|
||||
FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
|
||||
);
|
||||
"""
|
||||
|
||||
# 执行SQL语句
|
||||
cursor.execute(create_table_sql)
|
||||
|
||||
# 提交更改
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
print(f"表 [{table_name}] 已在数据库 [{db_path}] 中创建成功")
|
||||
|
||||
|
||||
def CreateDBExtendNameTable(db_path='../src/db_extend_name.db', table_name='db_extend_name'):
|
||||
def CreateDBExtendNameTable(db_path='../src/db_ntfs_info.db', table_name='db_extend_name'):
|
||||
"""
|
||||
在指定路径下创建 SQLite 数据库,并在其中创建扩展名表。
|
||||
|
||||
|
@@ -8,9 +8,10 @@ def GetNTFSBootInfo(volume_letter):
|
||||
- Bytes per sector
|
||||
- Sectors per cluster
|
||||
- Cluster size (bytes)
|
||||
- $MFT 起始簇号
|
||||
|
||||
参数:
|
||||
volume_letter: 卷标字符串,例如 'C'
|
||||
volume_letter: str,卷标字符串,例如 'C'
|
||||
|
||||
返回:
|
||||
dict 包含上述信息
|
||||
@@ -62,10 +63,15 @@ def GetNTFSBootInfo(volume_letter):
|
||||
# 计算簇大小
|
||||
cluster_size = bytes_per_sector * sectors_per_cluster
|
||||
|
||||
# 解析 $MFT 起始簇号(LCN),偏移 0x30,QWORD(8 字节)
|
||||
mft_lcn_bytes = buffer[0x30:0x38]
|
||||
mft_lcn = int.from_bytes(mft_lcn_bytes, byteorder='little', signed=False)
|
||||
|
||||
return {
|
||||
"BytesPerSector": bytes_per_sector,
|
||||
"SectorsPerCluster": sectors_per_cluster,
|
||||
"ClusterSize": cluster_size
|
||||
"ClusterSize": cluster_size,
|
||||
"MftPosition": mft_lcn
|
||||
}
|
||||
|
||||
|
||||
|
187
ntfs_utils/db_node.py
Normal file
187
ntfs_utils/db_node.py
Normal file
@@ -0,0 +1,187 @@
|
||||
import hashlib
|
||||
import random
|
||||
import sqlite3
|
||||
from mft_analyze import GetFile80hPattern
|
||||
|
||||
|
||||
# ✅ 工具函数:获取文件扩展名
|
||||
def GetFileExtension(name: str) -> str:
    """Return the lower-cased text after the last '.' in *name*, or "" when *name* has no dot."""
    _stem, separator, suffix = name.rpartition('.')
    if not separator:
        return ""
    return suffix.lower()
|
||||
|
||||
|
||||
# ✅ 函数:获取 ExtendNameID(基于文件名后缀)
|
||||
def GetExtendNameId(name: str, cursor: sqlite3.Cursor) -> int:
    """
    Look up the db_extend_name ID that matches the extension of *name*.

    Returns 0 when *name* has no extension or the extension is not present
    in db_extend_name.
    """
    suffix = GetFileExtension(name)
    if suffix:
        cursor.execute("SELECT ID FROM db_extend_name WHERE ExtendName = ?", (suffix,))
        row = cursor.fetchone()
        if row:
            return row[0]
    return 0
|
||||
|
||||
|
||||
# ✅ 函数:获取 GroupID(默认第一个)
|
||||
def GetFirstGroupId(cursor: sqlite3.Cursor) -> int:
    """Return the smallest ID stored in db_group, or 0 when the table is empty."""
    row = cursor.execute("SELECT ID FROM db_group ORDER BY ID LIMIT 1").fetchone()
    if not row:
        return 0
    return row[0]
|
||||
|
||||
|
||||
# ✅ 函数:获取 UserID(默认第一个)
|
||||
def GetFirstUserId(cursor: sqlite3.Cursor) -> int:
    """Return the smallest ID stored in db_user, or 0 when the table is empty."""
    row = cursor.execute("SELECT ID FROM db_user ORDER BY ID LIMIT 1").fetchone()
    if not row:
        return 0
    return row[0]
|
||||
|
||||
|
||||
# ✅ 【伪代码】获取文件大小(字节)
|
||||
def GetFileSize(full_path: str) -> int:
    """Placeholder: report a fixed size of 10 bytes; *full_path* is not inspected."""
    placeholder_size = 10
    return placeholder_size
|
||||
|
||||
|
||||
# ✅ 【伪代码】获取文件内容哈希
|
||||
def GetFileHash(full_path: str) -> str:
    """Placeholder: return the SHA-256 hex digest of a fixed byte string; *full_path* is not read."""
    digest = hashlib.sha256(b"mocked_file_content")
    return digest.hexdigest()
|
||||
|
||||
|
||||
# ✅ 【伪代码】获取分片数
|
||||
def GetExtentCount(full_path: str) -> int:
    """Placeholder: report a single extent for every file; *full_path* is not inspected."""
    placeholder_extents = 1
    return placeholder_extents
|
||||
|
||||
|
||||
# ✅ 【伪代码】获取设备ID(db_device第一条记录)
|
||||
def GetDeviceId(cursor: sqlite3.Cursor) -> int:
    """Return the smallest ID stored in db_device, or 0 when the table is empty."""
    row = cursor.execute("SELECT ID FROM db_device ORDER BY ID LIMIT 1").fetchone()
    if not row:
        return 0
    return row[0]
|
||||
|
||||
|
||||
# ✅ 【伪代码】获取随机 Location
|
||||
def GetRandomLocation() -> int:
    """Placeholder: return a random extent location in the inclusive range 1000..9999."""
    lowest, highest = 1000, 9999
    return random.randint(lowest, highest)
|
||||
|
||||
|
||||
# ✅ 【伪代码】获取随机 Length
|
||||
def GetRandomLength() -> int:
    """Placeholder: return a random extent length in the inclusive range 1000..9999."""
    lowest, highest = 1000, 9999
    return random.randint(lowest, highest)
|
||||
|
||||
|
||||
# ✅ 主函数:遍历 NewDBPath 插入 NewDBNode(或自定义表名)
|
||||
def InsertNodeDataToDb(db_path='../src/filesystem.db', table_name='db_node'):
    """
    Build one node row per db_path record and bulk-insert them into *table_name*.

    The target table is created first when it does not exist. File size, hash,
    extent count and extent positions come from this module's helper functions
    (GetFileSize, GetFileHash, GetExtentCount, GetRandomLocation,
    GetRandomLength). Any failure is printed and the transaction rolled back.

    Args:
        db_path: str, path to the SQLite database file
        table_name: str, name of the target table
    """
    max_extents = 4  # the schema only provides extent1..extent4 column groups

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        # Create the target table if it is missing.
        # NOTE(review): ExtendNameID references db_extend, while GetExtendNameId
        # reads from db_extend_name - confirm which table name is intended.
        create_table_sql = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                PathID INTEGER NOT NULL,
                ExtendNameID INTEGER,
                GroupID INTEGER,
                UserID INTEGER,
                FileSize INTEGER,
                FileHash TEXT,
                ExtentCount INTEGER,
                extent1_DeviceID INTEGER,
                extent1_Location INTEGER,
                extent1_Length INTEGER,
                extent2_DeviceID INTEGER,
                extent2_Location INTEGER,
                extent2_Length INTEGER,
                extent3_DeviceID INTEGER,
                extent3_Location INTEGER,
                extent3_Length INTEGER,
                extent4_DeviceID INTEGER,
                extent4_Location INTEGER,
                extent4_Length INTEGER,

                -- foreign key constraints
                FOREIGN KEY(PathID) REFERENCES db_path(ID),
                FOREIGN KEY(ExtendNameID) REFERENCES db_extend(ID),
                FOREIGN KEY(GroupID) REFERENCES db_group(ID),
                FOREIGN KEY(UserID) REFERENCES db_user(ID)
            );
        """
        cursor.execute(create_table_sql)

        # Load every path record up front so the cursor is free for lookups.
        cursor.execute("SELECT ID, Name, Path, IsDir FROM db_path")
        path_records = cursor.fetchall()

        # These three lookups do not depend on the row, so query them once
        # instead of once per path record.
        device_id = GetDeviceId(cursor)
        group_id = GetFirstGroupId(cursor)
        user_id = GetFirstUserId(cursor)

        batch = []
        for path_id, name, full_path, is_dir in path_records:
            # Directories have no extension.
            extend_name_id = 0 if is_dir == 1 else GetExtendNameId(name, cursor)

            file_size = GetFileSize(full_path)
            file_hash = GetFileHash(full_path)
            extent_count = GetExtentCount(full_path)

            # Only max_extents extents fit in the row; generating more would be
            # wasted work because the extras could never be stored.
            extent_data = [
                (device_id, GetRandomLocation(), GetRandomLength())
                for _ in range(min(extent_count, max_extents))
            ]
            # Unused extent slots are stored as zeros.
            extent_data.extend([(0, 0, 0)] * (max_extents - len(extent_data)))

            row = [path_id, extend_name_id, group_id, user_id,
                   file_size, file_hash, extent_count]
            for extent in extent_data:
                row.extend(extent)
            batch.append(tuple(row))

        # NOTE(review): the table defined above has no UNIQUE constraint besides
        # the autoincrement ID, so OR IGNORE does not prevent duplicate rows
        # when this function is run twice - confirm whether PathID should be unique.
        insert_sql = f"""
            INSERT OR IGNORE INTO {table_name} (
                PathID, ExtendNameID, GroupID, UserID, FileSize, FileHash, ExtentCount,
                extent1_DeviceID, extent1_Location, extent1_Length,
                extent2_DeviceID, extent2_Location, extent2_Length,
                extent3_DeviceID, extent3_Location, extent3_Length,
                extent4_DeviceID, extent4_Location, extent4_Length
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

        cursor.executemany(insert_sql, batch)
        conn.commit()
        print(f"✅ 成功插入 {cursor.rowcount} 条 {table_name} 记录")

    except Exception as e:
        print(f"❌ 插入失败: {e}")
        conn.rollback()

    finally:
        conn.close()
|
||||
|
||||
|
||||
# 示例调用
|
||||
if __name__ == "__main__":
    # Example run: populate db_node in the project database from its db_path rows.
    InsertNodeDataToDb(db_path='../src/db_ntfs_info.db', table_name='db_node')
|
@@ -1,6 +1,49 @@
|
||||
import hashlib
|
||||
import os
|
||||
import sqlite3
|
||||
import time
|
||||
|
||||
|
||||
def get_file_times(full_path):
    """
    Read a file's timestamps and format them as 'YYYY-MM-DD HH:MM:SS' local time.

    Args:
        full_path: str, path of the file to inspect

    Returns:
        tuple: (create_time, modify_time, access_time, auth_time). All four are
        the string "unknown" when the file cannot be inspected.
    """

    def format_time(timestamp):
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))

    try:
        info = os.stat(full_path)

        # st_birthtime is the real creation time where the platform provides it.
        # st_ctime is the metadata-change time on POSIX, and its use as the
        # creation time on Windows is deprecated since Python 3.12.
        created = getattr(info, 'st_birthtime', info.st_ctime)

        create_time = format_time(created)
        modify_time = format_time(info.st_mtime)
        access_time = format_time(info.st_atime)
        # st_ctime serves as an approximation of the permission-change time.
        auth_time = format_time(info.st_ctime)

        return create_time, modify_time, access_time, auth_time

    except (OSError, ValueError, OverflowError, TypeError) as e:
        # Best effort: an unreadable file must not abort a whole scan.
        print(f"⚠️ 获取时间失败: {e}")
        return "unknown", "unknown", "unknown", "unknown"
|
||||
|
||||
|
||||
def get_file_mode(full_path):
    """
    Return a placeholder permission mode for a file.

    The mode is not derived from the file's attributes yet: any path that
    os.stat can read yields "default", anything else yields "unknown".

    Args:
        full_path: str, path of the file to inspect

    Returns:
        str: "default" or "unknown"
    """
    try:
        # Only probes that the path can be inspected; the result is not used.
        os.stat(full_path)
    except (OSError, ValueError, TypeError):
        return "unknown"
    return "default"
|
||||
|
||||
|
||||
def GenerateHash(s: str) -> str:
|
||||
@@ -23,10 +66,37 @@ def ShouldSkipPath(path: str) -> bool:
|
||||
return False
|
||||
|
||||
|
||||
def GetDirLayer(full_path: str, volume_letter: str) -> int:
    r"""
    Compute how many directories lie between the volume root and a path.

    Examples:
        Z:\demo.txt                   -> 0
        Z:\folder\test.txt            -> 1
        Z:\folder\subfolder\file.txt  -> 2

    Args:
        full_path: str, absolute Windows path
        volume_letter: str, drive letter of the volume (e.g. 'Z')

    Returns:
        int: the layer number; 0 for the root itself, -1 when *full_path* is
        not located on the given volume.
    """
    # The prefix is built with a literal backslash, so split on the same
    # character instead of os.sep to stay consistent on every platform.
    separator = "\\"
    root_prefix = f"{volume_letter.upper()}:{separator}"

    # Drive letters are case-insensitive on Windows.
    if full_path[:len(root_prefix)].upper() != root_prefix:
        return -1  # path is not on this volume

    # Empty components come from trailing or doubled separators.
    components = [part for part in full_path[len(root_prefix):].split(separator) if part]
    if not components:
        return 0  # the volume root

    return len(components) - 1
|
||||
|
||||
|
||||
def ScanVolume(volume_letter: str):
|
||||
"""
|
||||
完整扫描指定磁盘的所有文件和目录,忽略 NTFS 元文件和系统文件夹,
|
||||
并为每个节点分配 ParentID。
|
||||
并为每个节点分配 ParentID 和 DirLayer。
|
||||
|
||||
返回:
|
||||
list of dict:包含文件/目录信息的字典列表
|
||||
@@ -60,8 +130,6 @@ def ScanVolume(volume_letter: str):
|
||||
continue
|
||||
|
||||
name = entry
|
||||
|
||||
# ✅ 修正点:对 Path 字段进行哈希
|
||||
path_hash = GenerateHash(full_path)
|
||||
|
||||
# 计算 ContentSize(KB),小文件至少显示为 1 KB
|
||||
@@ -69,9 +137,13 @@ def ScanVolume(volume_letter: str):
|
||||
if content_size == 0 and bytes_size > 0:
|
||||
content_size = 1
|
||||
|
||||
# 获取父目录路径
|
||||
parent_path = os.path.dirname(full_path)
|
||||
parent_id = path_to_id.get(parent_path, 0) # 默认为 0(根目录可能未录入)
|
||||
parent_id = path_to_id.get(parent_path, 0)
|
||||
dir_layer = GetDirLayer(full_path, volume_letter)
|
||||
|
||||
# ✅ 获取文件时间属性
|
||||
ctime, mtime, atime, chgtime = get_file_times(full_path)
|
||||
mode = get_file_mode(full_path)
|
||||
|
||||
item = {
|
||||
"ID": counter,
|
||||
@@ -80,7 +152,13 @@ def ScanVolume(volume_letter: str):
|
||||
"PathHash": path_hash,
|
||||
"IsDir": is_dir,
|
||||
"ParentID": parent_id,
|
||||
"ContentSize": content_size
|
||||
"ContentSize": content_size,
|
||||
"DirLayer": dir_layer,
|
||||
"FileCreateTime": ctime,
|
||||
"FileModifyTime": mtime,
|
||||
"FileAccessTime": atime,
|
||||
"FileAuthTime": chgtime,
|
||||
"FileMode": mode
|
||||
}
|
||||
|
||||
result.append(item)
|
||||
@@ -95,7 +173,13 @@ def ScanVolume(volume_letter: str):
|
||||
|
||||
def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
|
||||
"""
|
||||
批量将扫描结果写入数据库。
|
||||
批量将扫描结果写入 NewDBPath 表中,支持新字段。
|
||||
|
||||
参数:
|
||||
data: list of dict,扫描结果数据
|
||||
db_path: str,SQLite 数据库路径
|
||||
table_name: str,目标表名
|
||||
batch_size: int,每多少条提交一次
|
||||
"""
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
@@ -111,6 +195,12 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
|
||||
IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
|
||||
ParentID INTEGER,
|
||||
ContentSize INTEGER,
|
||||
DirLayer INTEGER NOT NULL,
|
||||
FileCreateTime TEXT,
|
||||
FileModifyTime TEXT,
|
||||
FileAccessTime TEXT,
|
||||
FileAuthTime TEXT,
|
||||
FileMode TEXT,
|
||||
|
||||
FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
|
||||
);
|
||||
@@ -120,8 +210,8 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
|
||||
# 插入语句(忽略重复 PathHash)
|
||||
insert_sql = f"""
|
||||
INSERT OR IGNORE INTO {table_name}
|
||||
(Path, Name, PathHash, IsDir, ParentID, ContentSize)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
(Path, Name, PathHash, IsDir, ParentID, ContentSize, DirLayer, FileCreateTime, FileModifyTime, FileAccessTime, FileAuthTime, FileMode)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
"""
|
||||
|
||||
total_inserted = 0
|
||||
@@ -134,7 +224,13 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
|
||||
item['PathHash'],
|
||||
item['IsDir'],
|
||||
item['ParentID'] or 0,
|
||||
item['ContentSize']
|
||||
item['ContentSize'],
|
||||
item['DirLayer'],
|
||||
item['FileCreateTime'],
|
||||
item['FileModifyTime'],
|
||||
item['FileAccessTime'],
|
||||
item['FileAuthTime'],
|
||||
item['FileMode']
|
||||
))
|
||||
|
||||
if len(batch) >= batch_size:
|
||||
@@ -151,7 +247,7 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
|
||||
total_inserted += cursor.rowcount
|
||||
print(f"✅ 提交最后一批 {len(batch)} 条数据")
|
||||
|
||||
print(f"✅ 总共插入 {total_inserted} 条记录到数据库。")
|
||||
print(f"✅ 总共插入 {total_inserted} 条记录到 NewDBPath 表。")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 插入失败: {e}")
|
||||
@@ -171,7 +267,7 @@ def main():
|
||||
print(f"📊 共扫描到 {len(scanned_data)} 条有效记录,开始入库...")
|
||||
InsertPathDataToDB(scanned_data)
|
||||
|
||||
print("✅ 全盘扫描与入库完成")
|
||||
print("✅ 全盘扫描与 NewDBPath 表入库完成")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
224
ntfs_utils/mft_analyze.py
Normal file
224
ntfs_utils/mft_analyze.py
Normal file
@@ -0,0 +1,224 @@
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import pytsk3
|
||||
|
||||
from db_config import GetNTFSBootInfo
|
||||
|
||||
|
||||
def find_file_mft_entry(fs, target_path):
    """
    Resolve a volume-relative path to its MFT entry number.

    Name matching is case-insensitive. Directory entries lacking name or
    metadata information are ignored.

    Args:
        fs: opened pytsk3 filesystem object
        target_path: str, backslash-separated path relative to the volume root

    Returns:
        The metadata address of the matching entry, the root inode when
        *target_path* names the root itself, or None when nothing matches.
    """

    def traverse_directory(inode, path_components):
        if not path_components:
            return inode

        wanted = path_components[0]
        try:
            directory = fs.open_dir(inode=inode)
        except Exception as e:
            print(f"Error opening directory with inode {inode}: {e}")
            return None

        for entry in directory:
            if not entry.info or not entry.info.name or not entry.info.meta:
                continue

            name = entry.info.name.name.decode('utf-8', errors='ignore').lower()
            if name != wanted:
                continue

            meta = entry.info.meta
            if len(path_components) == 1:
                # Last component: this is the target file or directory.
                return meta.addr

            if meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
                # Descend into the matching sub-directory.
                result = traverse_directory(meta.addr, path_components[1:])
                # Compare against None so that entry number 0 still counts as found.
                if result is not None:
                    return result
        return None

    # Drop empty and "." components so that "", "\\" and "." all mean the root
    # instead of a search for an entry with an empty name.
    path_parts = [part for part in target_path.lower().split("\\") if part and part != "."]
    root_inode = fs.info.root_inum  # MFT entry of the root directory
    return traverse_directory(root_inode, path_parts)
|
||||
|
||||
|
||||
def GetFileMftEntry(file_path):
    """
    Return the MFT entry number of a file on an NTFS volume.

    The volume is opened as a raw device through pytsk3 and searched by the
    file's path relative to the drive root.

    Args:
        file_path: str, path of an existing file (absolute or relative)

    Returns:
        The MFT entry number reported by find_file_mft_entry.

    Raises:
        FileNotFoundError: *file_path* does not exist.
        RuntimeError: the path has no drive letter, the device cannot be
            opened, or no matching MFT entry is found.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Normalise first so a relative path still yields a drive letter.
    abs_path = os.path.abspath(file_path)
    drive = os.path.splitdrive(abs_path)[0]
    if len(drive) != 2 or drive[1] != ":":
        raise RuntimeError(f"Path is not on a lettered drive: {abs_path}")
    drive_letter = drive[0]
    device = f"\\\\.\\{drive_letter}:"

    print(f"Opening device: {device}")

    try:
        img = pytsk3.Img_Info(device)
        fs = pytsk3.FS_Info(img)
    except Exception as e:
        raise RuntimeError(f"Failed to open device '{device}': {e}") from e

    # Path relative to the drive root, using backslashes.
    root_path = f"{drive_letter}:\\"
    rel_path = os.path.relpath(abs_path, root_path).replace("/", "\\")

    print(f"Looking up MFT entry for: {rel_path}")

    mft_entry = find_file_mft_entry(fs, rel_path)
    if mft_entry is None:
        raise RuntimeError("Could not find MFT entry for the specified file.")

    return mft_entry
|
||||
|
||||
|
||||
def CalculateFileMftStartSector(mft_entry, volume_letter="Z"):
    """
    Compute the start sector of a file's MFT entry.

    The $MFT start cluster and the cluster size are read from the volume's
    boot information (GetNTFSBootInfo), so volumes whose cluster size is not
    4096 bytes are handled as well.

    Args:
        mft_entry (int): MFT entry number (inode) of the file
        volume_letter (str): drive letter of the NTFS volume, default "Z"

    Returns:
        int: start position of the entry in 512-byte sectors, the same unit
        Get80hPattern uses when it seeks on the device.

    Raises:
        ValueError: *mft_entry* is negative.
    """
    if mft_entry < 0:
        raise ValueError("MFT Entry 编号不能为负数")

    mft_entry_size = 1024  # bytes per MFT entry - assumed NTFS default, TODO confirm per volume
    sector_size = 512      # unit of the returned sector number

    # Read the NTFS boot information for this volume.
    config_data = GetNTFSBootInfo(volume_letter)

    # Work in bytes first, then convert once; with 4096-byte clusters this is
    # exactly MftPosition * 8 + mft_entry * 2.
    mft_byte_offset = config_data["MftPosition"] * config_data["ClusterSize"]
    start_sector = (mft_byte_offset + mft_entry * mft_entry_size) // sector_size

    return start_sector
|
||||
|
||||
|
||||
def Get80hPattern(sector_number, volume_letter="Z"):
    """
    Read one 512-byte sector from a volume and collect the 0x80-tagged runs in it.

    The sector is cut into 8-byte groups. Every full group whose first byte is
    0x80 is treated as the start of an attribute; bytes 4-7 of that group are
    read as its little-endian length, and that many bytes (rounded up to whole
    groups) are captured.

    Args:
        sector_number (int): number of the sector to read
        volume_letter (str): drive letter of the volume, default "Z"

    Returns:
        list: one dict per match, with the keys
            'start_byte': sector_number * 512,
            'offset': byte offset of the match inside the sector,
            'sequence': the captured groups as "xx xx xx ..." strings,
            'is_resident': True when bit 0 of the second group's first byte is clear,
            'total_groups': number of groups captured,
            'attribute_length': attribute length in bytes.
        The list is empty when the sector cannot be read.
    """
    drive_path = fr"\\.\{volume_letter}:"
    SECTOR_SIZE = 512
    GROUP_SIZE = 8      # bytes per group
    MATCH_BYTE = 0x80   # first byte that marks a match
    results = []

    try:
        with open(drive_path, 'rb') as disk:
            disk.seek(sector_number * SECTOR_SIZE)
            sector_data = disk.read(SECTOR_SIZE)

            if not sector_data or len(sector_data) < GROUP_SIZE:
                print(f"错误: 无法读取扇区 {sector_number}")
                return results

            groups = [
                sector_data[start:start + GROUP_SIZE]
                for start in range(0, len(sector_data), GROUP_SIZE)
            ]
            group_count = len(groups)

            for index, head in enumerate(groups):
                # Skip a short trailing group and anything not tagged 0x80.
                if len(head) < GROUP_SIZE or head[0] != MATCH_BYTE:
                    continue

                offset = index * GROUP_SIZE

                # A match in the very last group has no following group.
                if index + 1 >= group_count:
                    print(f"警告: 当前组后不足两组,跳过偏移量 {offset:04X}h")
                    continue

                # Bytes 4-7 of the first group: little-endian DWORD length.
                attribute_length = int.from_bytes(head[4:8], byteorder='little')

                # Number of groups to capture, rounded up to whole groups.
                total_groups = (attribute_length + GROUP_SIZE - 1) // GROUP_SIZE

                stop = index + total_groups
                if stop > group_count:
                    print(f"警告: 属性越界,跳过偏移量 {offset:04X}h")
                    continue

                captured = groups[index:stop]

                # Render each group as "31 7a 00 ee 0b 00 00 00".
                formatted_sequence = [
                    ' '.join(f"{byte:02x}" for byte in group) for group in captured
                ]

                # Resident flag: lowest bit of the second group's first byte.
                is_resident = len(captured) >= 2 and (captured[1][0] & 0x01) == 0x00

                results.append({
                    'start_byte': sector_number * SECTOR_SIZE,
                    'offset': offset,
                    'sequence': formatted_sequence,
                    'is_resident': is_resident,
                    'total_groups': total_groups,
                    'attribute_length': attribute_length
                })

            return results

    except PermissionError:
        print("错误: 需要管理员权限访问磁盘设备")
    except Exception as e:
        print(f"发生错误: {str(e)}")

    return results
|
||||
|
||||
|
||||
def GetFile80hPattern(file_path):
    """
    Locate a file's MFT entry and return the 0x80 patterns found in its first sector.

    The result is printed as before and now also returned, so importing
    modules can use it.

    Args:
        file_path: str, path of the file, starting with its drive letter (e.g. r"Z:\\demo.jpg")

    Returns:
        list: the result of Get80hPattern; an empty list when any step fails
        (the error is printed).
    """
    # Everything before the first ':' is taken as the drive letter.
    volume_letter = file_path.split(':')[0]
    try:
        mft_entry_value = GetFileMftEntry(file_path)
        start_sector = CalculateFileMftStartSector(mft_entry_value, volume_letter)
        patterns = Get80hPattern(start_sector, volume_letter)
    except Exception as e:
        print(f"❌ Error: {e}")
        return []

    print(patterns)
    return patterns
|
||||
|
||||
|
||||
if __name__ == '__main__':
    # Example run against a sample file; opening the raw volume requires
    # administrator rights (see the PermissionError branch in Get80hPattern).
    GetFile80hPattern(r"Z:\demo.jpg")
|
@@ -6,4 +6,5 @@ readme = "README.md"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
"psutil>=7.0.0",
|
||||
"pytsk3>=20250312",
|
||||
]
|
||||
|
Binary file not shown.
12
uv.lock
generated
12
uv.lock
generated
@@ -8,10 +8,14 @@ version = "0.1.0"
|
||||
source = { virtual = "." }
|
||||
dependencies = [
|
||||
{ name = "psutil" },
|
||||
{ name = "pytsk3" },
|
||||
]
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [{ name = "psutil", specifier = ">=7.0.0" }]
|
||||
requires-dist = [
|
||||
{ name = "psutil", specifier = ">=7.0.0" },
|
||||
{ name = "pytsk3", specifier = ">=20250312" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "psutil"
|
||||
@@ -27,3 +31,9 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053, upload-time = "2025-02-13T21:54:34.31Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885, upload-time = "2025-02-13T21:54:37.486Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "pytsk3"
|
||||
version = "20250312"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/59/3f/2d440547eecca1786c2178a3e010e7fb61da1f0468d9809ff2b5b8fbb39b/pytsk3-20250312.tar.gz", hash = "sha256:bb47d4aa5976adbc8d4350bed719b771c548139bc8efe761e1d081aa99074c1b", size = 5274913, upload-time = "2025-03-12T05:49:14.937Z" }
|
||||
|
Reference in New Issue
Block a user