init database expect db_node
This commit is contained in:
45
ntfs_utils/__init__.py
Normal file
45
ntfs_utils/__init__.py
Normal file
@@ -0,0 +1,45 @@
|
||||
from db_config import GetNTFSBootInfo, InsertInfoToDBConfig
|
||||
from db_device import ScanSpecialVolumes, InsertVolumesToDB
|
||||
from db_extend_name import InsertExtensionsToDB
|
||||
from db_group import InsertGroupToDB
|
||||
from db_path import GenerateHash, ShouldSkipPath, ScanVolume, InsertPathDataToDB
|
||||
from db_user import InsertUserToDB
|
||||
|
||||
|
||||
def main():
|
||||
volume_letter = 'Z'
|
||||
|
||||
# 初始化 db_config 表
|
||||
config_data = GetNTFSBootInfo(volume_letter)
|
||||
InsertInfoToDBConfig(config_data)
|
||||
|
||||
# 初始化 db_device 表
|
||||
device_data = ScanSpecialVolumes(volume_letter)
|
||||
InsertVolumesToDB([device_data])
|
||||
|
||||
# 初始化 db_user 表
|
||||
user_list = ["Copier"]
|
||||
InsertUserToDB(user_list)
|
||||
|
||||
# 初始化 db_group 表
|
||||
group_name_list = ["Copier"]
|
||||
InsertGroupToDB(group_name_list)
|
||||
|
||||
# 初始化 db_path 表
|
||||
scanned_data = ScanVolume(volume_letter)
|
||||
InsertPathDataToDB(scanned_data)
|
||||
|
||||
# 初始化 db_extend_name 表
|
||||
common_extensions = [
|
||||
"txt", "log", "csv", "xls", "xlsx", "doc", "docx",
|
||||
"ppt", "pptx", "pdf", "jpg", "jpeg", "png", "gif",
|
||||
"bmp", "mp3", "wav", "mp4", "avi", "mkv", "mov",
|
||||
"exe", "dll", "bat", "ini", "reg", "zip", "rar", "7z",
|
||||
"json", "xml", "html", "css", "js", "py", "java", "cpp"
|
||||
]
|
||||
count = InsertExtensionsToDB(common_extensions)
|
||||
print(f"共插入 {count} 个新扩展名。")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
128
ntfs_utils/db_config.py
Normal file
128
ntfs_utils/db_config.py
Normal file
@@ -0,0 +1,128 @@
|
||||
import ctypes
|
||||
import sqlite3
|
||||
|
||||
|
||||
def GetNTFSBootInfo(volume_letter):
|
||||
"""
|
||||
从指定 NTFS 卷的 $Boot 元文件中提取:
|
||||
- Bytes per sector
|
||||
- Sectors per cluster
|
||||
- Cluster size (bytes)
|
||||
|
||||
参数:
|
||||
volume_letter: 卷标字符串,例如 'C'
|
||||
|
||||
返回:
|
||||
dict 包含上述信息
|
||||
"""
|
||||
|
||||
# 构造设备路径,格式为 \\.\C:
|
||||
device_path = f"\\\\.\\{volume_letter}:"
|
||||
|
||||
# 打开卷设备(需要管理员权限)
|
||||
handle = ctypes.windll.kernel32.CreateFileW(
|
||||
device_path,
|
||||
0x80000000, # GENERIC_READ
|
||||
0x00000001 | 0x00000002, # FILE_SHARE_READ | FILE_SHARE_WRITE
|
||||
None,
|
||||
3, # OPEN_EXISTING
|
||||
0,
|
||||
None
|
||||
)
|
||||
|
||||
if handle == -1:
|
||||
raise PermissionError(f"无法打开卷 {volume_letter},请以管理员身份运行。")
|
||||
|
||||
try:
|
||||
buffer = bytearray(512)
|
||||
buffer_address = (ctypes.c_byte * len(buffer)).from_buffer(buffer)
|
||||
bytes_read = ctypes.c_ulong(0)
|
||||
|
||||
# 读取第一个扇区(BPB / $Boot 扇区)
|
||||
success = ctypes.windll.kernel32.ReadFile(
|
||||
handle,
|
||||
buffer_address,
|
||||
len(buffer),
|
||||
ctypes.byref(bytes_read),
|
||||
None
|
||||
)
|
||||
|
||||
if not success or bytes_read.value != 512:
|
||||
raise RuntimeError("读取卷引导扇区失败。")
|
||||
|
||||
finally:
|
||||
ctypes.windll.kernel32.CloseHandle(handle)
|
||||
|
||||
# 解析 Bytes Per Sector (偏移 0x0B,WORD 类型)
|
||||
bytes_per_sector = int.from_bytes(buffer[0x0B:0x0D], byteorder='little')
|
||||
|
||||
# 解析 Sectors Per Cluster (偏移 0x0D,BYTE 类型)
|
||||
sectors_per_cluster = buffer[0x0D]
|
||||
|
||||
# 计算簇大小
|
||||
cluster_size = bytes_per_sector * sectors_per_cluster
|
||||
|
||||
return {
|
||||
"BytesPerSector": bytes_per_sector,
|
||||
"SectorsPerCluster": sectors_per_cluster,
|
||||
"ClusterSize": cluster_size
|
||||
}
|
||||
|
||||
|
||||
def InsertInfoToDBConfig(config_data, db_path='../src/db_ntfs_info.db', table_name='db_config'):
|
||||
"""
|
||||
将 NTFS 配置信息以键值对形式写入数据库的配置表中。
|
||||
|
||||
参数:
|
||||
config_data: dict,包含配置键值对
|
||||
db_path: str,SQLite 数据库路径
|
||||
table_name: str,目标表名(默认为 'db_config')
|
||||
|
||||
返回:
|
||||
None
|
||||
"""
|
||||
# 连接到 SQLite 数据库(如果不存在则会自动创建)
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# 创建表(如果不存在)
|
||||
create_table_sql = f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
Key TEXT UNIQUE NOT NULL,
|
||||
Value TEXT NOT NULL
|
||||
);
|
||||
"""
|
||||
cursor.execute(create_table_sql)
|
||||
|
||||
# 插入或替换数据(使用 INSERT OR REPLACE)
|
||||
insert_sql = f"""
|
||||
INSERT OR REPLACE INTO {table_name} (Key, Value)
|
||||
VALUES (?, ?)
|
||||
"""
|
||||
|
||||
for key, value in config_data.items():
|
||||
cursor.execute(insert_sql, (key, str(value)))
|
||||
|
||||
conn.commit()
|
||||
print("键值对配置已成功写入数据库")
|
||||
|
||||
except Exception as e:
|
||||
print(f"数据库操作失败: {e}")
|
||||
conn.rollback()
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def main():
|
||||
volume = "Z"
|
||||
info = GetNTFSBootInfo(volume)
|
||||
print(f"卷 {volume} 的 BPB 信息:")
|
||||
print(info)
|
||||
InsertInfoToDBConfig(info)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
125
ntfs_utils/db_device.py
Normal file
125
ntfs_utils/db_device.py
Normal file
@@ -0,0 +1,125 @@
|
||||
import sqlite3
|
||||
|
||||
import psutil
|
||||
|
||||
|
||||
def ScanSpecialVolumes(volume_letter):
|
||||
"""
|
||||
扫描指定的单个磁盘卷,返回其基本信息字典。
|
||||
|
||||
参数:
|
||||
volume_letter: str,磁盘盘符(例如 "C", "Z")
|
||||
|
||||
返回:
|
||||
dict: 包含 Path, Type, Option 字段的字典
|
||||
"""
|
||||
# 简单校验盘符格式(去除可能的冒号)
|
||||
if len(volume_letter) >= 1 and volume_letter[-1] == ":":
|
||||
volume_letter = volume_letter[:-1]
|
||||
|
||||
if not volume_letter or len(volume_letter) != 1 or not volume_letter.isalpha():
|
||||
raise ValueError("无效的磁盘盘符,应为单个字母,如 'C' 或 'Z'")
|
||||
|
||||
return {
|
||||
"Path": volume_letter.upper(),
|
||||
"Type": "磁盘",
|
||||
"Option": None
|
||||
}
|
||||
|
||||
|
||||
def ScanNTFSVolumes():
|
||||
"""
|
||||
扫描当前系统中所有 NTFS 格式的磁盘卷。
|
||||
|
||||
返回:
|
||||
list of dict: 包含盘符等信息的字典列表
|
||||
"""
|
||||
ntfs_volumes = []
|
||||
|
||||
for partition in psutil.disk_partitions():
|
||||
if partition.fstype.upper() == 'NTFS':
|
||||
# 提取盘符(去掉冒号)
|
||||
drive_letter = partition.device[0] if len(partition.device) >= 2 and partition.device[1] == ':' else None
|
||||
if drive_letter:
|
||||
ntfs_volumes.append({
|
||||
"Path": drive_letter,
|
||||
"Type": "磁盘",
|
||||
"Option": None
|
||||
})
|
||||
|
||||
return ntfs_volumes
|
||||
|
||||
|
||||
def InsertVolumesToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_device'):
|
||||
"""
|
||||
将 NTFS 磁盘信息写入数据库表,并防止重复插入。
|
||||
|
||||
参数:
|
||||
data: list of dict,包含 Path, Type, Option 字段的数据
|
||||
db_path: str,SQLite 数据库路径
|
||||
table_name: str,目标表名
|
||||
|
||||
返回:
|
||||
int: 成功插入的记录数
|
||||
"""
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# 创建表(如果不存在),并添加 Path 的唯一性约束
|
||||
create_table_sql = f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
Path TEXT NOT NULL UNIQUE,
|
||||
Type TEXT NOT NULL CHECK(Type IN ('磁盘', '文件', '文件夹')),
|
||||
Option TEXT
|
||||
);
|
||||
"""
|
||||
cursor.execute(create_table_sql)
|
||||
|
||||
inserted_count = 0
|
||||
insert_sql = f"""
|
||||
INSERT OR IGNORE INTO {table_name} (Path, Type, Option)
|
||||
VALUES (?, ?, ?)
|
||||
"""
|
||||
|
||||
for item in data:
|
||||
cursor.execute(insert_sql, (
|
||||
item['Path'],
|
||||
item['Type'],
|
||||
item['Option']
|
||||
))
|
||||
if cursor.rowcount > 0:
|
||||
inserted_count += 1
|
||||
|
||||
conn.commit()
|
||||
print(f"✅ 成功插入 {inserted_count} 条 NTFS 磁盘信息")
|
||||
return inserted_count
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 数据库操作失败: {e}")
|
||||
conn.rollback()
|
||||
return 0
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def main():
|
||||
# 扫描系统下所有 NTFS 磁盘
|
||||
# volumes = ScanNTFSVolumes()
|
||||
# print("🔍 找到以下 NTFS 磁盘:")
|
||||
# for vol in volumes:
|
||||
# print(vol)
|
||||
#
|
||||
# success_count = InsertVolumesToDB(volumes)
|
||||
# print(f"共插入 {success_count} 条记录到数据库。")
|
||||
|
||||
# 扫描单个磁盘
|
||||
volume_latter = "Z"
|
||||
device_data = ScanSpecialVolumes(volume_latter)
|
||||
InsertVolumesToDB([device_data])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
68
ntfs_utils/db_extend_name.py
Normal file
68
ntfs_utils/db_extend_name.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import sqlite3
|
||||
|
||||
|
||||
def InsertExtensionsToDB(extensions, db_path='../src/db_ntfs_info.db', table_name='db_extend_name'):
|
||||
"""
|
||||
将扩展名列表插入到数据库中,自动忽略重复项。
|
||||
|
||||
参数:
|
||||
extensions: list of str,要插入的扩展名列表(如 ["txt", "jpg"])
|
||||
db_path: str,SQLite 数据库路径
|
||||
table_name: str,扩展名表名
|
||||
|
||||
返回:
|
||||
int: 成功插入的新记录数量
|
||||
"""
|
||||
if not isinstance(extensions, list):
|
||||
raise TypeError("extensions 必须是一个列表")
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# 创建表(如果不存在)
|
||||
create_table_sql = f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
ExtendName TEXT UNIQUE NOT NULL
|
||||
);
|
||||
"""
|
||||
cursor.execute(create_table_sql)
|
||||
|
||||
# 插入语句(忽略重复)
|
||||
insert_sql = f"""
|
||||
INSERT OR IGNORE INTO {table_name} (ExtendName)
|
||||
VALUES (?)
|
||||
"""
|
||||
|
||||
# 构造插入数据格式
|
||||
data_to_insert = [(ext.lower(),) for ext in extensions if ext.strip()]
|
||||
|
||||
# 批量插入
|
||||
cursor.executemany(insert_sql, data_to_insert)
|
||||
conn.commit()
|
||||
|
||||
inserted_count = cursor.rowcount
|
||||
if inserted_count > 0:
|
||||
print(f"✅ 成功插入 {inserted_count} 个扩展名")
|
||||
else:
|
||||
print("⚠️ 没有新的扩展名被插入(可能已存在)")
|
||||
|
||||
return inserted_count
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 插入失败: {e}")
|
||||
conn.rollback()
|
||||
return 0
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# 示例调用
|
||||
if __name__ == "__main__":
|
||||
# 常见的文件扩展名列表(可选)
|
||||
common_extensions = ["txt", "log", "csv", "xls", "xlsx", "doc", "docx"]
|
||||
|
||||
count = InsertExtensionsToDB(common_extensions)
|
||||
print(f"共插入 {count} 个新扩展名。")
|
65
ntfs_utils/db_group.py
Normal file
65
ntfs_utils/db_group.py
Normal file
@@ -0,0 +1,65 @@
|
||||
import sqlite3
|
||||
|
||||
|
||||
def InsertGroupToDB(group_name_list, db_path='../src/db_ntfs_info.db', table_name='db_group'):
|
||||
"""
|
||||
向用户组表中插入多个组名。
|
||||
|
||||
参数:
|
||||
group_name_list: list of str,要插入的组名列表
|
||||
db_path: str,SQLite 数据库路径
|
||||
table_name: str,用户组表名
|
||||
|
||||
返回:
|
||||
int: 成功插入的记录数
|
||||
"""
|
||||
if not isinstance(group_name_list, list):
|
||||
raise TypeError("group_name_list 必须是一个列表")
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# 创建表(如果不存在)
|
||||
create_table_sql = f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
GroupName TEXT UNIQUE NOT NULL
|
||||
);
|
||||
"""
|
||||
cursor.execute(create_table_sql)
|
||||
|
||||
# 构建插入数据格式
|
||||
insert_sql = f"""
|
||||
INSERT OR IGNORE INTO {table_name} (GroupName)
|
||||
VALUES (?)
|
||||
"""
|
||||
|
||||
data_to_insert = [(name,) for name in group_name_list]
|
||||
|
||||
# 批量插入
|
||||
cursor.executemany(insert_sql, data_to_insert)
|
||||
conn.commit()
|
||||
|
||||
inserted_count = cursor.rowcount
|
||||
if inserted_count > 0:
|
||||
print(f"✅ 成功插入 {inserted_count} 个组名")
|
||||
else:
|
||||
print("⚠️ 没有新的组名被插入(可能已存在)")
|
||||
|
||||
return inserted_count
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 插入失败: {e}")
|
||||
conn.rollback()
|
||||
return 0
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# 示例调用
|
||||
if __name__ == "__main__":
|
||||
groups = ["Copier", "Admin", "Guest", "Developer"]
|
||||
count = InsertGroupToDB(groups)
|
||||
print(f"共插入 {count} 个新组名。")
|
178
ntfs_utils/db_path.py
Normal file
178
ntfs_utils/db_path.py
Normal file
@@ -0,0 +1,178 @@
|
||||
import hashlib
|
||||
import os
|
||||
import sqlite3
|
||||
|
||||
|
||||
def GenerateHash(s: str) -> str:
|
||||
"""
|
||||
对输入字符串生成 SHA-256 哈希值。
|
||||
用于唯一标识一个路径(PathHash)。
|
||||
"""
|
||||
return hashlib.sha256(s.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def ShouldSkipPath(path: str) -> bool:
|
||||
"""
|
||||
判断是否应跳过该路径(NTFS元文件或系统文件夹)。
|
||||
"""
|
||||
name = os.path.basename(path)
|
||||
if name.startswith('$'):
|
||||
return True
|
||||
if name == "System Volume Information":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def ScanVolume(volume_letter: str):
|
||||
"""
|
||||
完整扫描指定磁盘的所有文件和目录,忽略 NTFS 元文件和系统文件夹,
|
||||
并为每个节点分配 ParentID。
|
||||
|
||||
返回:
|
||||
list of dict:包含文件/目录信息的字典列表
|
||||
"""
|
||||
root_path = f"{volume_letter.upper()}:\\"
|
||||
if not os.path.exists(root_path):
|
||||
raise ValueError(f"磁盘 {root_path} 不存在")
|
||||
|
||||
result = []
|
||||
path_to_id = {} # 用于记录路径到数据库 ID 的映射
|
||||
counter = 1 # 模拟数据库自增 ID
|
||||
|
||||
for root, dirs, files in os.walk(root_path, topdown=True, onerror=None, followlinks=False):
|
||||
# 过滤掉需要跳过的目录
|
||||
dirs[:] = [d for d in dirs if not ShouldSkipPath(os.path.join(root, d))]
|
||||
|
||||
for entry in files + dirs:
|
||||
full_path = os.path.join(root, entry)
|
||||
|
||||
if ShouldSkipPath(full_path):
|
||||
continue
|
||||
|
||||
try:
|
||||
if os.path.isdir(full_path):
|
||||
is_dir = 1
|
||||
bytes_size = 0
|
||||
elif os.path.isfile(full_path):
|
||||
is_dir = 0
|
||||
bytes_size = os.path.getsize(full_path)
|
||||
else:
|
||||
continue
|
||||
|
||||
name = entry
|
||||
|
||||
# ✅ 修正点:对 Path 字段进行哈希
|
||||
path_hash = GenerateHash(full_path)
|
||||
|
||||
# 计算 ContentSize(KB),小文件至少显示为 1 KB
|
||||
content_size = bytes_size // 1024
|
||||
if content_size == 0 and bytes_size > 0:
|
||||
content_size = 1
|
||||
|
||||
# 获取父目录路径
|
||||
parent_path = os.path.dirname(full_path)
|
||||
parent_id = path_to_id.get(parent_path, 0) # 默认为 0(根目录可能未录入)
|
||||
|
||||
item = {
|
||||
"ID": counter,
|
||||
"Path": full_path,
|
||||
"Name": name,
|
||||
"PathHash": path_hash,
|
||||
"IsDir": is_dir,
|
||||
"ParentID": parent_id,
|
||||
"ContentSize": content_size
|
||||
}
|
||||
|
||||
result.append(item)
|
||||
path_to_id[full_path] = counter
|
||||
counter += 1
|
||||
|
||||
except Exception as e:
|
||||
print(f"⚠️ 跳过路径 {full_path},错误: {e}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
|
||||
"""
|
||||
批量将扫描结果写入数据库。
|
||||
"""
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# 创建表(如果不存在)
|
||||
create_table_sql = f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
Path TEXT NOT NULL,
|
||||
Name TEXT NOT NULL,
|
||||
PathHash TEXT UNIQUE NOT NULL,
|
||||
IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
|
||||
ParentID INTEGER,
|
||||
ContentSize INTEGER,
|
||||
|
||||
FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
|
||||
);
|
||||
"""
|
||||
cursor.execute(create_table_sql)
|
||||
|
||||
# 插入语句(忽略重复 PathHash)
|
||||
insert_sql = f"""
|
||||
INSERT OR IGNORE INTO {table_name}
|
||||
(Path, Name, PathHash, IsDir, ParentID, ContentSize)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
"""
|
||||
|
||||
total_inserted = 0
|
||||
batch = []
|
||||
|
||||
for item in data:
|
||||
batch.append((
|
||||
item['Path'],
|
||||
item['Name'],
|
||||
item['PathHash'],
|
||||
item['IsDir'],
|
||||
item['ParentID'] or 0,
|
||||
item['ContentSize']
|
||||
))
|
||||
|
||||
if len(batch) >= batch_size:
|
||||
cursor.executemany(insert_sql, batch)
|
||||
conn.commit()
|
||||
total_inserted += cursor.rowcount
|
||||
print(f"✅ 提交一批 {len(batch)} 条数据")
|
||||
batch.clear()
|
||||
|
||||
# 插入剩余数据
|
||||
if batch:
|
||||
cursor.executemany(insert_sql, batch)
|
||||
conn.commit()
|
||||
total_inserted += cursor.rowcount
|
||||
print(f"✅ 提交最后一批 {len(batch)} 条数据")
|
||||
|
||||
print(f"✅ 总共插入 {total_inserted} 条记录到数据库。")
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 插入失败: {e}")
|
||||
conn.rollback()
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# 示例主函数
|
||||
def main():
|
||||
volume_letter = "Z"
|
||||
|
||||
print(f"🔍 开始全盘扫描磁盘 {volume_letter}:\\ ...")
|
||||
scanned_data = ScanVolume(volume_letter)
|
||||
|
||||
print(f"📊 共扫描到 {len(scanned_data)} 条有效记录,开始入库...")
|
||||
InsertPathDataToDB(scanned_data)
|
||||
|
||||
print("✅ 全盘扫描与入库完成")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
66
ntfs_utils/db_user.py
Normal file
66
ntfs_utils/db_user.py
Normal file
@@ -0,0 +1,66 @@
|
||||
import sqlite3
|
||||
|
||||
|
||||
def InsertUserToDB(username_list, db_path='../src/db_ntfs_info.db', table_name='db_user'):
|
||||
"""
|
||||
向用户表中插入多个用户名。
|
||||
|
||||
参数:
|
||||
username_list: list of str,要插入的用户名列表
|
||||
db_path: str,SQLite 数据库路径
|
||||
table_name: str,用户表名
|
||||
|
||||
返回:
|
||||
int: 成功插入的新用户数量
|
||||
"""
|
||||
if not isinstance(username_list, list):
|
||||
raise TypeError("username_list 必须是一个列表")
|
||||
|
||||
conn = sqlite3.connect(db_path)
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# 创建表(如果不存在)
|
||||
create_table_sql = f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||||
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
UserName TEXT UNIQUE NOT NULL
|
||||
);
|
||||
"""
|
||||
cursor.execute(create_table_sql)
|
||||
|
||||
# 插入语句(忽略重复)
|
||||
insert_sql = f"""
|
||||
INSERT OR IGNORE INTO {table_name} (UserName)
|
||||
VALUES (?)
|
||||
"""
|
||||
|
||||
# 构建数据格式
|
||||
data_to_insert = [(name,) for name in username_list]
|
||||
|
||||
# 批量插入
|
||||
cursor.executemany(insert_sql, data_to_insert)
|
||||
conn.commit()
|
||||
|
||||
inserted_count = cursor.rowcount
|
||||
if inserted_count > 0:
|
||||
print(f"✅ 成功插入 {inserted_count} 个用户")
|
||||
else:
|
||||
print("⚠️ 没有新的用户被插入(可能已存在)")
|
||||
|
||||
return inserted_count
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ 插入失败: {e}")
|
||||
conn.rollback()
|
||||
return 0
|
||||
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
# 示例调用
|
||||
if __name__ == "__main__":
|
||||
users = ["Alice", "Bob", "Charlie", "David"]
|
||||
count = InsertUserToDB(users)
|
||||
print(f"共插入 {count} 个新用户。")
|
Reference in New Issue
Block a user