# Python source file: 275 lines, 8.2 KiB
import hashlib
|
||
import os
|
||
import sqlite3
|
||
import time
|
||
|
||
|
||
def get_file_times(full_path):
    """
    Return a path's creation, modification, access and change times as strings.

    Every timestamp is rendered in local time as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        full_path: str, path of the file or directory to inspect.

    Returns:
        tuple: ``(create_time, modify_time, access_time, auth_time)``.
        ``create_time`` comes from ``st_birthtime`` when the stat result
        exposes it and from ``st_ctime`` otherwise; ``auth_time`` always comes
        from ``st_ctime``. If anything fails, a warning is printed and four
        ``"unknown"`` strings are returned instead.
    """
    def format_time(timestamp):
        # Readable local-time form: YYYY-MM-DD HH:MM:SS
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))

    try:
        info = os.stat(full_path)

        # st_ctime is only a creation time on some platforms (elsewhere it is
        # the last metadata change), so prefer the dedicated birth-time field
        # whenever the platform provides one.
        birth_timestamp = getattr(info, 'st_birthtime', info.st_ctime)

        create_time = format_time(birth_timestamp)
        modify_time = format_time(info.st_mtime)
        access_time = format_time(info.st_atime)
        # Approximation of the "permission change" time.
        auth_time = format_time(info.st_ctime)

        return create_time, modify_time, access_time, auth_time

    except Exception as e:
        # Best effort: one unreadable entry must not abort a whole scan.
        print(f"⚠️ 获取时间失败: {e}")
        return "unknown", "unknown", "unknown", "unknown"
|
||
|
||
|
||
def get_file_mode(full_path):
    """
    Return a placeholder permission mode for a path.

    The path is only probed with ``os.stat``; no attribute bits are decoded
    yet (read-only / hidden flags could be added here later).

    Args:
        full_path: str, path of the file or directory to inspect.

    Returns:
        str: ``"default"`` when the path can be stat'ed, ``"unknown"`` when
        the stat call raises.
    """
    try:
        # The result is intentionally discarded: only success/failure matters.
        os.stat(full_path)
    except Exception:
        return "unknown"
    return "default"
|
||
|
||
|
||
def GenerateHash(s: str) -> str:
    """
    Compute the SHA-256 digest of a string.

    The text is encoded as UTF-8 before hashing. The result is used as the
    unique identifier of a path (PathHash).

    Returns:
        str: the digest as a hexadecimal string.
    """
    hasher = hashlib.sha256()
    hasher.update(s.encode('utf-8'))
    return hasher.hexdigest()
|
||
|
||
|
||
def ShouldSkipPath(path: str) -> bool:
    """
    Tell whether a path should be left out of the scan.

    A path is skipped when its last component starts with ``$`` (NTFS
    metafiles) or is exactly ``System Volume Information``.
    """
    leaf = os.path.basename(path)
    return leaf.startswith('$') or leaf == "System Volume Information"
|
||
|
||
|
||
def GetDirLayer(full_path: str, volume_letter: str) -> int:
    r"""
    Compute how many directory levels below the volume root a path sits.

    Examples:
        Z:\demo.txt                   -> 0
        Z:\folder\test.txt            -> 1
        Z:\folder\subfolder\file.txt  -> 2

    Args:
        full_path: str, full path including the drive prefix.
        volume_letter: str, drive letter of the volume (e.g. 'Z').

    Returns:
        int: the layer number; 0 for the root itself and for entries directly
        under it, -1 when the path does not belong to the given volume.
    """
    separator = "\\"
    root_prefix = f"{volume_letter.upper()}:{separator}"

    # Compare the drive prefix case-insensitively so "z:\x" matches volume "Z".
    if full_path[:len(root_prefix)].upper() != root_prefix:
        return -1  # path is not on this volume

    relative_path = full_path[len(root_prefix):]
    if not relative_path:
        return 0  # the root itself is layer 0

    # The prefix is built with a literal backslash, so count that same
    # separator rather than os.sep, which differs between platforms.
    return relative_path.count(separator)
|
||
|
||
|
||
def ScanVolume(volume_letter: str):
    """
    Walk a whole volume and describe every file and directory found on it.

    Entries rejected by ShouldSkipPath are left out, and rejected directories
    are not descended into. Each record receives a sequential ID starting at
    1, the ID of its parent directory as ParentID (0 when the parent was not
    recorded, e.g. for entries directly under the root) and a DirLayer.
    ContentSize is the size in KB, rounded down, with non-empty files reported
    as at least 1; directories are reported as 0.

    Args:
        volume_letter: str, drive letter of the volume to scan (e.g. 'Z').

    Returns:
        list of dict: one dictionary per file or directory.

    Raises:
        ValueError: if the root of the volume does not exist.
    """
    drive_root = f"{volume_letter.upper()}:\\"
    if not os.path.exists(drive_root):
        raise ValueError(f"磁盘 {drive_root} 不存在")

    records = []
    ids_by_path = {}  # path -> ID already handed out to that entry
    next_id = 1       # stands in for the database auto-increment ID

    walker = os.walk(drive_root, topdown=True, onerror=None, followlinks=False)
    for current_dir, subdirs, filenames in walker:
        # Prune in place so os.walk never enters the skipped directories.
        subdirs[:] = [
            sub for sub in subdirs
            if not ShouldSkipPath(os.path.join(current_dir, sub))
        ]

        for entry_name in [*filenames, *subdirs]:
            entry_path = os.path.join(current_dir, entry_name)
            if ShouldSkipPath(entry_path):
                continue

            try:
                if os.path.isdir(entry_path):
                    dir_flag, size_in_bytes = 1, 0
                elif os.path.isfile(entry_path):
                    dir_flag, size_in_bytes = 0, os.path.getsize(entry_path)
                else:
                    # Neither a directory nor a regular file: nothing to record.
                    continue

                digest = GenerateHash(entry_path)

                # Size in KB; anything non-empty counts as at least 1 KB.
                size_in_kb = size_in_bytes // 1024
                if size_in_bytes > 0 and size_in_kb == 0:
                    size_in_kb = 1

                owner_id = ids_by_path.get(os.path.dirname(entry_path), 0)
                layer = GetDirLayer(entry_path, volume_letter)

                created, modified, accessed, changed = get_file_times(entry_path)
                mode_label = get_file_mode(entry_path)

                records.append({
                    "ID": next_id,
                    "Path": entry_path,
                    "Name": entry_name,
                    "PathHash": digest,
                    "IsDir": dir_flag,
                    "ParentID": owner_id,
                    "ContentSize": size_in_kb,
                    "DirLayer": layer,
                    "FileCreateTime": created,
                    "FileModifyTime": modified,
                    "FileAccessTime": accessed,
                    "FileAuthTime": changed,
                    "FileMode": mode_label
                })
                ids_by_path[entry_path] = next_id
                next_id += 1

            except Exception as e:
                print(f"⚠️ 跳过路径 {entry_path},错误: {e}")

    return records
|
||
|
||
|
||
def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
    """
    Write scan results into a SQLite table in batches.

    The table is created when it does not exist yet. Rows whose PathHash is
    already present are ignored. The ``ParentID`` values in ``data`` are
    scan-time counters, while the table assigns its own AUTOINCREMENT IDs, so
    each row's parent is resolved to the parent's real database ID through the
    parent's PathHash at insert time. When the parent cannot be resolved, the
    scan-time ``ParentID`` (or 0) is stored unchanged.

    Any failure is printed and the open transaction is rolled back; batches
    that were already committed stay in the database.

    Args:
        data: list of dict, scan results (records with the keys Path, Name,
            PathHash, IsDir, ParentID, ContentSize, DirLayer, FileCreateTime,
            FileModifyTime, FileAccessTime, FileAuthTime, FileMode and,
            optionally, ID).
        db_path: str, path of the SQLite database file.
        table_name: str, target table name. It is interpolated into the SQL
            text, so it must come from a trusted source.
        batch_size: int, number of rows per commit.
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        rows = list(data)

        # Scan-time ID -> PathHash, used to find each row's parent in the DB.
        hash_by_scan_id = {
            item['ID']: item['PathHash']
            for item in rows
            if item.get('ID') is not None
        }

        # Create the table if it does not exist.
        create_table_sql = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                Path TEXT NOT NULL,
                Name TEXT NOT NULL,
                PathHash TEXT UNIQUE NOT NULL,
                IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
                ParentID INTEGER,
                ContentSize INTEGER,
                DirLayer INTEGER NOT NULL,
                FileCreateTime TEXT,
                FileModifyTime TEXT,
                FileAccessTime TEXT,
                FileAuthTime TEXT,
                FileMode TEXT,

                FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
            );
        """
        cursor.execute(create_table_sql)

        # Duplicate PathHash values are ignored. ParentID is looked up from
        # the parent's PathHash so it matches the ID the table really assigned;
        # the second placeholder inside COALESCE is the fallback value.
        insert_sql = f"""
            INSERT OR IGNORE INTO {table_name}
            (Path, Name, PathHash, IsDir, ParentID, ContentSize, DirLayer, FileCreateTime, FileModifyTime, FileAccessTime, FileAuthTime, FileMode)
            VALUES (?, ?, ?, ?, COALESCE((SELECT ID FROM {table_name} WHERE PathHash = ?), ?), ?, ?, ?, ?, ?, ?, ?)
        """

        total_inserted = 0
        batch = []

        for item in rows:
            scan_parent_id = item['ParentID'] or 0
            batch.append((
                item['Path'],
                item['Name'],
                item['PathHash'],
                item['IsDir'],
                hash_by_scan_id.get(scan_parent_id),  # None -> no DB match
                scan_parent_id,
                item['ContentSize'],
                item['DirLayer'],
                item['FileCreateTime'],
                item['FileModifyTime'],
                item['FileAccessTime'],
                item['FileAuthTime'],
                item['FileMode']
            ))

            if len(batch) >= batch_size:
                cursor.executemany(insert_sql, batch)
                conn.commit()
                total_inserted += cursor.rowcount
                print(f"✅ 提交一批 {len(batch)} 条数据")
                batch.clear()

        # Flush whatever is left over after the last full batch.
        if batch:
            cursor.executemany(insert_sql, batch)
            conn.commit()
            total_inserted += cursor.rowcount
            print(f"✅ 提交最后一批 {len(batch)} 条数据")

        print(f"✅ 总共插入 {total_inserted} 条记录到 NewDBPath 表。")

    except Exception as e:
        print(f"❌ 插入失败: {e}")
        conn.rollback()

    finally:
        conn.close()
|
||
|
||
|
||
# 示例主函数
|
||
def main():
    """Scan drive Z: and store the results with InsertPathDataToDB's defaults."""
    drive = "Z"

    print(f"🔍 开始全盘扫描磁盘 {drive}:\\ ...")
    records = ScanVolume(drive)

    print(f"📊 共扫描到 {len(records)} 条有效记录,开始入库...")
    InsertPathDataToDB(records)

    print("✅ 全盘扫描与 NewDBPath 表入库完成")


# Run the scan only when this file is executed as a script.
if __name__ == "__main__":
    main()
|