
摘要
本報告分析一個在以 Rust 為基礎的核心元件中發現的安全弱點,特別是發生於 Windows 的繪圖裝置介面(Graphics Device Interface, GDI)。該弱點是透過針對 metafile 的有針對性的模糊測試(fuzzing)攻擊發現的。本文說明了模糊測試流程的方法論、所使用的工具,以及重現與分析核心層級當機的技術面細節。此外,報告進一步探討了基於網路的模糊樣本傳輸和伺服器端收集機制,並提供了測試工具和資料處理的程式碼範例和架構見解。研究結果突顯了持續性安全測試的重要性,尤其是在像 Rust 這類新興技術被引入到關鍵系統元件時。

1. 簡介
現代作業系統越來越多地整合先進的程式語言,以提升安全性與效能。Rust 在 Windows 核心的導入代表了朝向記憶體安全系統程式設計的一個重要轉變。然而,即使具備 Rust 的內建安全特性,仍然可能會出現弱點,特別是在與舊有系統介面互動或於核心環境中發生複雜互動時。本研究聚焦於一個在以 Rust 為基礎的 GDI 元件中發現的特定弱點,該弱點是透過大規模模糊測試攻擊所揭露。所採用的方法論不僅辨識出一個關鍵的錯誤,還促成了在困難測試環境中重現與分析核心當機的新技術發展。
2. 模糊測試方法與環境
模糊測試 (fuzzing)是一種動態軟體測試技術,透過餵入不良或意料外的輸入來揭露缺陷,例如當機、記憶體外洩或 assertion failure。本次攻擊的目標為 Windows 的繪圖元件,特別針對 metafile 的處理進行測試。有效的模糊測試需倚賴健全的測試環境,研究者使用了數個專用工具來支援測試工作。
2.1. 模糊測試工具
主要使用的 fuzzer 為 WinAFL ,這是一個為 Windows 二進位檔調整過且在揭露多項弱點方面成效顯著的模糊測試工具。為了管理中等規模的 fuzz 測試作業,研究者採用 WinAFL Pet 來簡化 fuzzing job 的建立、設定與監控。為有效分析程式當機的根本原因,則使用 BugId 來提供快速且完整的分析。
2.2. 目標選擇:Metafiles
繪圖裝置介面 (GDI)
是 Windows 的基礎元件,負責繪製圖形。本研究特別鎖定 metafile,包括
Enhanced Metafile Format (EMF)
及其變體
Enhanced Metafile Format Plus (EMF+)
。EMF 檔案包含對 GDI 函式的指令,而 EMF+ 記錄可被嵌入在 EMF 記錄中,使得複雜的繪圖指令得以表現。EMF+ 的嵌入是透過
EMR_COMMENT_EMFPLUS
記錄來實現,它可以封裝任意的私有資料。EMF+ 與其新增的 metafile 記錄所帶來的複雜性,使其成為一個重要的攻擊面。
2.3. 測試執行與「拒絕模糊測試」現象
模糊測試攻擊從 16 個初始種子檔案(包含 EMF+ 樣本)開始。在數日內辨識出多個潛在安全弱點。一項顯著的觀察是反覆出現的系統當機,研究者稱之為「Denial of Fuzzing」狀態,這指向一個核心層級的錯誤。此當機會導致
BugCheck
與系統重啟,顯示問題比一般使用者空間弱點更為深遠。接下來的挑戰從發現新弱點,轉為如何一致地重現與分析這個核心錯誤。
3. 核心錯誤重現與分析
在模糊測試環境中穩定地重現核心當機具有其獨特挑戰,尤其當 fuzzer 實例以共用記憶體模式運作並使用 RAM disk 以提升效能時。研究者採取了有系統的方法來克服這些困難。
3.1. 記憶體傾印分析(Memory Dump Analysis)
為了分析當機時作業系統的狀態,擷取記憶體傾印是必要的。然而,WinAFL 使用 RAM disk 與共用記憶體模式,使得精確找出導致當機的 fuzzed 樣本變得困難。傳統方法,例如在記憶體傾印中搜尋 EMF 簽章或使用像
Volatility
的
FileScan
與
DumpFiles
模組,對於大量檔案來說效率不佳。
3.2. 借助 MemProcFS
MemProcFS
工具在其
forensic
模式下,提供了對整個記憶體傾印自動識別並擷取檔案的更有效解法。透過在當機瞬間從 RAM disk 取得 fuzzer 狀態的快照,研究者得以擷取候選的種子檔案。以這些擷取的樣本為起點重新進行模糊測試,顯著縮短了重現核心錯誤所需的時間,將在一組 836 個檔案的資料集中重現時間降至 30 分鐘以內。
4. 遠端資料傳輸的模糊測試工具修改
為了進一步精簡分析流程並確保對 fuzzer 效能的影響最小,研究者修改了 fuzzing harness,使其能將變異後的測試檔案透過網路傳送至遠端伺服器。這允許集中式收集與分析導致當機的樣本,而不干擾 fuzzer 的執行。
4.1. 用戶端實作(C 程式碼)
用戶端的修改加入了一個
send_data()
函式到 fuzzing harness。該函式負責建立網路連線、傳送資料大小,然後傳輸實際的 fuzzed 樣本。包含了錯誤處理機制以確保穩健性。
- // Listing 1: Client-side modification to send every mutation to the server [1]
- int send_data(char* data, uint32_t size) {
- // Initialize Winsock library
- WSADATA wsa;
- // Socket descriptor
- SOCKET s;
- // Server address structure
- struct sockaddr_in server;
- // Remote server IP address (example: 192.168.1.1)
- wchar_t ip_address[] = L"192.168.1.1";
- // Set server address family and port
- server.sin_family = AF_INET;
- server.sin_port = htons(4444); // Port 4444 for communication
- // Initialize Winsock
- if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0) {
- // Return error if Winsock initialization fails
- return 1;
- }
- // Create a socket
- if ((s = socket(AF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
- // Clean up Winsock and return error if socket creation fails
- WSACleanup();
- return 1;
- }
- // Convert IP address from wide character string to network address structure
- if (InetPton(AF_INET, ip_address, &(server.sin_addr)) != 1) {
- // Close socket, clean up Winsock, and return error if IP conversion fails
- closesocket(s);
- WSACleanup();
- return 1;
- }
- // Connect to the remote server
- if (connect(s, (struct sockaddr*)&server, sizeof(server)) < 0) {
- // Close socket, clean up Winsock, and return error if connection fails
- closesocket(s);
- WSACleanup();
- return 1;
- }
- // Convert data size to network byte order and send it
- uint32_t size_header = htonl(size);
- if (send(s, (char*)&size_header, sizeof(size_header), 0) < 0) {
- // Close socket, clean up Winsock, and return error if sending size fails
- closesocket(s);
- WSACleanup();
- return 1;
- }
- // Send the actual data
- if (send(s, data, size, 0) < 0) {
- // Close socket, clean up Winsock, and return error if sending data fails
- closesocket(s);
- WSACleanup();
- return 1;
- }
- // Close the socket and clean up Winsock
- closesocket(s);
- WSACleanup();
- return 0;
- }
4.2. 伺服端實作(Python 程式碼)
在伺服端,一個 Python 腳本會持續監聽來自 fuzzing harness 的連線。每個連線以獨立執行緒處理,以便併發處理多個傳入樣本。腳本接收資料後將其儲存為單獨的檔案,並週期性地將蒐集到的檔案壓縮成 ZIP 檔以有效管理儲存空間。
- # Listing 2: Server-side Python script to receive and store fuzzed samples [1]
- #!/usr/bin/env python3
- import os
- import socket
- import zipfile
- import threading
- from concurrent.futures import ThreadPoolExecutor
- # Global counters for files and zip archives, protected by a lock
- file_counter = 0
- file_counter_lock = threading.Lock()
- zip_counter = 1
- def handle_client(client_socket, address):
- global file_counter, zip_counter
- try:
- # Receive data size (4 bytes, big-endian)
- data_size_bytes = client_socket.recv(4)
- if not data_size_bytes:
- return
- data_size = int.from_bytes(data_size_bytes, byteorder='big')
- # Receive the actual data in chunks
- data = bytearray()
- while len(data) < data_size:
- packet = client_socket.recv(min(1024, data_size - len(data)))
- if not packet:
- break
- data.extend(packet)
- # Ensure all data is received
- if len(data) != data_size:
- print(f"[!] Incomplete data received from {address}")
- return
- with file_counter_lock:
- file_counter += 1
- file_name = f"id_{file_counter:06d}"
- print(f"Received {file_counter} from {address}")
- # Save the received data to a file
- with open(file_name, "wb") as file:
- file.write(data)
- # Periodically zip files and clean up old ones
- if file_counter % 5000 == 0:
- zip_name = f"archive_{zip_counter:03d}.zip"
- with zipfile.ZipFile(zip_name, 'w') as zipf:
- # Zip the last 5000 files
- for i in range(file_counter - 4999, file_counter + 1):
- file_to_zip = f"id_{i:06d}"
- if os.path.exists(file_to_zip):
- zipf.write(file_to_zip)
- os.remove(file_to_zip) # Delete original after zipping
- zip_counter += 1
- print(f"[*] Created {zip_name} and cleaned up files.")
- except Exception as e:
- print(f"[!] Error handling client {address}: {e}")
- finally:
- client_socket.close()
- def main():
- server_ip = "0.0.0.0" # Listen on all available interfaces
- server_port = 4444 # Listen on port 4444
- server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Allow reusing the address
- server.bind((server_ip, server_port))
- server.listen(5) # Max 5 pending connections
- print(f"[*] Listening on {server_ip}:{server_port}")
- # Use a thread pool to handle multiple client connections concurrently
- with ThreadPoolExecutor(max_workers=20) as executor:
- while True:
- client_socket, addr = server.accept()
- print(f"[*] Accepted connection from {addr[0]}:{addr[1]}")
- executor.submit(handle_client, client_socket, addr)
- if __name__ == "__main__":
- main()
4.3. 遠端模糊測試資料蒐集架構
遠端模糊測試資料蒐集的整體架構包括在測試系統上執行的 fuzzer,持續產生並變異樣本。修改後的 harness 將這些樣本送到專用伺服器以供儲存與進一步分析。這樣的設置能將對 fuzzing 程序的 overhead 降到最低,並集中保存可能導致當機的輸入樣本。
graph TD A["Fuzzing Harness (WinAFL)"] -->|Mutated Samples| B{"send_data() function"}; B -->|"Network (TCP/4444)"| C["Remote Server
(Python Script)"]; C --> D["File Storage
(id_XXXXXX files)"]; D --> E["Archiving (archive_XXX.zip)"]; subgraph Test System A end subgraph Collection Server C D E end
圖 1: 遠端模糊測試資料蒐集架構。
5. 結論
針對 Windows 中以 Rust 為基礎的 GDI 元件所做的研究顯示,即便使用現代且具記憶體安全保障的程式語言,透過模糊測試進行全面性的安全檢測仍然不可或缺。最初呈現為「Denial of Fuzzing」的核心層級弱點,凸顯出核心開發的複雜性與積極安全研究的重要性。利用像 MemProcFS 這類記憶體鑑識工具來重現核心當機,以及為變異樣本開發遠端傳輸機制,有效提升了模糊測試攻擊的效率與成效。此項工作為保護關鍵系統元件提供了寶貴的見解,並強調在作業系統發展演進過程中持續維持健全安全實務的必要性。