Files
fastcopy/ntfs_utils/db_path.py
2025-05-16 17:45:35 +08:00

275 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import os
import sqlite3
import time
def get_file_times(full_path):
    """
    Return the create, modify, access and "auth" times of a file as strings.

    Each timestamp is rendered in local time as ``YYYY-MM-DD HH:MM:SS``.

    Args:
        full_path: Path of the file or directory to inspect.

    Returns:
        tuple: ``(create_time, modify_time, access_time, auth_time)``.
        Both ``create_time`` and ``auth_time`` are derived from ``st_ctime``;
        the original author treats ctime as an approximation of the
        permission-change time on Windows. If anything fails, a warning is
        printed and four ``"unknown"`` strings are returned instead.
    """
    time_format = '%Y-%m-%d %H:%M:%S'
    try:
        info = os.stat(full_path)
        # Order matters: it is the order of the returned tuple.
        raw_stamps = (info.st_ctime, info.st_mtime, info.st_atime, info.st_ctime)
        return tuple(
            time.strftime(time_format, time.localtime(stamp))
            for stamp in raw_stamps
        )
    except Exception as err:
        # Best effort: report the problem and fall back to placeholders.
        print(f"⚠️ 获取时间失败: {err}")
        return ("unknown",) * 4
def get_file_mode(full_path):
    """
    Return a permission-mode label for a path (simulated on Windows).

    The path is only probed with ``os.stat``; no attribute bits are decoded
    yet. The original author notes this could be extended to cover
    read-only, hidden and similar attributes.

    Args:
        full_path: Path of the file or directory to inspect.

    Returns:
        str: ``"default"`` when the path can be stat'ed, ``"unknown"`` when
        the probe raises any exception.
    """
    try:
        # The stat result is not used; the call only checks accessibility.
        os.stat(full_path)
    except Exception:
        return "unknown"
    return "default"
def GenerateHash(s: str) -> str:
    """
    Return the SHA-256 hex digest of a string.

    Used to derive a unique identifier (PathHash) for a path.

    Args:
        s: Text to hash, typically a full file-system path.

    Returns:
        str: 64-character lowercase hexadecimal digest of the UTF-8 bytes.
    """
    # Windows file names may contain unpaired UTF-16 surrogates, which Python
    # surfaces as lone surrogate code points in str. Plain UTF-8 encoding
    # raises UnicodeEncodeError on those; 'surrogatepass' encodes them instead
    # and yields the same bytes as before for every well-formed string.
    encoded = s.encode('utf-8', errors='surrogatepass')
    return hashlib.sha256(encoded).hexdigest()
def ShouldSkipPath(path: str) -> bool:
    """
    Decide whether a path should be excluded from the scan.

    A path is skipped when its final component starts with ``$`` (the naming
    used by NTFS metadata files) or is exactly ``System Volume Information``.

    Args:
        path: Path whose last component is examined.

    Returns:
        bool: True if the path should be skipped, False otherwise.
    """
    leaf = os.path.basename(path)
    return leaf.startswith('$') or leaf == "System Volume Information"
def GetDirLayer(full_path: str, volume_letter: str) -> int:
    r"""
    Compute the directory depth of a path relative to its volume root.

    Examples:
        Z:\demo.txt                  -> 0
        Z:\folder\test.txt           -> 1
        Z:\folder\subfolder\file.txt -> 2

    Args:
        full_path: Absolute Windows-style path, e.g. ``Z:\folder\test.txt``.
        volume_letter: Drive letter of the volume, e.g. ``'Z'``.

    Returns:
        int: Number of directories between the volume root and the entry;
        0 for the root itself and for entries directly under it, -1 when the
        path does not belong to the given volume.
    """
    root_prefix = f"{volume_letter.upper()}:\\"
    # Drive letters are case-insensitive on Windows, so 'z:\x' must match
    # volume 'Z' just like 'Z:\x' does.
    if full_path[:len(root_prefix)].upper() != root_prefix:
        return -1  # path is not on this volume
    relative_path = full_path[len(root_prefix):]
    if not relative_path:
        return 0  # the volume root itself
    # The prefix hard-codes a backslash, so count backslashes explicitly
    # rather than os.sep; the result then does not depend on the host OS.
    return relative_path.count("\\")
def ScanVolume(volume_letter: str):
    """
    Walk an entire volume and describe every file and directory found.

    NTFS metadata entries and system folders (as decided by ShouldSkipPath)
    are ignored, and skipped directories are not descended into. Every
    record receives a scan-local sequential ID, the ID of its parent
    directory (0 when the parent is the volume root or was not recorded)
    and its directory depth.

    Args:
        volume_letter: Drive letter of the volume to scan, e.g. ``'Z'``.

    Returns:
        list of dict: One dict per entry with the keys ID, Path, Name,
        PathHash, IsDir, ParentID, ContentSize, DirLayer, FileCreateTime,
        FileModifyTime, FileAccessTime, FileAuthTime and FileMode.

    Raises:
        ValueError: If the volume root does not exist.
    """
    volume_root = f"{volume_letter.upper()}:\\"
    if not os.path.exists(volume_root):
        raise ValueError(f"磁盘 {volume_root} 不存在")

    records = []
    id_by_path = {}  # full path -> scan-local ID, used to resolve ParentID

    # Top-down order guarantees a directory is recorded before its children.
    for current_dir, subdirs, filenames in os.walk(
        volume_root, topdown=True, onerror=None, followlinks=False
    ):
        # Prune in place so os.walk never descends into skipped directories.
        subdirs[:] = [
            sub for sub in subdirs
            if not ShouldSkipPath(os.path.join(current_dir, sub))
        ]

        for entry_name in [*filenames, *subdirs]:
            entry_path = os.path.join(current_dir, entry_name)
            if ShouldSkipPath(entry_path):
                continue

            try:
                if os.path.isdir(entry_path):
                    dir_flag, byte_count = 1, 0
                elif os.path.isfile(entry_path):
                    dir_flag, byte_count = 0, os.path.getsize(entry_path)
                else:
                    # Neither a regular file nor a directory: not recorded.
                    continue

                digest = GenerateHash(entry_path)

                # Size in KB, rounded down; non-empty files report at least 1.
                size_kb = max(byte_count // 1024, 1) if byte_count > 0 else 0

                owner_id = id_by_path.get(os.path.dirname(entry_path), 0)
                depth = GetDirLayer(entry_path, volume_letter)

                created, modified, accessed, changed = get_file_times(entry_path)
                mode_label = get_file_mode(entry_path)

                # IDs are sequential starting at 1, mimicking an
                # auto-increment column.
                next_id = len(records) + 1
                records.append({
                    "ID": next_id,
                    "Path": entry_path,
                    "Name": entry_name,
                    "PathHash": digest,
                    "IsDir": dir_flag,
                    "ParentID": owner_id,
                    "ContentSize": size_kb,
                    "DirLayer": depth,
                    "FileCreateTime": created,
                    "FileModifyTime": modified,
                    "FileAccessTime": accessed,
                    "FileAuthTime": changed,
                    "FileMode": mode_label,
                })
                id_by_path[entry_path] = next_id
            except Exception as err:
                # Best effort: report the entry and carry on with the scan.
                print(f"⚠️ 跳过路径 {entry_path},错误: {err}")

    return records
def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
    """
    Write scan results into a SQLite table in batches.

    The table is created if it does not exist. Rows whose PathHash is
    already present are ignored. The row ID is assigned by SQLite, so the
    scan-local ``ParentID`` of each item is translated into the real ID of
    the parent row by looking the parent up through its PathHash. When the
    parent cannot be resolved (item has no ``ID`` key, parent appears later
    in ``data``, or ParentID is 0/None) the original ParentID value, or 0,
    is stored unchanged.

    Args:
        data: Iterable of dicts as produced by the volume scan. Required
            keys: Path, Name, PathHash, IsDir, ParentID, ContentSize,
            DirLayer, FileCreateTime, FileModifyTime, FileAccessTime,
            FileAuthTime, FileMode. The optional ``ID`` key enables parent
            resolution; parents are expected to precede their children.
        db_path: Path of the SQLite database file.
        table_name: Target table name. It is interpolated into the SQL text
            because identifiers cannot be bound as parameters, so it must
            come from a trusted source.
        batch_size: Number of rows per executemany/commit.

    Returns:
        None. Failures are printed and the current uncommitted batch is
        rolled back; batches committed earlier stay in the database.
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    try:
        # Create the table if it does not exist yet.
        create_table_sql = f"""
            CREATE TABLE IF NOT EXISTS {table_name} (
                ID INTEGER PRIMARY KEY AUTOINCREMENT,
                Path TEXT NOT NULL,
                Name TEXT NOT NULL,
                PathHash TEXT UNIQUE NOT NULL,
                IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
                ParentID INTEGER,
                ContentSize INTEGER,
                DirLayer INTEGER NOT NULL,
                FileCreateTime TEXT,
                FileModifyTime TEXT,
                FileAccessTime TEXT,
                FileAuthTime TEXT,
                FileMode TEXT,
                FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
            );
        """
        cursor.execute(create_table_sql)

        # Insert statement; duplicates on PathHash are ignored. ParentID is
        # resolved to the parent's real row ID via its PathHash, with the
        # scan-local value as fallback when no such row exists.
        insert_sql = f"""
            INSERT OR IGNORE INTO {table_name}
            (Path, Name, PathHash, IsDir, ParentID, ContentSize, DirLayer, FileCreateTime, FileModifyTime, FileAccessTime, FileAuthTime, FileMode)
            VALUES (?, ?, ?, ?, COALESCE((SELECT ID FROM {table_name} WHERE PathHash = ?), ?), ?, ?, ?, ?, ?, ?, ?)
        """

        total_inserted = 0
        batch = []
        # scan-local ID -> PathHash, filled while iterating so that `data`
        # is consumed in a single pass.
        hash_by_scan_id = {}

        for item in data:
            fallback_parent_id = item['ParentID'] or 0
            # None when the parent is unknown; the subquery then yields NULL
            # and COALESCE falls back to the scan-local value.
            parent_hash = hash_by_scan_id.get(fallback_parent_id)

            scan_id = item.get('ID')
            if scan_id is not None:
                hash_by_scan_id[scan_id] = item['PathHash']

            batch.append((
                item['Path'],
                item['Name'],
                item['PathHash'],
                item['IsDir'],
                parent_hash,
                fallback_parent_id,
                item['ContentSize'],
                item['DirLayer'],
                item['FileCreateTime'],
                item['FileModifyTime'],
                item['FileAccessTime'],
                item['FileAuthTime'],
                item['FileMode'],
            ))

            if len(batch) >= batch_size:
                cursor.executemany(insert_sql, batch)
                conn.commit()
                # rowcount only counts rows actually inserted (not ignored).
                total_inserted += cursor.rowcount
                print(f"✅ 提交一批 {len(batch)} 条数据")
                batch.clear()

        # Flush whatever is left after the last full batch.
        if batch:
            cursor.executemany(insert_sql, batch)
            conn.commit()
            total_inserted += cursor.rowcount
            print(f"✅ 提交最后一批 {len(batch)} 条数据")

        print(f"✅ 总共插入 {total_inserted} 条记录到 NewDBPath 表。")
    except Exception as e:
        # Best effort: report and undo the uncommitted batch.
        print(f"❌ 插入失败: {e}")
        conn.rollback()
    finally:
        conn.close()
# Example entry point
def main():
    """
    Scan the hard-coded volume ``Z`` and store the results in the database.

    Progress messages are printed before the scan, before the insert and
    once everything has finished. The insert uses the default database path
    and table name of InsertPathDataToDB.
    """
    drive = "Z"
    print(f"🔍 开始全盘扫描磁盘 {drive}:\\ ...")

    records = ScanVolume(drive)
    print(f"📊 共扫描到 {len(records)} 条有效记录,开始入库...")

    InsertPathDataToDB(records)
    print("✅ 全盘扫描与 NewDBPath 表入库完成")


# Run the workflow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()