Aperi’CTF 2019: x32 Emulator
Challenge details
Event | Challenge | Category | Points | Solves |
---|---|---|---|---|
Aperi’CTF 2019 | x32 Emulator | MISC | 250 | 6 |
Now that you are sufficiently familiar with this language, you will need a way to execute x32 bytecode. How else do you plan to analyze their microcontrollers?
The server will give you several x32 bytecode sequences, and you must send back the result of executing each of them. You can be sure that the server will only send you valid bytecode.
nc x32.aperictf.fr 32323
Note: Read the documentation of this language before starting.
Additional resource: the documentation - md5sum: 5ac1ef34b4641b319281e65d80e84411
Methodology
The service gives us x32 bytecode encoded in hexadecimal and requires us to supply the output produced by executing it. If our answer doesn't match the expected one, the server helpfully shows the intended result. We must pass 300 checks, and because new bytecode is generated each time, we can't fool the server by remembering the expected outputs. We have to implement an emulator.
To make this tedious task a bit simpler, we know that the bytecode produced by the service is always valid, so there is no need to spend much time on error handling.
General idea
First, it's mandatory to read the documentation of the language, since it describes what we need to implement and provides examples. To implement the emulator, I decided to write a Python class.
An x32 emulator must have a stack, registers, stdin, stdout and a set of instructions to execute.
The stack
The stack is represented by a simple list of 256 bytes.
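A minimal sketch of this representation, assuming (as in the solution further down) that SP starts at 0xFF and that a push decrements SP before writing. The `push` helper and the module-level variables are hypothetical names used only for illustration.

```python
# 256-byte stack, modelled as a list of ints so that cells are mutable
stack = [0] * 256
sp = 0xFF  # assumption: SP starts at the top of the stack


def push(value):
    """Hypothetical helper: decrement SP, then store one byte at SP."""
    global sp
    sp -= 1
    stack[sp] = value


push(0x41)
push(0x42)
print(hex(sp), stack[sp], stack[sp + 1])
```

A list of ints is used here rather than a `bytes` object because `bytes` is immutable and the emulator needs to overwrite individual cells.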
Registers
Registers are properties of the class, one per register. A table is used to map register IDs to register names. Functions to get/set a register value based on its name must be implemented.
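As an illustration, name-based access can be sketched with `getattr`/`setattr`. This is an alternative to the explicit if/elif chain used in the solution further down, not what the solution does. The `Registers` class and its method names are hypothetical; the ID-to-name table is copied from the solution.

```python
class Registers:
    """Hypothetical container: one attribute per register."""

    # register ID -> register name (same values as the solution's table)
    ID_TO_NAME = {0x1: "R1", 0x2: "R2", 0x3: "R3", 0x4: "R4", 0x5: "A1", 0x6: "SP"}

    def __init__(self):
        self.R1 = self.R2 = self.R3 = self.R4 = 0
        self.A1 = 0
        self.SP = 0xFF

    def get(self, name):
        return getattr(self, name)

    def set(self, name, value):
        # refuse unknown names instead of silently creating attributes
        if name not in self.ID_TO_NAME.values():
            raise KeyError(name)
        setattr(self, name, value)


regs = Registers()
regs.set(Registers.ID_TO_NAME[0x5], 12)  # ID 0x5 maps to A1
print(regs.get("A1"))
```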
IO
STDIN and STDOUT are both represented by a byte string. This way the emulator's IO is decoupled from the real IO streams of the machine running it, and the result can be read directly from a property.
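The idea can be sketched as follows, assuming that IN copies bytes from the input buffer onto the stack and OUT exposes a slice of the stack as the output, which is how the solution further down behaves (there the byte count comes from the A1 register). The helper names `do_in` and `do_out` are hypothetical.

```python
stack = [0] * 256
stdin = b"hello world"  # bytes the emulated program may read
stdout = b""            # bytes the emulated program has written


def do_in(addr, count):
    """Copy the first `count` bytes of stdin onto the stack at `addr`."""
    for i, byte in enumerate(stdin[:count]):
        stack[addr + i] = byte


def do_out(addr, count):
    """Expose `count` stack bytes starting at `addr` as the program output."""
    global stdout
    stdout = bytes(stack[addr:addr + count])


do_in(0x10, 5)
do_out(0x10, 5)
print(stdout)
```

Because nothing touches `sys.stdin` or `sys.stdout`, the caller can read the result from the buffer and send it over the network.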
Instructions
A table is used to map each bytecode to an instruction and a corresponding function that will handle the logic behind this instruction.
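A reduced sketch of such a dispatch table, limited to two opcodes. The opcode values 0x31 and 0x32 come from the solution further down; the handler bodies, the `state` dictionary and the `dispatch` function are hypothetical placeholders that only record which handler ran.

```python
def handle_set(state):
    state["trace"].append("SET")


def handle_add(state):
    state["trace"].append("ADD")


# opcode -> (mnemonic, handler)
OPCODES = {
    0x31: ("SET", handle_set),
    0x32: ("ADD", handle_add),
}


def dispatch(opcode, state):
    """Look up the opcode and call the matching handler."""
    name, handler = OPCODES[opcode]
    handler(state)
    return name


state = {"trace": []}
dispatch(0x31, state)
dispatch(0x32, state)
print(state["trace"])
```

Supporting a new instruction then only requires a new handler and one extra table entry.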
Execution flow
To execute the entire bytecode, the emulator starts at IP=0. Depending on the opcode encountered, the appropriate function is called. Each handler advances the IP register past the bytes it consumes, so IP always points at the next instruction to execute.
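The loop can be sketched on a reduced instruction set. The encodings below are inferred from the solution further down, not from the official documentation: SET is 0x31 followed by an operand byte (bit 7 set when the first argument is a register, bits 5-3 holding its ID) and an immediate byte; GOTO is 0x64 followed by a 2-byte big-endian absolute offset. The `run` function and the sample program are hypothetical.

```python
REG_NAMES = {1: "R1", 2: "R2"}  # subset of the register table


def run(code):
    regs = {"R1": 0, "R2": 0}
    ip = 0
    while ip < len(code):
        op = code[ip]  # fetch
        ip += 1
        if op == 0x31:  # SET reg, imm (register-to-register form omitted)
            dst = REG_NAMES[(code[ip] >> 3) & 0b111]
            regs[dst] = code[ip + 1]
            ip += 2  # skip operand byte and immediate
        elif op == 0x64:  # GOTO: IP is overwritten with the target
            ip = int.from_bytes(code[ip:ip + 2], "big")
        else:
            raise ValueError("unsupported opcode {:#x}".format(op))
    return regs


# offset 0: SET R1, 5 | 3: GOTO 9 | 6: SET R1, 7 (skipped) | 9: SET R2, 1
program = bytes.fromhex("318805" "640009" "318807" "319001")
print(run(program))
```

The instruction at offset 6 is never executed because GOTO overwrites IP, so R1 keeps the value 5.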
Solution
The complete class can be found in emulator.py.
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author: ENOENT
class Emulator:
    # IO buffers: byte strings instead of the host's real streams
    STDIN = bytes()
    STDOUT = bytes()
    # 256-byte stack
    stack = list(bytes(256))
    # registers and flags
    IP = 0
    SP = 0xFF
    R1, R2, R3, R4 = 0, 0, 0, 0
    A1 = 0
    ZF, GF = 0, 0
    bytecode = bytes()

    # opcode -> mnemonic
    opcodeNames = {
        0x11 : "IN",
        0x12 : "OUT",
        0x31 : "SET",
        0x22 : "LOAD",
        0x23 : "STORE",
        0x32 : "ADD",
        0x33 : "SUB",
        0x34 : "XOR",
        0x41 : "PUSH",
        0x42 : "POP",
        0x51 : "CMP",
        0x61 : "JG",
        0x62 : "JL",
        0x63 : "JE",
        0x64 : "GOTO"
    }

    # register ID -> register name
    registerID = {
        0x1 : "R1",
        0x2 : "R2",
        0x3 : "R3",
        0x4 : "R4",
        0x5 : "A1",
        0x6 : "SP"
    }

    def __init__(self, bytecode, debug=False):
        self.bytecode = bytecode
        self.debug = debug
        # Per-instance stack: the class-level list is mutated in place and
        # would otherwise be shared between successive Emulator instances.
        self.stack = list(bytes(256))
        # opcode -> handler
        self.opcodeID = {
            0x11 : self._handleIN,
            0x12 : self._handleOUT,
            0x31 : self._handleSET,
            0x22 : self._handleLOAD,
            0x23 : self._handleSTORE,
            0x32 : self._handleADD,
            0x33 : self._handleSUB,
            0x34 : self._handleXOR,
            0x41 : self._handlePUSH,
            0x42 : self._handlePOP,
            0x51 : self._handleCMP,
            0x61 : self._handleJG,
            0x62 : self._handleJL,
            0x63 : self._handleJE,
            0x64 : self._handleGOTO
        }

    def _getRegisterValue(self, regname):
        if regname == "R1":
            return self.R1
        elif regname == "R2":
            return self.R2
        elif regname == "R3":
            return self.R3
        elif regname == "R4":
            return self.R4
        elif regname == "A1":
            return self.A1
        elif regname == "SP":
            return self.SP
        elif regname == "ZF":
            return self.ZF
        elif regname == "GF":
            return self.GF
        else:
            # not a register name: the argument is an immediate value
            return regname

    def _setRegisterValue(self, regname, value):
        if regname == "R1":
            self.R1 = value
        elif regname == "R2":
            self.R2 = value
        elif regname == "R3":
            self.R3 = value
        elif regname == "R4":
            self.R4 = value
        elif regname == "A1":
            self.A1 = value
        elif regname == "SP":
            self.SP = value
        elif regname == "ZF":
            self.ZF = value
        elif regname == "GF":
            self.GF = value

    def _getJumpOffset(self):
        # jump targets are 2 bytes, big-endian
        b = self.bytecode[self.IP:self.IP+2]
        offset = int.from_bytes(b, 'big')
        self.IP += 2
        return offset

    def _getRegisters(self):
        # operand byte: bit 7 / bit 6 flag each argument as a register,
        # bits 5-3 and bits 2-0 hold the two register IDs
        b = self.bytecode[self.IP]
        a1IsReg = bool(b & 0x80)
        a2IsReg = bool(b & 0x40)
        reg1 = (b & 0b111000) >> 3
        reg2 = b & 0b111
        t = [None, None]
        if a1IsReg and self.registerID.get(reg1):
            t[0] = self.registerID[reg1]
        if a2IsReg and self.registerID.get(reg2):
            t[1] = self.registerID[reg2]
        if (a1IsReg and not self.registerID.get(reg1)) or (a2IsReg and not self.registerID.get(reg2)):
            raise Exception("IP = {} : Invalid bytecode for registers".format(self.IP), b)
        self.IP += 1
        return t

    def _getNextInstruction(self):
        op = self.bytecode[self.IP]
        if not self.opcodeID.get(op):
            raise Exception("IP = {} : Invalid bytecode for instruction".format(self.IP), op)
        self.IP += 1
        return op

    def _getArguments(self, op, nArgs, a1CanBeImm, a2CanBeImm=False):
        reg1, reg2 = self._getRegisters()
        if reg2 and nArgs < 2:
            raise Exception("IP = {} : Invalid number of registers for instruction {}".format(self.IP, self.opcodeNames[op]), [reg1, reg2])
        arguments = []
        # first argument
        a1 = 0
        if reg1:
            a1 = reg1
        elif a1CanBeImm:
            a1 = self.bytecode[self.IP]
            self.IP += 1
        else:
            raise Exception("IP = {} : Invalid first argument for instruction {}".format(self.IP, self.opcodeNames[op]), a1)
        arguments.append(a1)
        # second argument
        if nArgs > 1:
            a2 = 0
            if reg2:
                a2 = reg2
            elif a2CanBeImm:
                a2 = self.bytecode[self.IP]
                self.IP += 1
            else:
                raise Exception("IP = {} : Invalid second argument for instruction {}".format(self.IP, self.opcodeNames[op]), a2)
            arguments.append(a2)
        return arguments

    def _handleIN(self, op):
        dst = self._getArguments(op, 1, True)[0]
        if self.debug: print("{} {}".format(self.opcodeNames[op], dst))
        dst = self._getRegisterValue(dst)
        if self.A1 > len(self.STDIN):
            raise Exception("STDIN too small", self.A1)
        data = self.STDIN[:self.A1]
        try:
            for i in range(dst, dst+self.A1):
                self.stack[i] = data[i-dst]
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), i)

    def _handleOUT(self, op):
        src = self._getArguments(op, 1, True)[0]
        if self.debug: print("{} {}".format(self.opcodeNames[op], src))
        src = self._getRegisterValue(src)
        try:
            # could do that but wouldn't crash if attempting to read outside stack
            # data = self.stack[src: src+self.A1]
            data = []
            for i in range(src, src+self.A1):
                data.append(self.stack[i])
            self.STDOUT = bytes(data)
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), i)

    def _handleSET(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug: print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        self._setRegisterValue(dst, src)

    def _handleADD(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug: print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        a = self._getRegisterValue(dst)
        r = (a+src) % 0x100
        self.ZF = r == 0
        self._setRegisterValue(dst, r)

    def _handleSUB(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug: print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        a = self._getRegisterValue(dst)
        r = (a-src) % 0x100
        self.ZF = r == 0
        self._setRegisterValue(dst, r)

    def _handleXOR(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug: print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        a = self._getRegisterValue(dst)
        r = a^src
        self.ZF = r == 0
        self._setRegisterValue(dst, r)

    def _handleLOAD(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug: print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        try:
            self._setRegisterValue(dst, self.stack[src])
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), src)

    def _handleSTORE(self, op):
        dst, src = self._getArguments(op, 2, True, False)
        if self.debug: print("{} {} {}".format(self.opcodeNames[op], dst, src))
        dst = self._getRegisterValue(dst)
        try:
            self.stack[dst] = self._getRegisterValue(src)
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), dst)

    def _handlePUSH(self, op):
        src = self._getArguments(op, 1, True)[0]
        if self.debug: print("{} {}".format(self.opcodeNames[op], src))
        src = self._getRegisterValue(src)
        self.SP -= 1
        try:
            self.stack[self.SP] = src
        except:
            # report SP here: no `dst` variable exists in this handler
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), self.SP)

    def _handlePOP(self, op):
        dst = self._getArguments(op, 1, False)[0]
        if self.debug: print("{} {}".format(self.opcodeNames[op], dst))
        try:
            src = self.stack[self.SP]
            self._setRegisterValue(dst, src)
            self.SP -= 1
        except:
            raise Exception("IP = {} : Out of bounds exception during instruction {}".format(self.IP, self.opcodeNames[op]), dst)

    def _handleCMP(self, op):
        dst, src = self._getArguments(op, 2, False, True)
        if self.debug: print("{} {} {}".format(self.opcodeNames[op], dst, src))
        src = self._getRegisterValue(src)
        dst = self._getRegisterValue(dst)
        self.GF = dst > src
        self.ZF = dst == src

    def _handleJE(self, op):
        offset = self._getJumpOffset()
        if self.debug: print("{} {}".format(self.opcodeNames[op], offset))
        if self.ZF:
            self.IP = offset

    def _handleJG(self, op):
        offset = self._getJumpOffset()
        if self.debug: print("{} {}".format(self.opcodeNames[op], offset))
        if self.GF:
            self.IP = offset

    def _handleJL(self, op):
        offset = self._getJumpOffset()
        if self.debug: print("{} {}".format(self.opcodeNames[op], offset))
        if not self.GF and not self.ZF:
            self.IP = offset

    def _handleGOTO(self, op):
        offset = self._getJumpOffset()
        if self.debug: print("{} {}".format(self.opcodeNames[op], offset))
        self.IP = offset

    def run(self):
        N = len(self.bytecode) - 1
        while self.IP < N:
            if self.debug: print("{} : ".format(self.IP), end="")
            # fetch the opcode, then let its handler decode the operands
            op = self._getNextInstruction()
            self.opcodeID[op](op)
We can now use it to talk to the service and pass all the checks (solve.py).
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
import socket # Because pwntools doesn't work with python3 :(
from emulator import Emulator
import binascii

def recvuntil(msg, drop=False):
    # read one byte at a time until `msg` has been received
    buffer = b""
    while msg not in buffer:
        buffer += conn.recv(1)
    if drop:
        buffer = buffer[:-len(msg)]
    return buffer

def solve():
    recvuntil(b"Byte code in hex : ")
    code = recvuntil(b"\n", drop=True).decode("utf-8")
    code = binascii.unhexlify(code)
    emu = Emulator(code)
    emu.run()
    expected = emu.STDOUT
    # the prompt is matched verbatim, including the server's spelling
    recvuntil(b"What is the outut of this programm ?\n")
    conn.send(expected)
    print(conn.recv(1024).decode('utf-8').strip())

conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect(("x32.aperictf.fr", 32323))
for i in range(300):
    solve()
print(conn.recv(1024).decode('utf-8').strip())
conn.close()
Flag
python3 solve.py
Test 1/300 : SUCCESS
Test 2/300 : SUCCESS
Test 3/300 : SUCCESS
...
Test 298/300 : SUCCESS
Test 299/300 : SUCCESS
Test 300/300 : SUCCESS
You passed all the tests, here you go :
APRK{Th4ts_S0m3_c00l_3mul4t10n_y0u_G0t_Th3r3!}
APRK{Th4ts_S0m3_c00l_3mul4t10n_y0u_G0t_Th3r3!}
ENOENT