135 lines
3.6 KiB
Python
135 lines
3.6 KiB
Python
import ctypes
|
||
import sqlite3
|
||
|
||
|
||
def GetNTFSBootInfo(volume_letter):
|
||
"""
|
||
从指定 NTFS 卷的 $Boot 元文件中提取:
|
||
- Bytes per sector
|
||
- Sectors per cluster
|
||
- Cluster size (bytes)
|
||
- $MFT 起始簇号
|
||
|
||
参数:
|
||
volume_letter: str,卷标字符串,例如 'C'
|
||
|
||
返回:
|
||
dict 包含上述信息
|
||
"""
|
||
|
||
# 构造设备路径,格式为 \\.\C:
|
||
device_path = f"\\\\.\\{volume_letter}:"
|
||
|
||
# 打开卷设备(需要管理员权限)
|
||
handle = ctypes.windll.kernel32.CreateFileW(
|
||
device_path,
|
||
0x80000000, # GENERIC_READ
|
||
0x00000001 | 0x00000002, # FILE_SHARE_READ | FILE_SHARE_WRITE
|
||
None,
|
||
3, # OPEN_EXISTING
|
||
0,
|
||
None
|
||
)
|
||
|
||
if handle == -1:
|
||
raise PermissionError(f"无法打开卷 {volume_letter},请以管理员身份运行。")
|
||
|
||
try:
|
||
buffer = bytearray(512)
|
||
buffer_address = (ctypes.c_byte * len(buffer)).from_buffer(buffer)
|
||
bytes_read = ctypes.c_ulong(0)
|
||
|
||
# 读取第一个扇区(BPB / $Boot 扇区)
|
||
success = ctypes.windll.kernel32.ReadFile(
|
||
handle,
|
||
buffer_address,
|
||
len(buffer),
|
||
ctypes.byref(bytes_read),
|
||
None
|
||
)
|
||
|
||
if not success or bytes_read.value != 512:
|
||
raise RuntimeError("读取卷引导扇区失败。")
|
||
|
||
finally:
|
||
ctypes.windll.kernel32.CloseHandle(handle)
|
||
|
||
# 解析 Bytes Per Sector (偏移 0x0B,WORD 类型)
|
||
bytes_per_sector = int.from_bytes(buffer[0x0B:0x0D], byteorder='little')
|
||
|
||
# 解析 Sectors Per Cluster (偏移 0x0D,BYTE 类型)
|
||
sectors_per_cluster = buffer[0x0D]
|
||
|
||
# 计算簇大小
|
||
cluster_size = bytes_per_sector * sectors_per_cluster
|
||
|
||
# 解析 $MFT 起始簇号(LCN),偏移 0x30,QWORD(8 字节)
|
||
mft_lcn_bytes = buffer[0x30:0x38]
|
||
mft_lcn = int.from_bytes(mft_lcn_bytes, byteorder='little', signed=False)
|
||
|
||
return {
|
||
"BytesPerSector": bytes_per_sector,
|
||
"SectorsPerCluster": sectors_per_cluster,
|
||
"ClusterSize": cluster_size,
|
||
"MftPosition": mft_lcn
|
||
}
|
||
|
||
|
||
def InsertInfoToDBConfig(config_data, db_path='../src/db_ntfs_info.db', table_name='db_config'):
|
||
"""
|
||
将 NTFS 配置信息以键值对形式写入数据库的配置表中。
|
||
|
||
参数:
|
||
config_data: dict,包含配置键值对
|
||
db_path: str,SQLite 数据库路径
|
||
table_name: str,目标表名(默认为 'db_config')
|
||
|
||
返回:
|
||
None
|
||
"""
|
||
# 连接到 SQLite 数据库(如果不存在则会自动创建)
|
||
conn = sqlite3.connect(db_path)
|
||
cursor = conn.cursor()
|
||
|
||
try:
|
||
# 创建表(如果不存在)
|
||
create_table_sql = f"""
|
||
CREATE TABLE IF NOT EXISTS {table_name} (
|
||
ID INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
Key TEXT UNIQUE NOT NULL,
|
||
Value TEXT NOT NULL
|
||
);
|
||
"""
|
||
cursor.execute(create_table_sql)
|
||
|
||
# 插入或替换数据(使用 INSERT OR REPLACE)
|
||
insert_sql = f"""
|
||
INSERT OR REPLACE INTO {table_name} (Key, Value)
|
||
VALUES (?, ?)
|
||
"""
|
||
|
||
for key, value in config_data.items():
|
||
cursor.execute(insert_sql, (key, str(value)))
|
||
|
||
conn.commit()
|
||
print("键值对配置已成功写入数据库")
|
||
|
||
except Exception as e:
|
||
print(f"数据库操作失败: {e}")
|
||
conn.rollback()
|
||
|
||
finally:
|
||
conn.close()
|
||
|
||
|
||
def main():
|
||
volume = "Z"
|
||
info = GetNTFSBootInfo(volume)
|
||
print(f"卷 {volume} 的 BPB 信息:")
|
||
print(info)
|
||
InsertInfoToDBConfig(info)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|