# fastcopy/ntfs_utils/db_path.py
import hashlib
import os
import sqlite3
from typing import Iterator

def GenerateHash(s: str) -> str:
    """
    Generate a SHA-256 hash of the input string.
    Used to uniquely identify a path (PathHash).
    """
    return hashlib.sha256(s.encode('utf-8')).hexdigest()
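
# Quick sanity check of the hashing helper (illustrative path, not from a real scan):
#   GenerateHash("Users/demo/notes.txt") always returns the same 64-character hex digest
#   for the same input, which is what lets PathHash act as a stable unique key.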

def ShouldSkipPath(path: str) -> bool:
    """
    Decide whether a path should be skipped (NTFS metafiles or system folders).
    """
    name = os.path.basename(path)
    if name.startswith('$'):
        return True
    if name == "System Volume Information":
        return True
    return False
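
# Typical hits for the filter above (illustrative): "Y:\\$MFT" and "Y:\\$RECYCLE.BIN" are
# skipped because their basenames start with '$', and "Y:\\System Volume Information"
# is skipped by its exact name; ordinary user folders pass through untouched.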

def ScanVolume(volume_letter: str) -> Iterator[dict]:
    """
    Walk every file and directory on the given volume, skipping NTFS metafiles
    and system folders, and assign a ParentID to each node.
    Yields one record at a time instead of building a list in memory.
    """
    root_path = f"{volume_letter.upper()}:\\"
    if not os.path.exists(root_path):
        raise ValueError(f"Volume {root_path} does not exist")
    path_to_id = {}  # path -> ID mapping
    counter = 1
    for root, dirs, files in os.walk(root_path, topdown=True, onerror=None, followlinks=False):
        # Prune skipped directories in place so os.walk never descends into them
        dirs[:] = [d for d in dirs if not ShouldSkipPath(os.path.join(root, d))]
        entries = files + dirs
        for entry in entries:
            full_path = os.path.join(root, entry)
            if ShouldSkipPath(full_path):
                continue
            try:
                if os.path.isdir(full_path):
                    is_dir = 1
                    bytes_size = 0
                elif os.path.isfile(full_path):
                    is_dir = 0
                    bytes_size = os.path.getsize(full_path)
                else:
                    continue
                name = entry
                # Strip the drive letter and normalize the path format
                _, relative_path = os.path.splitdrive(full_path)
                relative_path = relative_path.lstrip("\\").rstrip("\\")
                if os.path.isdir(full_path) and not relative_path.endswith("/"):
                    relative_path += "/"
                relative_path = relative_path.replace("\\", "/")
                path_hash = GenerateHash(relative_path)
                content_size = bytes_size // 1024
                if content_size == 0 and bytes_size > 0:
                    content_size = 1
                parent_path = os.path.dirname(full_path)
                _, parent_relative_path = os.path.splitdrive(parent_path)
                parent_relative_path = parent_relative_path.lstrip("\\").rstrip("\\")
                if os.path.isdir(parent_path) and not parent_relative_path.endswith("/"):
                    parent_relative_path += "/"
                parent_relative_path = parent_relative_path.replace("\\", "/")
                parent_id = path_to_id.get(parent_relative_path, 0)
                item = {
                    "ID": counter,
                    "Path": relative_path,
                    "Name": name,
                    "PathHash": path_hash,
                    "IsDir": is_dir,
                    "ParentID": parent_id,
                    "ContentSize": content_size
                }
                yield item  # yield each record as it is produced
                path_to_id[relative_path] = counter
                counter += 1
            except Exception as e:
                print(f"⚠️ Skipping path {full_path}, error: {e}")

def InsertPathDataToDB(data_generator, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
    """
    Stream records into the database, inserting while the scan is still running.
    :param data_generator: any iterable (e.g. a generator)
    :param db_path: path to the SQLite database file
    :param table_name: table name
    :param batch_size: number of records per commit
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    try:
        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            ID INTEGER PRIMARY KEY AUTOINCREMENT,
            Path TEXT NOT NULL,
            Name TEXT NOT NULL,
            PathHash TEXT UNIQUE NOT NULL,
            IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
            ParentID INTEGER,
            ContentSize INTEGER,
            FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
        );
        """
        cursor.execute(create_table_sql)
        # ParentID values come from the scanner's running counter, so they line up with the
        # AUTOINCREMENT IDs only when the table starts empty and no rows are dropped by OR IGNORE.
        insert_sql = f"""
        INSERT OR IGNORE INTO {table_name}
        (Path, Name, PathHash, IsDir, ParentID, ContentSize)
        VALUES (?, ?, ?, ?, ?, ?)
        """
        batch = []
        for item in data_generator:
            batch.append((
                item['Path'],
                item['Name'],
                item['PathHash'],
                item['IsDir'],
                item['ParentID'] or 0,
                item['ContentSize']
            ))
            if len(batch) >= batch_size:
                cursor.executemany(insert_sql, batch)
                conn.commit()
                print(f"✅ Committed a batch of {len(batch)} records")
                batch.clear()
        # Commit whatever is left over from the final partial batch
        if batch:
            cursor.executemany(insert_sql, batch)
            conn.commit()
            print(f"✅ Committed the final batch of {len(batch)} records")
    except Exception as e:
        print(f"❌ Insert failed: {e}")
        conn.rollback()
    finally:
        conn.close()
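
# Small standalone check of the insert path (writes to a throwaway SQLite file; the
# "demo/" record below is made up): any iterable of dicts with the expected keys works,
# not only the ScanVolume generator.
#
#   sample = [{
#       "Path": "demo/", "Name": "demo", "PathHash": GenerateHash("demo/"),
#       "IsDir": 1, "ParentID": 0, "ContentSize": 0,
#   }]
#   InsertPathDataToDB(sample, db_path="test_db_path.db", batch_size=1)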

# Example main entry point
def DBPathMain(volume_letter: str):
    print(f"🔍 Starting full scan of volume {volume_letter}:\\ ...")
    # Obtain the generator object
    generator = ScanVolume(volume_letter)
    print("📊 Inserting records batch by batch...")
    InsertPathDataToDB(generator)
    print("✅ Full scan and database insert finished")


if __name__ == "__main__":
    DBPathMain(volume_letter="Y")
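
# Illustrative follow-up query (assumes the default db_path and table name used above,
# and a hypothetical directory "Users/demo/"): list the direct children of a directory
# by resolving its PathHash to an ID and filtering on ParentID.
#
#   conn = sqlite3.connect('../src/db_ntfs_info.db')
#   row = conn.execute("SELECT ID FROM db_path WHERE PathHash = ?",
#                      (GenerateHash("Users/demo/"),)).fetchone()
#   if row:
#       for name, is_dir in conn.execute(
#               "SELECT Name, IsDir FROM db_path WHERE ParentID = ?", (row[0],)):
#           print(name, "dir" if is_dir else "file")
#   conn.close()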