import ctypes import sqlite3 def GetNTFSBootInfo(volume_letter): """ 从指定 NTFS 卷的 $Boot 元文件中提取: - Bytes per sector - Sectors per cluster - Cluster size (bytes) - $MFT 起始簇号 参数: volume_letter: str,卷标字符串,例如 'C' 返回: dict 包含上述信息 """ # 构造设备路径,格式为 \\.\C: device_path = f"\\\\.\\{volume_letter}:" # 打开卷设备(需要管理员权限) handle = ctypes.windll.kernel32.CreateFileW( device_path, 0x80000000, # GENERIC_READ 0x00000001 | 0x00000002, # FILE_SHARE_READ | FILE_SHARE_WRITE None, 3, # OPEN_EXISTING 0, None ) if handle == -1: raise PermissionError(f"无法打开卷 {volume_letter},请以管理员身份运行。") try: buffer = bytearray(512) buffer_address = (ctypes.c_byte * len(buffer)).from_buffer(buffer) bytes_read = ctypes.c_ulong(0) # 读取第一个扇区(BPB / $Boot 扇区) success = ctypes.windll.kernel32.ReadFile( handle, buffer_address, len(buffer), ctypes.byref(bytes_read), None ) if not success or bytes_read.value != 512: raise RuntimeError("读取卷引导扇区失败。") finally: ctypes.windll.kernel32.CloseHandle(handle) # 解析 Bytes Per Sector (偏移 0x0B,WORD 类型) bytes_per_sector = int.from_bytes(buffer[0x0B:0x0D], byteorder='little') # 解析 Sectors Per Cluster (偏移 0x0D,BYTE 类型) sectors_per_cluster = buffer[0x0D] # 计算簇大小 cluster_size = bytes_per_sector * sectors_per_cluster # 解析 $MFT 起始簇号(LCN),偏移 0x30,QWORD(8 字节) mft_lcn_bytes = buffer[0x30:0x38] mft_lcn = int.from_bytes(mft_lcn_bytes, byteorder='little', signed=False) return { "BytesPerSector": bytes_per_sector, "SectorsPerCluster": sectors_per_cluster, "ClusterSize": cluster_size, "MftPosition": mft_lcn } def InsertInfoToDBConfig(config_data, db_path='../src/db_ntfs_info.db', table_name='db_config'): """ 将 NTFS 配置信息以键值对形式写入数据库的配置表中。 参数: config_data: dict,包含配置键值对 db_path: str,SQLite 数据库路径 table_name: str,目标表名(默认为 'db_config') 返回: None """ # 连接到 SQLite 数据库(如果不存在则会自动创建) conn = sqlite3.connect(db_path) cursor = conn.cursor() try: # 创建表(如果不存在) create_table_sql = f""" CREATE TABLE IF NOT EXISTS {table_name} ( ID INTEGER PRIMARY KEY AUTOINCREMENT, Key TEXT UNIQUE NOT NULL, Value TEXT NOT NULL ); """ cursor.execute(create_table_sql) # 插入或替换数据(使用 INSERT OR REPLACE) insert_sql = f""" INSERT OR REPLACE INTO {table_name} (Key, Value) VALUES (?, ?) """ for key, value in config_data.items(): cursor.execute(insert_sql, (key, str(value))) conn.commit() print("键值对配置已成功写入数据库") except Exception as e: print(f"数据库操作失败: {e}") conn.rollback() finally: conn.close() def main(): volume = "Z" info = GetNTFSBootInfo(volume) print(f"卷 {volume} 的 BPB 信息:") print(info) InsertInfoToDBConfig(info) if __name__ == "__main__": main()