"""
Pseudofiles Plugin
==================
This plugin creates, manages, and suggests new pseudofiles.
It is configurable via the Penguin project config file.
Purpose
-------
- Registers all pseudofile models as hyperfiles.
- Handles read, write, and ioctl commands issued to pseudofiles.
- Monitors for access attempts to missing files and suggests adding them as pseudofiles.
Usage
-----
Place pseudofile models in the Penguin project config.
Example
-------
/dev/mydevice:
read:
model: zero
write:
model: discard
ioctl:
'*':
model: return_const
val: 0
"""
import logging
import re
from os.path import dirname, isfile, isabs
from os.path import join as pjoin
from sys import path as syspath
from penguin import Plugin, plugins, yaml
syspath.append(dirname(__file__))
KNOWN_PATHS = [
"/dev/",
"/dev/pts",
"/sys",
"/proc",
"/run",
# Directories not in static FS that are added by igloo_init (mostly
# irrelevant with wrong prefixes)
"/tmp",
"/dev/ttyS0",
"/dev/console",
"/dev/root",
"/dev/ram",
"/dev/ram0" # We set these up in our init script, common device types
"/dev/null",
"/dev/zero",
"/dev/random",
"/dev/urandom", # Standard devices we should see in devtmpfs
# TODO: pull in devices like how we do during static analysis (e.g.,
# /resources/proc_sys.txt)
"/proc/penguin_net",
]
# Missing files go into our first log
outfile_missing = "pseudofiles_failures.yaml"
# Files we're modeling go into the second. Particularly useful for defaults
outfile_models = "pseudofiles_modeled.yaml"
MAGIC_SYMEX_RETVAL = 999
[docs]
def path_interesting(path):
"""
Determines if a path is likely to contain a good candidate for pseudofiles.
Parameters
----------
path : str
File path.
Returns
-------
bool
"""
if "/pipe:[" in path:
return False
if "\\" in path:
# non-printable chars get escaped somewhere
# These are junk
return False
if path.startswith("/dev/"):
return True
if path.startswith("/proc/"):
return True
if path.startswith("/sys/"):
return True
return False
[docs]
def proc_interesting(path):
"""
Determines if a process is relevant to overall rehosting.
Parameters
----------
path : str
File path.
Returns
-------
bool
"""
# Avoid standard procfs files
# Transformed PID references
if path.startswith("/proc/PID"):
return False
if path.startswith("/proc/self"):
return False
if path.startswith("/proc/thread-self"):
return False
return path.startswith("/proc/")
[docs]
def ignore_cmd(ioctl):
"""
Ignore TTY ioctls, see ioctls.h for T*, TC*, and TIO* ioctls
Parameters
----------
ioctl : int
Ioctl command number.
Returns
-------
bool
"""
if ioctl >= 0x5400 and ioctl <= 0x54FF:
return True
return False
[docs]
def ignore_ioctl_path(path):
"""
Filter out ioctl paths that are irrelevant to rehosting.
Parameters
----------
path : str
File path.
Returns
-------
bool
"""
# Paths we don't care about:
# /firmadyne/libnvram anything - this reveals the nvram values read though
# socket:{RAW,UDP,TCP,...}
# /proc/*/{mounts,stat,cmdline} - boring?
if path.startswith("/firmadyne/libnvram"):
return True
if path.startswith("/proc/"):
return True
if path.startswith("socket:"):
# XXX We do want to log socket failures and eventually model them!
return True
if "/pipe:[" in path:
return True
return False
# Closure so we can pass details through
[docs]
def make_rwif(details, fn_ref):
def rwif(*args):
return fn_ref(*args, details)
return rwif
[docs]
def get_total_counts(d):
"""
Get the sum of all "count" values of a nested dictionary
Parameters
----------
d : dict
Dictionary with values to count.
Returns
-------
int
"""
return (
(
d["count"]
if "count" in d else
sum(map(get_total_counts, d.values()))
)
if isinstance(d, dict) else 0
)
[docs]
def sort_file_failures(d):
"""
Get a sorted version of the file failures dictionary.
Parameters
----------
d : dict
Dictionary to sort.
Returns
-------
dict
"""
# This relies on dict iteration being the same as insertion order,
# which is an implementation detail in CPython,
# but OrderedDict is harder to serialize with pyyaml.
return (
dict(
sorted(
((k, sort_file_failures(v)) for k, v in d.items()),
key=lambda pair: get_total_counts(pair[1]),
reverse=True,
)
)
if isinstance(d, dict) else d
)
[docs]
class Pseudofiles(Plugin):
"""
Pseudofiles Plugin
==================
Creates and manages pseudofiles and guest interactions with them.
Also suggests new paths to add to modeled pseudofiles.
Attributes
----------
outdir : str
Output directory for logs.
proj_dir : str
Project directory, used to find host files.
conf : str
Penguin project config.
verbose : bool
If True, enable verbose logger output.
logging : str
Determines which log output files will be generated.
Can be 'all', 'modeled', 'missing', or 'none'.
Behavior
--------
- Create hyperfile to replace or create a new file in the guest.
- Subscribe to all interactions with that hyperfile, and use configuration models
for read, write, and ioctl commands.
"""
def __init__(self):
"""
Initialize pseudofiles plugin.
Creates hyperfiles of all modeled pseudofiles and creates log files.
Returns
-------
None
"""
self.outdir = self.get_arg("outdir")
self.proj_dir = self.get_arg("proj_dir")
self.written_data = {} # filename -> data that was written to it
if self.get_arg(
"conf") is None or "pseudofiles" not in self.get_arg("conf"):
raise ValueError(
"No 'pseudofiles' in config: {self.get_arg('conf')}")
self.config = self.get_arg("conf")
if self.get_arg_bool("verbose"):
self.logger.setLevel(logging.DEBUG)
self.logging_enabled = self.get_arg("logging")
if self.logging_enabled is None:
self.logging_enabled = "all" # Default is all logging on
if "all" in self.logging_enabled or "missing" in self.logging_enabled:
self.log_missing = True
else:
self.log_missing = False
self.logger.info(f"logging missing pseudofiles: {self.log_missing}")
if "all" in self.logging_enabled or "modeled" in self.logging_enabled:
self.log_modeled = True
else:
self.log_modeled = False
self.logger.info(f"logging modeled pseudofiles: {self.log_modeled}")
self.did_mtd_warn = False # Set if we've warned about misconfigured MTD devices
# XXX: It has seemed like this should be 1 for some architectures, but
# that can't be right?
self.ENOENT = 2
self.warned = set() # Syscalls we don't know about that we've seen
# We track when processes try accessing or IOCTLing on missing files
# here:
self.file_failures = (
{}
) # path: {event: {count: X}}. Event is like open/read/ioctl/stat/lstat.
self.devfs = []
self.procfs = []
self.sysfs = []
# self.last_symex = None
self.warned = set()
self.need_ioctl_hooks = False
self.hf_config = self.populate_hf_config()
self.logger.debug("Registered pseudofiles:")
for filename, details in self.hf_config.items():
self.logger.debug(f" {filename}")
# filename -> {read: model, write: model, ioctls: model}
# Coordinates with hyperfile for modeling behavior!
# Can we just pass our config straight over and load both?
# Need to implement read, write, and IOCTLs
# IOCTLs with symex gets scary, others are easy though?
from hyperfile import HyperFile
if self.log_modeled:
plugins.load(
HyperFile,
{
"models": self.hf_config,
"log_file": pjoin(self.outdir, outfile_models),
"logger": self.logger,
},
)
else:
plugins.load(
HyperFile,
{
"models": self.hf_config,
"logger": self.logger,
},
)
# Clear results file - we'll update it as we go
if self.log_missing:
self.dump_results()
plugins.subscribe(plugins.Events, "igloo_hyp_enoent", self.hyp_enoent)
# Open/openat is a special case with hypercalls helping us out
# because openat requires guest introspection to resolve the dfd, but we just
# did it in the kernel
plugins.subscribe(plugins.Events, "igloo_open", self.fail_detect_opens)
plugins.subscribe(
plugins.Events,
"igloo_ioctl",
self.fail_detect_ioctl)
# On ioctl return we might want to start symex. We detect failures with
# a special handler though
if self.need_ioctl_hooks:
plugins.syscalls.syscall("on_sys_ioctl_return")(
self.symex_ioctl_return)
[docs]
def gen_hyperfile_function(self, filename, details, ftype):
"""
Generate correct hyperfile handler.
Parameters
----------
filename : str
Pseudofile path.
details : dict
Additional pseudofile model information.
ftype : str
Pseudofile type.
Returns
-------
function
"""
if ftype not in details or "model" not in details[ftype]:
model = "default" # default is default
else:
model = details[ftype]["model"]
if hasattr(self, f"{ftype}_{model}"):
# Have a model specified
fn = getattr(self, f"{ftype}_{model}")
elif model == "from_plugin":
plugin_name = details[ftype]["plugin"]
plugin = getattr(plugins, plugin_name)
func = details[ftype].get("function", ftype)
if hasattr(plugin, func):
fn = getattr(plugin, func)
else:
raise ValueError(
f"Hyperfile {filename} depends on plugin {plugin} which does not have function {func}")
else:
if ftype == "ioctl":
guess = {"pseudofiles": {filename: {"*": details}}}
raise ValueError(
f"Invalid ioctl settings. Must specify ioctl number (or '*') within ioctl dictionary, then map each to a model. Did you mean: {guess}"
)
raise ValueError(
f"Unsupported hyperfile {ftype}_{model} for {filename}: {details[ftype] if ftype in details else None}"
)
return make_rwif(
details[ftype] if ftype in details else {}, fn
)
[docs]
def populate_hf_config(self):
"""
Populate the hyperfile config dictionary.
Returns
-------
None
"""
# XXX We need this import in here, otherwise when we load psueodfiles with panda.load_plugin /path/to/pseudofiles.py
# it sees both FileFailures AND HyperFile. But we only want hyperfile to be loaded by us here, not by our caller.
# we are not currently using HYPER_WRITE so we do not import it
from hyper.consts import hyperfs_file_ops as fops
from hyperfile import (HyperFile, hyper)
HYP_IOCTL = fops.HYP_IOCTL
HYP_READ = fops.HYP_READ
hf_config = {}
for filename, details in self.config["pseudofiles"].items():
if "logging" in filename:
continue
hf_config[filename] = {}
for targ, prefix in [
(self.devfs, "/dev/"),
(self.procfs, "/proc/"),
(self.sysfs, "/sys/"),
]:
if filename.startswith(prefix):
targ.append(filename[len(prefix):])
hf_config[filename]["size"] = details.get("size", 0)
# Check if any details with non /dev/mtd names has a 'name'
# property
if not filename.startswith("/dev/mtd") and "name" in details:
raise ValueError(
"Pseudofiles: name property can only be set for MTD devices"
)
if filename.startswith("/dev/mtd") and "name" not in details:
raise ValueError(
"Pseudofiles: name property must be set for MTD devices"
)
for ftype in "read", "write", "ioctl":
hf_config[filename][hyper(ftype)] = self.gen_hyperfile_function(
filename, details, ftype)
if (
ftype == "ioctl"
and ftype in details
and "model" not in details[ftype]
and any([x["model"] == "symex" for x in details[ftype].values()])
):
# If we have a symex model we'll need to enable some extra
# introspection
self.need_ioctl_hooks = True
hf_config["/proc/mtd"] = {
# Note we don't use our make_rwif closure helper here because these
# are static
HYP_READ: self.proc_mtd_check,
HYP_IOCTL: HyperFile.ioctl_unhandled,
"size": 0,
}
return hf_config
[docs]
def symex_ioctl_return(self, regs, proto, syscall, fd, cmd, arg):
"""
Replace ioctl return value to signal that we should start symbolic execution.
Parameters
----------
regs : PtRegsWrapper
CPU registers.
proto : Any
Protocol or plugin-specific context.
syscall : int
Syscall number.
fd : int
File descriptor.
cmd : int
Ioctl command number.
arg : int
Optional additional pointer to a buffer in memory.
Returns
-------
None
"""
# We'll return -999 as a magic placeholder value that indicates we should
# Start symex. Is this a terrible hack. You betcha!
rv = syscall.retval
if rv != MAGIC_SYMEX_RETVAL:
return
if not hasattr(self, "symex"):
# Initialize symex on first use
from symex import PathExpIoctl
self.symex = PathExpIoctl(self.outdir, self.config["core"]["fs"])
# Look through our config and find the filename with a symex model
# XXX: This is a bit of a hack - we're assuming we only have one symex
# model
filename = None
for fname, file_model in self.config["pseudofiles"].items():
if "ioctl" in file_model:
for _, model in file_model["ioctl"].items():
if model["model"] == "symex":
filename = fname
break
if filename is None:
raise ValueError(
"No filename with symex model found in config, but we got a symex ioctl. Unexpected"
)
cpu = self.panda.get_cpu()
# It's time to launch symex!
self.symex.do_symex(self.panda, cpu, syscall.pc, filename, cmd)
# We write down the "failure" so we can see that it happened (and know to query symex
# to get results)
self.log_ioctl_failure(filename, cmd)
# set retval to 0 with no error.
syscall.retval = 0
[docs]
def hyp_enoent(self, cpu, file):
"""
Log files that return ENOENT.
Parameters
----------
cpu : Any
CPU context from PANDA.
file : str
File path of hyperfile.
Returns
-------
None
"""
if any(file.startswith(x) for x in ("/dev/", "/proc/", "/sys/")):
self.centralized_log(file, "syscall")
#######################################
[docs]
def centralized_log(self, path, event, event_details=None):
"""
Log potential pseudofile candidates.
Parameters
----------
path : str
File path.
event : str
Event which triggered the file to be found as missing or interesting.
event_details : Any
Additional context, defaults to None.
Returns
-------
None
"""
# Log a failure to open a given path if it's interesting
# We just track count
if not path_interesting(path):
return
if path.startswith("/proc/"):
# replace /proc/<pid> with /proc/<PID> to avoid a ton of different
# paths
path = re.sub(r"/proc/\d+", "/proc/PID", path)
if path not in self.file_failures:
self.file_failures[path] = {}
if event not in self.file_failures[path]:
self.file_failures[path][event] = {"count": 0}
if "count" not in self.file_failures[path][event]:
# If we ioctl'd before opening, we'll have a count-free entry
self.file_failures[path][event]["count"] = 0
self.file_failures[path][event]["count"] += 1
if event_details is not None:
if "details" not in self.file_failures[path][event]:
self.file_failures[path][event]["details"] = []
self.file_failures[path][event]["details"].append(event_details)
[docs]
def proc_mtd_check(self, filename, buffer, length, offset, details=None):
"""
The guest is reading /proc/mtd. We should populate this file
dynamically based on the /dev/mtd* devices we've set up.
These devices have a name in addition to other properties:
/dev/mtd0:
name: mymtdname
read:
model: return_const
buf: "foo"
Parameters
----------
filename : str
Path to mtd file.
buffer : int
Pointer to buffer.
length : int
Length of buffer.
offset : int
Offset into buffer.
details : Any
Additional context.
Returns
-------
tuple
"""
assert filename == "/proc/mtd"
# For each device in our config that's /dev/mtdX, we'll add a line to the buffer
# Buffer size is limited to 512 in kernel for now.
buf = ""
did_warn = False
for filename, details in self.config["pseudofiles"].items():
if not filename.startswith("/dev/mtd"):
continue
idx = filename.split("/dev/mtd")[1]
if idx.startswith("/"): # i.e., /dev/mtd/0 -> 0
idx = idx[1:]
if not idx.isdigit():
if not self.did_mtd_warn:
did_warn = True
self.logger.warning(
f"Mtd device {filename} is non-numeric. Skipping in /proc/mtd report"
)
continue
if "name" not in details:
if not self.did_mtd_warn:
did_warn = True
self.logger.warning(
f"Mtd device {filename} has no name. Skipping in /proc/mtd report"
)
continue
buf += 'mtd{}: {:08x} {:08x} "{}"\n'.format(
int(idx), 0x1000000, 0x20000, details["name"]
)
if did_warn:
self.did_mtd_warn = True
buf = buf[offset: offset + length].encode()
if len(buf) == 0:
with open(pjoin(self.outdir, "pseudofiles_proc_mtd.txt"), "w") as f:
f.write("/proc/mtd was read with no devices in config")
# The guest read /proc/mtd, but we didn't have anything set up in it! Perhaps
# it's looking for a device of a specific name - potential failure we can mitigate
# self.file_failures['/proc/mtd'] = {'read': {'count': 1, 'details': 'special: no mtd devices in pseudofiles'}}
return (buf, len(buf))
[docs]
def fail_detect_ioctl(self, cpu, fname, cmd):
"""
Detect a failed ioctl call via return value.
Parameters
----------
cpu : Any
CPU context from PANDA.
fname : str
File path to device.
cmd : int
Command number of ioctl.
Returns
-------
None
"""
# A regular (non-dyndev) device was ioctl'd and is returning -ENOTTY so
# our hypercall triggers
self.log_ioctl_failure(fname, cmd)
[docs]
def fail_detect_opens(self, cpu, fname, fd):
"""
Triggers on failed open calls.
Parameters
----------
cpu : Any
CPU context from PANDA.
fname : str
File path.
fd : int
File descriptor.
Returns
-------
None
"""
fd = self.panda.from_unsigned_guest(fd)
if fd == -self.ENOENT:
# enoent let's gooooo
self.centralized_log(fname, "open")
[docs]
def log_ioctl_failure(self, path, cmd):
"""
Format and write ioctl failures to logs.
Parameters
----------
path : str
File path of device with ioctl failure.
cmd : int
Command number of ioctl that failed.
Returns
-------
None
"""
# This might trigger twice, depending on the -ENOTTY path
# between our dyndev ioctl handler and do_vfs_ioctl?
if ignore_ioctl_path(path) or ignore_cmd(cmd):
# Uninteresting
return
if path not in self.file_failures:
self.file_failures[path] = {}
if "ioctl" not in self.file_failures[path]:
self.file_failures[path]["ioctl"] = {}
first = False
if cmd not in self.file_failures[path]["ioctl"]:
self.file_failures[path]["ioctl"][cmd] = {"count": 0}
first = True
self.file_failures[path]["ioctl"][cmd]["count"] += 1
if first:
# The first time we see an IOCTL update our results on disk
# This is just relevant if someone's watching the output during a run
# final results are always written at the end.
if self.log_missing:
self.dump_results()
self.logger.debug(f"New ioctl failure observed: {cmd:x} on {path}")
[docs]
def read_zero(self, filename, buffer, length, offset, details=None):
"""
Simple peripheral model inspired by firmadyne/firmae. Just return 0.
If we've seen a write to this device, mix that data in with 0s
padding around it.
Parameters
----------
filename : str
File path of pseudofile/peripheral.
buffer : int
Pointer to buffer to fill with 0s.
length : int
Length of read.
offset : int
Offset into device being read from.
details : Any
Additional device context.
Returns
-------
tuple
"""
data = b"0"
if filename in self.written_data:
data = self.written_data[filename]
final_data = data[offset: offset + length]
# XXX if offset > len(data) should we return an error instead of 0?
return (final_data, len(final_data)) # data, rv
[docs]
def read_one(self, filename, buffer, length, offset, details=None):
"""
Simple peripheral model to return a read of '1'.
Parameters
----------
filename : str
File path of pseudofile/peripheral.
buffer : int
Pointer to buffer to write to.
length : int
Length of read.
offset : int
Offset into device being read from.
details : Any
Additional device context.
Returns
-------
tuple
"""
data = b"1"
if filename in self.written_data:
data = self.written_data[filename]
final_data = data[offset: offset + length]
# XXX if offset > len(data) should we return an error instead of 0?
return (final_data, len(final_data)) # data, rv
[docs]
def read_empty(self, filename, buffer, length, offset, details=None):
"""
Simple peripheral model to return an empty buffer.
Parameters
----------
filename : str
File path of pseudofile/peripheral.
buffer : int
Pointer to buffer to write to.
length : int
Length of read.
offset : int
Offset into device being read from.
details : Any
Additional device context.
Returns
-------
tuple
"""
data = b""
# XXX if offset > len(data) should we return an error instead of 0?
return (data, 0) # data, rv
[docs]
def read_const_buf(self, filename, buffer, length, offset, details=None):
"""
Simple peripheral model to return a constant buffer.
Parameters
----------
filename : str
File path of pseudofile/peripheral.
buffer : int
Pointer to buffer to write to.
length : int
Length of read.
offset : int
Offset into device being read from.
details : Any
Additional device context, including buffer contents.
Returns
-------
tuple
"""
data = details["val"].encode() + b"\x00" # Null terminate?
final_data = data[offset: offset + length]
# XXX if offset > len(data) should we return an error instead of 0?
if offset > len(data):
return (b"", 0) # -EINVAL
return (final_data, len(final_data)) # data, rv
def _render_file(self, details):
"""
Combine data mapping and return buffer.
Parameters
----------
details : Any
Device model details from config.
Returns
-------
bytestr
"""
# Given offset: data mapping plus a pad, we
# combine to return a buffer
pad = b"\x00"
if "pad" in details:
if isinstance(details["pad"], str):
pad = details["pad"].encode()
elif isinstance(details["pad"], int):
pad = bytes([details["pad"]])
else:
raise ValueError("const_map: pad value must be string or int")
size = details["size"] if "size" in details else 0x10000
vals = details["vals"]
# sort vals dict by key, lowest to highest
vals = {
k: v for k,
v in sorted(
vals.items(),
key=lambda item: item[0])}
# now we flatten. For each offset, val pair
# Need to grab first offset, then pad to that
data = b"" # pad * (list(vals.keys())[0] if len(vals.keys()) else 0)
for off, val in vals.items():
# We have offset: value where value
# may be a string, a list of ints (for non-printable chars)
# or a list of strings to be joined by null terminators
if isinstance(val, str):
val = val.encode()
elif isinstance(val, list):
if not len(val):
continue # Wat?
# All shoudl be same type. Could support a list of lists e.g.,
# ["key=val", [0x41, 0x00, 0x42], ...]?
first_val = val[0]
for this_val in val[1:]:
if not isinstance(this_val, type(first_val)):
raise ValueError(
f"Need matching vals but we have {this_val} and {first_val}"
)
if isinstance(first_val, int):
# We have a list of ints - these are non-printable chars
val = bytes(val)
elif isinstance(first_val, str):
# Join this list with null bytes
val = b"\x00".join([x.encode() for x in val])
else:
raise ValueError(
"_render_file: vals must be strings lists of ints/strings"
)
# Pad before this value, then add the value
data += pad * (off - len(data)) + val
# Finally pad up to size
assert len(
data) <= size, f"Data is too long: {len(data)} > size {size}"
data += pad * (size - len(data))
return data
[docs]
def read_const_map(self, filename, buffer, length, offset, details=None):
"""
Read data and returns tuple of buffer and size.
Parameters
----------
filename : str
File path of pseudofile/peripheral.
buffer : int
Pointer to buffer to write to.
length : int
Length of read.
offset : int
Offset into device file.
details : Any
Additional device context.
Returns
-------
tuple
"""
data = self._render_file(details)
final_data = data[offset: offset + length]
if offset > len(data):
return (b"", 0) # No data, no bytes read
return (final_data, len(final_data)) # data, length
[docs]
def read_const_map_file(self, filename, buffer,
length, offset, details=None):
"""
Create a file on the host using the specified pad, size, vals
When we read from the guest, we read from the host file.
Parameters
----------
filename : str
File path of device.
buffer : int
Pointer to buffer to write to.
length : int
Length of read.
offset : int
Offset into device.
details : Any
Additional device context.
Returns
-------
tuple
"""
# Create a file on the host using the specified pad, size, vals
# When we read from the guest, we read from the host file.
hostfile = details["filename"]
if not isabs(hostfile):
# Paths are relative to the project directory, unless absolute
hostfile = pjoin(self.proj_dir, hostfile)
# Create initial host file
if not isfile(hostfile):
data = self._render_file(details)
# Create initial file
with open(hostfile, "wb") as f:
f.write(data)
# Read from host file
with open(hostfile, "rb") as f:
f.seek(offset)
final_data = f.read(length)
return (final_data, len(final_data)) # data, length
[docs]
def read_from_file(self, filename, buffer, length, offset, details=None):
"""
Read from host file.
Parameters
----------
filename : str
File path of pseudofile/peripheral.
buffer : int
Pointer to buffer to write to.
length : int
Length of read.
offset : int
Offset into device.
details : Any
Additional device context.
Returns
-------
tuple
"""
self.logger.debug(
f"Reading {filename} with {length} bytes at {offset}:")
fname = details["filename"] # Host file
if not isabs(fname):
# Paths are relative to the project directory, unless absolute
fname = pjoin(self.proj_dir, fname)
with open(fname, "rb") as f:
f.seek(offset)
data = f.read(length)
return (data, len(data))
[docs]
def write_to_file(self, filename, buffer, length,
offset, contents, details=None):
"""
Write to host file.
Parameters
----------
filename : str
File path of pseudofile/peripheral.
buffer : int
Pointer to buffer to read from.
length : int
Length of read.
offset : int
Offset into device.
details : Any
Additional device context.
Returns
-------
tuple
"""
fname = details["filename"] # Host file
if not isabs(fname):
# Paths are relative to the project directory, unless absolute
fname = pjoin(self.proj_dir, fname)
self.logger.debug(
f"Writing {fname} with {length} bytes at {offset}: {contents[:100]}"
)
with open(fname, "r+b") as f:
f.seek(offset)
f.write(contents)
return length
[docs]
def write_discard(self, filename, buffer, length,
offset, contents, details=None):
"""
TODO: make this actually discard - not sure where it's used right now
and default is a better model in general
Parameters
----------
filename : str
File path of pseudofile/peripheral.
buffer : int
Pointer to buffer to write to.
length : int
Length of read.
offset : int
Offset into device.
details : Any
Additional device context.
Returns
-------
int
"""
return self.write_default(
filename, buffer, length, offset, contents, details)
[docs]
def write_default(self, filename, buffer, length,
offset, contents, details=None):
"""
Store the contents for this file
print(f"{filename} writes {length} bytes at {offset}: {contents[:100]}")
Parameters
----------
filename : str
File path of pseudofile/peripheral.
buffer : int
Pointer to buffer to read from.
length : int
Length of read.
offset : int
Offset into device.
details : Any
Additional device context.
Returns
-------
int
"""
if filename not in self.written_data:
self.written_data[filename] = b""
# Seek to offset and write contents
previous = self.written_data[filename][:offset]
if len(previous) < offset:
# Pad with null bytes
previous += b"\x00" * (offset - len(previous))
self.written_data[filename] = (
previous
+ contents
+ (
self.written_data[filename][offset + length:]
if len(self.written_data[filename]) > offset + length
else b""
)
)
return length
# XXX on write we can allow and store by default. Or should we explicitly error and require a model?
# def write_default(self, filename, buffer, length, offset, contents, details=None):
# self.centralized_log(filename, 'write')
# return -22 # -EINVAL - we don't support writes
# default models - log failures
[docs]
def read_default(self, filename, buffer, length, offset, details=None):
self.centralized_log(filename, "read")
return (b"", -22) # -EINVAL - we don't support reads
# IOCTL is more complicated than read/write.
# default is a bit of a misnomer, it's our default ioctl handler which
# implements default behavior (i.e., error) on issue of unspecified ioctls,
# but implements what it's told for others
[docs]
def ioctl_default(self, filename, cmd, arg, ioctl_details):
"""
Given a cmd and arg, return a value.
Parameters
----------
filename : str
Device path.
cmd : int
Ioctl command number.
arg : str
Optional pointer to buffer.
ioctl_details : dict
Dictionary structure is cmd -> {'model': 'return_const'|'symex'|'from_plugin',
'val': X}
Returns
-------
int
"""
# Try to use cmd as our key, but '*' is a fallback
# is_wildcard = False
if cmd in ioctl_details:
cmd_details = ioctl_details[cmd]
elif "*" in ioctl_details:
cmd_details = ioctl_details["*"]
# is_wildcard = True
else:
self.log_ioctl_failure(filename, cmd)
return -25 # -ENOTTY
model = cmd_details["model"]
if model == "return_const":
rv = cmd_details["val"]
return rv
elif model == "symex":
# Symex is tricky and different from normal models.
# First off, these models need to specify a 'val' just like any other
# for us to use after (and, to be honest, during) symex.
# JK: we're gonna always use 0 when doing symex!
# if self.last_symex:
# We could be smart and encode info in our retval
# or do something else. I don't think we want to fully
# ignore? But we probably could?
# raise NotImplementedError("Uhhhh nested symex")
# self.last_symex = filename
# We'll detect this on the return and know what to do. I think?
return MAGIC_SYMEX_RETVAL
elif model == "from_plugin":
plugin_name = cmd_details["plugin"]
plugin = getattr(plugins, plugin_name)
func = cmd_details.get("function", "ioctl")
if hasattr(plugin, func):
fn = getattr(plugin, func)
else:
raise ValueError(
f"Hyperfile {filename} depends on plugin {plugin} which does not have function {func}")
return fn(filename, cmd, arg, cmd_details)
else:
# This is an actual error - config is malformed. Bail
raise ValueError(f"Unsupported ioctl model {model} for cmd {cmd}")
# return -25 # -ENOTTY
[docs]
def dump_results(self):
"""
Dump all file failures to disk as yaml.
Returns
-------
None
"""
with open(pjoin(self.outdir, outfile_missing), "w") as f:
out = sort_file_failures(self.file_failures)
yaml.dump(out, f, sort_keys=False)
if hasattr(self, "symex"):
# Need to tell symex to export results as well
self.symex.save_results()
[docs]
def uninit(self):
"""
Uninitialize plugin and update logs for a final time.
Returns
-------
None
"""
if self.log_missing:
self.dump_results()