reanalyze db_path schema

This commit is contained in:
Burgess Leo
2025-05-15 18:04:59 +08:00
parent b1e86f345f
commit 7d21842287
3 changed files with 92 additions and 50 deletions

View File

@@ -86,9 +86,9 @@ def CreateDBDeviceTable(db_path='../src/db_ntfs_info.db', table_name='db_device'
def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
"""
在指定路径下创建 SQLite 数据库,并在其中创建节点信息
创建 NewDBNode 表,用于存储文件的具体属性和物理分布信息。
:param db_path: str, 数据库文件路径
:param db_path: str, 数据库文件路径
:param table_name: str, 要创建的表名
:return: None
"""
@@ -100,28 +100,18 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
# 连接到SQLite数据库如果文件不存在会自动创建
conn = sqlite3.connect(db_path)
# 创建一个游标对象
conn.execute("PRAGMA foreign_keys = ON;") # 开启外键支持
cursor = conn.cursor()
# 动态构建创建表的SQL语句
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
PathID INTEGER,
ParentID INTEGER,
NameHash TEXT,
PathHash TEXT,
PathID INTEGER NOT NULL,
ExtendNameID INTEGER,
DirLayer INTEGER,
GroupID INTEGER,
UserID INTEGER,
FileCreateTime TEXT,
FileModifyTime TEXT,
FileAccessTime TEXT,
FileAuthTime TEXT,
FileSize INTEGER,
FileMode INTEGER,
FileHash TEXT,
ExtentCount INTEGER,
extent1_DeviceID INTEGER,
@@ -137,21 +127,17 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
extent4_Location INTEGER,
extent4_Length INTEGER,
-- 外键约束(可选)
FOREIGN KEY(PathID) REFERENCES path_table(ID),
FOREIGN KEY(ExtendNameID) REFERENCES extname_table(ID),
FOREIGN KEY(GroupID) REFERENCES groups(ID),
FOREIGN KEY(UserID) REFERENCES users(ID)
-- 外键约束
FOREIGN KEY(PathID) REFERENCES NewDBPath(ID),
FOREIGN KEY(ExtendNameID) REFERENCES db_extend(ID),
FOREIGN KEY(GroupID) REFERENCES db_group(ID),
FOREIGN KEY(UserID) REFERENCES db_user(ID)
);
"""
# 执行SQL语句
cursor.execute(create_table_sql)
# 提交更改
conn.commit()
# 关闭连接
conn.close()
print(f"表 [{table_name}] 已在数据库 [{db_path}] 中创建成功")
@@ -280,12 +266,11 @@ def CreateDBExtendSnippetTable(db_path='../src/db_ntfs_info.db', table_name='db_
print(f"表 [{table_name}] 已在数据库 [{db_path}] 中创建成功")
def CreateDBPathTable(db_path='../src/db_path.db', table_name='db_path'):
def CreateDBPathTable(db_path='../src/db_ntfs_info.db', table_name='db_path'):
"""
在指定路径下创建 SQLite 数据库,并在其中创建路径信息表,
包含 DeviceID 字段,用于标记文件所属设备(磁盘)。
创建 NewDBPath 表,用于存储文件/目录的路径信息
:param db_path: str, 数据库文件路径
:param db_path: str, 数据库文件路径
:param table_name: str, 要创建的表名
:return: None
"""
@@ -295,39 +280,42 @@ def CreateDBPathTable(db_path='../src/db_path.db', table_name='db_path'):
if directory and not os.path.exists(directory):
os.makedirs(directory)
# 连接到SQLite数据库如果文件不存在会自动创建)
# 连接到SQLite数据库如果不存在会自动创建
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA foreign_keys = ON;") # 开启外键支持
cursor = conn.cursor()
# 动态构建创建表的SQL语句(包含 DeviceID 外键)
# 动态构建创建表的SQL语句
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
-- DeviceID TEXT NOT NULL,
Path TEXT NOT NULL,
Name TEXT NOT NULL,
DirLayer INTEGER NOT NULL,
PathHash TEXT UNIQUE NOT NULL,
IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
ParentID INTEGER,
ContentSize INTEGER,
FileCreateTime TEXT,
FileModifyTime TEXT,
FileAccessTime TEXT,
FileAuthTime TEXT,
FileMode TEXT,
-- 外键约束
-- FOREIGN KEY(DeviceID) REFERENCES db_device(ID),
-- 外键约束(可选)
FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
);
"""
# 执行SQL语句
cursor.execute(create_table_sql)
# 提交更改
conn.commit()
conn.close()
print(f"表 [{table_name}] 已在数据库 [{db_path}] 中创建成功")
def CreateDBExtendNameTable(db_path='../src/db_extend_name.db', table_name='db_extend_name'):
def CreateDBExtendNameTable(db_path='../src/db_ntfs_info.db', table_name='db_extend_name'):
"""
在指定路径下创建 SQLite 数据库,并在其中创建扩展名表。

View File

@@ -23,10 +23,37 @@ def ShouldSkipPath(path: str) -> bool:
return False
def GetDirLayer(full_path: str, volume_letter: str) -> int:
    r"""
    Compute the directory depth of a path relative to its volume root.

    Examples:
        Z:\demo.txt                   -> 0
        Z:\folder\test.txt            -> 1
        Z:\folder\subfolder\file.txt  -> 2

    :param full_path: str, absolute Windows-style path (backslash separated)
    :param volume_letter: str, drive letter of the volume, e.g. 'Z'
    :return: int, depth below the volume root; 0 for the root itself and for
             entries directly under it; -1 when full_path is not on the volume
    """
    separator = "\\"
    root_prefix = f"{volume_letter.upper()}:{separator}"

    # Drive letters are case-insensitive on Windows, so compare the prefix
    # without regard to case instead of requiring an upper-case letter.
    if full_path[:len(root_prefix)].upper() != root_prefix:
        return -1  # not a path on this volume

    # A trailing separator names the same directory and must not add a level.
    relative_path = full_path[len(root_prefix):].rstrip(separator)
    if not relative_path:
        return 0  # the volume root itself

    # Count the Windows separator explicitly: os.sep is "/" on other hosts,
    # which would leave the path unsplit and report every entry as layer 0.
    return relative_path.count(separator)
def ScanVolume(volume_letter: str):
"""
完整扫描指定磁盘的所有文件和目录,忽略 NTFS 元文件和系统文件夹,
并为每个节点分配 ParentID。
并为每个节点分配 ParentID 和 DirLayer
返回:
list of dict包含文件/目录信息的字典列表
@@ -36,7 +63,7 @@ def ScanVolume(volume_letter: str):
raise ValueError(f"磁盘 {root_path} 不存在")
result = []
path_to_id = {} # 用于记录路径到数据库 ID 的映射
path_to_id = {} # 用于记录路径到 ID 的映射
counter = 1 # 模拟数据库自增 ID
for root, dirs, files in os.walk(root_path, topdown=True, onerror=None, followlinks=False):
@@ -61,17 +88,20 @@ def ScanVolume(volume_letter: str):
name = entry
# ✅ 修正点:对 Path 字段进行哈希
# ✅ 对 Path 字段进行哈希
path_hash = GenerateHash(full_path)
# 计算 ContentSizeKB小文件至少显示为 1 KB
# 计算 ContentSizeKB小文件至少显示为 1 KB
content_size = bytes_size // 1024
if content_size == 0 and bytes_size > 0:
content_size = 1
# 获取父目录路径
# 获取父目录路径
parent_path = os.path.dirname(full_path)
parent_id = path_to_id.get(parent_path, 0) # 默认为 0根目录可能未录入
parent_id = path_to_id.get(parent_path, 0)
# ✅ 计算 DirLayer目录层级
dir_layer = GetDirLayer(full_path, volume_letter)
item = {
"ID": counter,
@@ -80,7 +110,13 @@ def ScanVolume(volume_letter: str):
"PathHash": path_hash,
"IsDir": is_dir,
"ParentID": parent_id,
"ContentSize": content_size
"ContentSize": content_size,
"DirLayer": dir_layer,
"FileCreateTime": "default",
"FileModifyTime": "default",
"FileAccessTime": "default",
"FileAuthTime": "default",
"FileMode": "default"
}
result.append(item)
@@ -93,9 +129,15 @@ def ScanVolume(volume_letter: str):
return result
def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
def InsertNewDBPathToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
"""
批量将扫描结果写入数据库
批量将扫描结果写入 NewDBPath 表中,支持新字段
参数:
data: list of dict扫描结果数据
db_path: strSQLite 数据库路径
table_name: str目标表名
batch_size: int每多少条提交一次
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
@@ -111,6 +153,12 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
ParentID INTEGER,
ContentSize INTEGER,
DirLayer INTEGER NOT NULL,
FileCreateTime TEXT,
FileModifyTime TEXT,
FileAccessTime TEXT,
FileAuthTime TEXT,
FileMode TEXT,
FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
);
@@ -120,8 +168,8 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
# 插入语句(忽略重复 PathHash
insert_sql = f"""
INSERT OR IGNORE INTO {table_name}
(Path, Name, PathHash, IsDir, ParentID, ContentSize)
VALUES (?, ?, ?, ?, ?, ?)
(Path, Name, PathHash, IsDir, ParentID, ContentSize, DirLayer, FileCreateTime, FileModifyTime, FileAccessTime, FileAuthTime, FileMode)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
total_inserted = 0
@@ -134,7 +182,13 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
item['PathHash'],
item['IsDir'],
item['ParentID'] or 0,
item['ContentSize']
item['ContentSize'],
item['DirLayer'],
item['FileCreateTime'],
item['FileModifyTime'],
item['FileAccessTime'],
item['FileAuthTime'],
item['FileMode']
))
if len(batch) >= batch_size:
@@ -151,7 +205,7 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
total_inserted += cursor.rowcount
print(f"✅ 提交最后一批 {len(batch)} 条数据")
print(f"✅ 总共插入 {total_inserted} 条记录到数据库")
print(f"✅ 总共插入 {total_inserted} 条记录到 NewDBPath 表")
except Exception as e:
print(f"❌ 插入失败: {e}")
@@ -169,9 +223,9 @@ def main():
scanned_data = ScanVolume(volume_letter)
print(f"📊 共扫描到 {len(scanned_data)} 条有效记录,开始入库...")
InsertPathDataToDB(scanned_data)
InsertNewDBPathToDB(scanned_data)
print("✅ 全盘扫描与入库完成")
print("✅ 全盘扫描与 NewDBPath 表入库完成")
if __name__ == "__main__":

Binary file not shown.