Files
fastcopy/ntfs_utils/db_node.py
Burgess Leo e167ff5d9f temp store
2025-05-19 10:21:12 +08:00

151 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import hashlib
import random
import sqlite3
# 工具函数:获取文件扩展名
def GetFileExtension(name: str) -> str:
parts = name.rsplit('.', 1)
return parts[1].lower() if len(parts) > 1 else ""
# 获取 ExtendNameID基于文件名后缀
def GetExtendNameId(name: str, cursor: sqlite3.Cursor) -> int:
ext = GetFileExtension(name)
if not ext:
return 0
cursor.execute("SELECT ID FROM db_extend_name WHERE ExtendName = ?", (ext,))
result = cursor.fetchone()
return result[0] if result else 0
# 获取 DirLayer路径层级
def GetDirLayer(path: str) -> int:
# 示例Z:\demo.jpg → 层级为0Z:\pictures\RHCE.jpg → 层级为1
path = path.strip().strip("\\")
return path.count("\\")
# 获取 GroupID默认第一个
def GetFirstGroupId(cursor: sqlite3.Cursor) -> int:
cursor.execute("SELECT ID FROM db_group ORDER BY ID LIMIT 1")
result = cursor.fetchone()
return result[0] if result else 0
# 获取 UserID默认第一个
def GetFirstUserId(cursor: sqlite3.Cursor) -> int:
cursor.execute("SELECT ID FROM db_user ORDER BY ID LIMIT 1")
result = cursor.fetchone()
return result[0] if result else 0
# 获取设备IDdb_device第一条记录
def GetDeviceId(cursor: sqlite3.Cursor) -> int:
cursor.execute("SELECT ID FROM db_device ORDER BY ID LIMIT 1")
result = cursor.fetchone()
return result[0] if result else 0
# 获取文件大小(伪数据)
def GetFileSize(full_path: str) -> int:
return random.randint(100, 999)
# 获取文件内容哈希(伪数据)
def GetFileHash(full_path: str) -> str:
return hashlib.sha256(full_path.encode()).hexdigest()
# 获取分片数1~4
def GetExtentCount(full_path: str) -> int:
return random.randint(1, 4)
# 获取随机位置
def GetRandomLocation() -> int:
return random.randint(1000, 9999)
# 获取随机长度
def GetRandomLength() -> int:
return random.randint(1000, 9999)
# 主函数:将 db_path 数据导入 db_node
def MigratePathToNode(db_path='../src/db_ntfs_info.db'):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
group_id = GetFirstGroupId(cursor)
user_id = GetFirstUserId(cursor)
device_id = GetDeviceId(cursor)
cursor.execute("SELECT ID, Path, Name, ParentID FROM db_path")
rows = cursor.fetchall()
for row in rows:
path_id, full_path, name, parent_id = row
# 计算字段
name_hash = hashlib.sha256(name.encode()).hexdigest()
dir_layer = GetDirLayer(full_path)
extend_name_id = GetExtendNameId(name, cursor)
file_size = GetFileSize(full_path)
file_hash = GetFileHash(full_path)
extent_count = GetExtentCount(full_path)
# 构建插入语句字段和参数
fields = [
'PathID', 'ParentID', 'NameHash', 'PathHash',
'ExtendNameID', 'DirLayer', 'GroupID', 'UserID',
'FileSize', 'FileMode', 'FileHash', 'ExtentCount'
]
values = [
path_id, parent_id, name_hash, '', # PathHash 待填
extend_name_id, dir_layer, group_id, user_id,
file_size, 'default', file_hash, extent_count
]
# 查询 PathHash与 db_path.PathHash 一致)
cursor.execute("SELECT PathHash FROM db_path WHERE ID = ?", (path_id,))
path_hash_result = cursor.fetchone()
path_hash = path_hash_result[0] if path_hash_result else ""
values[3] = path_hash # 替换 PathHash
# 处理 Extent 片段字段
extent_data = []
for i in range(1, 5):
if i <= extent_count:
location = GetRandomLocation()
length = GetRandomLength()
extent_data.extend([device_id, location, length])
else:
extent_data.extend([None, None, None])
# 拼接字段和值
extent_fields = [
"extent1_DeviceID", "extent1_Location", "extent1_Length",
"extent2_DeviceID", "extent2_Location", "extent2_Length",
"extent3_DeviceID", "extent3_Location", "extent3_Length",
"extent4_DeviceID", "extent4_Location", "extent4_Length"
]
fields += extent_fields
values += extent_data
# 构建 SQL 插入语句
placeholders = ', '.join('?' * len(values))
insert_sql = f"INSERT INTO db_node ({', '.join(fields)}) VALUES ({placeholders})"
# 执行插入
cursor.execute(insert_sql, values)
conn.commit()
conn.close()
print("✅ db_path 数据已成功迁移到 db_node 表")
if __name__ == '__main__':
MigratePathToNode()