브레이크포인트 부분을 my_debugger에 확장
#-*- coding: utf-8 -*-
from ctypes import *
from my_debugger_defines import *
# ctypes binding to kernel32.dll; the Win32 API calls in this module go through it.
kernel32 = windll.kernel32
class debugger(object):
    """Minimal Win32 user-mode debugger driven through kernel32 via ctypes.

    The class can launch a process under the debugger (`load`) or attach to a
    running one (`attach`), pump debug events (`run`), read and write the
    debuggee's memory, and plant one-shot software (INT3) breakpoints.

    Win32 structures and constants (STARTUPINFO, DEBUG_EVENT, CONTEXT,
    DBG_CONTINUE, ...) come from the star-import of my_debugger_defines.
    """

    # dwDebugEventCode values reported by WaitForDebugEvent, for logging.
    DEBUG_EVENT_NAMES = {
        1: "EXCEPTION_DEBUG_EVENT",
        2: "CREATE_THREAD_DEBUG_EVENT",
        3: "CREATE_PROCESS_DEBUG_EVENT",
        4: "EXIT_THREAD_DEBUG_EVENT",
        5: "EXIT_PROCESS_DEBUG_EVENT",
        6: "LOAD_DLL_DEBUG_EVENT",
        7: "UNLOAD_DLL_DEBUG_EVENT",
        8: "OUTPUT_DEBUG_STRING_EVENT",
        9: "RIP_EVENT",
    }

    # Literal Win32 values kept local so the class does not depend on names
    # that may or may not be exported by my_debugger_defines.
    _EXIT_PROCESS_EVENT_CODE = 5
    _PAGE_EXECUTE_READWRITE = 0x40
    _INVALID_HANDLE_VALUE = -1

    def __init__(self):
        self.h_process = None           # handle to the debuggee process
        self.pid = None                 # process id of the debuggee
        self.debugger_active = False    # run() loops while this is true
        self.h_thread = None            # handle to the thread of the last event
        self.context = None             # CONTEXT of that thread (False on failure)
        self.debug_event = None         # last DEBUG_EVENT received
        self.exception = None           # exception code of the last exception
        self.exception_address = None   # address where that exception occurred
        self.breakpoints = {}           # address -> original byte under the INT3
        self.first_breakpoint = True    # the first breakpoint is raised by Windows

    def load(self, path_to_exe):
        """Launch `path_to_exe` as a debuggee of this process.

        On success the process id and a process handle are stored and the
        debugger is marked active so that `run()` pumps its events. On
        failure the Win32 error code is printed. Returns None either way.
        """
        # dwCreationFlags decides how the process is created. Use
        # CREATE_NEW_CONSOLE instead if you want to see the program's GUI.
        creation_flags = DEBUG_PROCESS

        startupinfo = STARTUPINFO()
        process_information = PROCESS_INFORMATION()

        # dwFlags 0x1 (STARTF_USESHOWWINDOW) makes wShowWindow effective;
        # wShowWindow 0x0 (SW_HIDE) starts the debuggee with a hidden window.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        # cb must hold the size of the structure itself.
        startupinfo.cb = sizeof(startupinfo)

        launched = kernel32.CreateProcessA(path_to_exe,
                                           None,
                                           None,
                                           None,
                                           None,
                                           creation_flags,
                                           None,
                                           None,
                                           byref(startupinfo),
                                           byref(process_information))
        if not launched:
            print("[*] Error : 0x%08x." % kernel32.GetLastError())
            return

        print("[*] We have successfully launched the process!")
        print("[*] PID : %d" % process_information.dwProcessId)
        # Keep the pid and a handle for later access, and activate the event
        # loop: a process created with DEBUG_PROCESS is already our debuggee.
        self.pid = process_information.dwProcessId
        self.h_process = self.open_process(process_information.dwProcessId)
        self.debugger_active = True

    def open_process(self, pid):
        """Return a PROCESS_ALL_ACCESS handle for `pid` (0 on failure)."""
        return kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process `pid`; print a message on failure."""
        self.h_process = self.open_process(pid)
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = int(pid)
        else:
            print("[*] Unable to attach to the process.")

    def run(self):
        """Process debug events until the debugger is no longer active."""
        while self.debugger_active:
            self.get_debug_event()

    def _set_thread_handle(self, h_thread):
        """Store `h_thread` in self.h_thread, closing the handle it replaces."""
        previous = self.h_thread
        if previous and previous != h_thread:
            kernel32.CloseHandle(previous)
        self.h_thread = h_thread

    def get_debug_event(self):
        """Wait for one debug event, dispatch it, and resume the debuggee."""
        debug_event = DEBUG_EVENT()
        continue_status = DBG_CONTINUE

        if not kernel32.WaitForDebugEvent(byref(debug_event), INFINITE):
            return

        # Fetch the handle and context of the thread that raised the event.
        # The handle from the previous event is closed here so that handles
        # do not pile up, one per event.
        self._set_thread_handle(self.open_thread(debug_event.dwThreadId))
        self.context = self.get_thread_context(h_thread=self.h_thread)
        self.debug_event = debug_event
        print("Event Code: %d Thread Id :%d" % (debug_event.dwDebugEventCode,
                                                debug_event.dwThreadId))

        # Exception events are inspected in more detail.
        if debug_event.dwDebugEventCode == EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            self.exception = record.ExceptionCode
            self.exception_address = record.ExceptionAddress

            if self.exception == EXCEPTION_ACCESS_VIOLATION:
                print("Access Violation Detected")
            elif self.exception == EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif self.exception == EXCEPTION_GUARD_PAGE:
                print("Guard Page Access Detected")
            elif self.exception == EXCEPTION_SINGLE_STEP:
                print("Single Stepping")

        kernel32.ContinueDebugEvent(debug_event.dwProcessId,
                                    debug_event.dwThreadId,
                                    continue_status)

        # Once the debuggee itself has exited no further events will arrive,
        # so stop the loop instead of blocking forever in WaitForDebugEvent.
        if (debug_event.dwDebugEventCode == self._EXIT_PROCESS_EVENT_CODE
                and debug_event.dwProcessId == self.pid):
            self.debugger_active = False

    def detach(self):
        """Stop debugging self.pid. Returns True on success, else False."""
        if kernel32.DebugActiveProcessStop(self.pid):
            self.debugger_active = False
            print("[*] Finished debugging. Exiting...")
            return True
        print("There was an error")
        return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for `thread_id`, or False."""
        h_thread = kernel32.OpenThread(THREAD_ALL_ACCESS, None, thread_id)
        # ctypes returns 0 (never None) for a NULL handle, so test truthiness.
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """Return the ids of all threads owned by self.pid, or False."""
        thread_entry = THREADENTRY32()
        thread_list = []
        snapshot = kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD,
                                                     self.pid)
        # Failure is reported as INVALID_HANDLE_VALUE (-1), not None.
        if not snapshot or snapshot == self._INVALID_HANDLE_VALUE:
            return False

        try:
            # dwSize must be set before the first Thread32First call.
            thread_entry.dwSize = sizeof(thread_entry)
            success = kernel32.Thread32First(snapshot, byref(thread_entry))
            while success:
                if thread_entry.th32OwnerProcessID == self.pid:
                    thread_list.append(thread_entry.th32ThreadID)
                success = kernel32.Thread32Next(snapshot, byref(thread_entry))
        finally:
            kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False if it cannot be read.

        Pass either an open handle (`h_thread`) or a `thread_id`. When only a
        thread id is given, a handle is opened and kept in self.h_thread.
        """
        context = CONTEXT()
        context.ContextFlags = CONTEXT_FULL | CONTEXT_DEBUG_REGISTERS

        if h_thread is None:
            h_thread = self.open_thread(thread_id)
            self._set_thread_handle(h_thread)

        # Query the handle the caller asked about, not whatever happens to be
        # stored in self.h_thread.
        if kernel32.GetThreadContext(h_thread, byref(context)):
            return context
        return False

    def exception_handler_breakpoint(self):
        """Handle EXCEPTION_BREAKPOINT and return the continue status.

        For a breakpoint planted by `bp_set`, the original byte is restored
        and EIP is moved back by one so the original instruction executes.
        The breakpoint is therefore one-shot: it is not re-armed.
        """
        print("Exception Address: 0x%08x" % self.exception_address)
        continue_status = DBG_CONTINUE
        address = self.exception_address

        if address not in self.breakpoints:
            # Not one of ours. The very first one is the breakpoint Windows
            # raises when a debugger gets control of the process.
            if self.first_breakpoint:
                self.first_breakpoint = False
                print("[+] Hit the first breakpoint.")
            return continue_status

        print("[+] Hit user defined breakpoint.")
        # Put the original byte back over the INT3.
        self.write_process_memory(address, self.breakpoints[address])

        # Take a fresh context and rewind EIP: after the INT3 has executed it
        # points one byte past the breakpoint address.
        self.context = self.get_thread_context(h_thread=self.h_thread)
        if self.context is False:
            print("[*] Could not obtain the thread context; EIP not restored.")
            return continue_status
        self.context.Eip -= 0x01
        kernel32.SetThreadContext(self.h_thread, byref(self.context))
        return continue_status

    def read_process_memory(self, address, length):
        """Read `length` bytes at `address` in the debuggee.

        Returns the raw bytes, or False if ReadProcessMemory fails.
        """
        read_buf = create_string_buffer(length)
        count = c_ulong(0)
        if not kernel32.ReadProcessMemory(self.h_process,
                                          address,
                                          read_buf,
                                          length,
                                          byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write the byte string `data` at `address` in the debuggee.

        Returns True on success and False if WriteProcessMemory fails.
        """
        length = len(data)
        c_data = c_char_p(data)
        count = c_ulong(0)
        old_protect = c_ulong(0)

        # Make the target pages writable. VirtualProtectEx is the variant
        # that takes a process handle; plain VirtualProtect only affects the
        # calling process. This is best effort: the write is attempted even
        # if the protection change is refused.
        unprotected = kernel32.VirtualProtectEx(self.h_process,
                                                address,
                                                length,
                                                self._PAGE_EXECUTE_READWRITE,
                                                byref(old_protect))
        written = kernel32.WriteProcessMemory(self.h_process,
                                              address,
                                              c_data,
                                              length,
                                              byref(count))
        if unprotected:
            # Restore the protection the pages had before the write.
            ignored = c_ulong(0)
            kernel32.VirtualProtectEx(self.h_process,
                                      address,
                                      length,
                                      old_protect.value,
                                      byref(ignored))
        return bool(written)

    def bp_set(self, address):
        """Plant a software breakpoint (INT3, 0xCC) at `address`.

        Returns True if the breakpoint is set (or was already registered),
        False if the original byte could not be read or the INT3 could not
        be written.
        """
        if address in self.breakpoints:
            return True
        try:
            # Save the original byte so the handler can restore it.
            original_byte = self.read_process_memory(address, 1)
            if original_byte is False:
                return False
            # Overwrite it with the INT3 opcode.
            if not self.write_process_memory(address, b"\xCC"):
                return False
        except (ArgumentError, TypeError) as exc:
            # Raised by ctypes when `address` cannot be passed to the API.
            print("[*] Failed to set breakpoint at %r: %s" % (address, exc))
            return False
        # Register the breakpoint only once it is really in place.
        self.breakpoints[address] = original_byte
        print(self.breakpoints[address])
        return True

    def func_resolve(self, dll, function):
        """Return the address of `function` exported by `dll`, or 0.

        The lookup happens in the debugger's own process; `dll` must already
        be loaded here. NOTE(review): the address is only valid in the
        debuggee if the module is mapped at the same base there - confirm.
        """
        handle = kernel32.GetModuleHandleA(dll)
        if not handle:
            return 0
        # A module handle from GetModuleHandle is not reference counted and
        # is not a kernel object handle, so it must not be passed to
        # CloseHandle.
        return kernel32.GetProcAddress(handle, function)
루프를 돌면서 printf()함수를 호출하는 테스트 프로그램
#printf_loop.py
from ctypes import *
import time

# C runtime library whose printf() is called on every iteration.
msvcrt = cdll.msvcrt

counter = 0
while True:
    # Format in Python, then hand the finished string to the C printf().
    message = "Loop iteration %d!\n" % counter
    msvcrt.printf(message)
    time.sleep(2)
    counter = counter + 1
printf() 함수에 설정한 브레이크포인트를 테스트할 수 있게 변경한 테스트 프로그램
#my_test.py
#-*- coding: utf-8 -*-
import my_debugger

debugger = my_debugger.debugger()

# Ask for the target process id and attach to that process.
pid = int(raw_input("Enter the PID of the process to attach to: "))
debugger.attach(pid)

# Resolve printf() in msvcrt.dll and plant a breakpoint on it.
printf_address = debugger.func_resolve("msvcrt.dll", "printf")
print("[*] Address of printf : 0x%08x" % printf_address)
debugger.bp_set(printf_address)

# Pump debug events.
debugger.run()
printf_loop.py를 실행시키고 작업 관리자로 python.exe 프로세스의 PID를 구한다.
그 후 my_test.py 스크립트를 실행시켜 python.exe의 PID를 입력한다.
결과
Enter the PID of the process to attach to: 3104
[*] Address of printf : 0x7720c5b9
Event Code: 3 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 6 Thread Id :1924
Event Code: 2 Thread Id :3852
Event Code: 1 Thread Id :3852
Exception Address: 0x77a040f0
[+] Hit the first breakpoint.
Event Code: 4 Thread Id :3852