from typing import Annotated, Dict, List, Literal, Optional, Union, ClassVar
from pydantic import BaseModel, Field, RootModel
from pydantic.config import ConfigDict
from pydantic_partial import PartialModelMixin, create_partial_model
'''
We cannot import anything from penguin here as its used to generate the schema
in a self-contained context. If you need things from penguin import them in the
functions that use them.
'''
ENV_MAGIC_VAL = "DYNVALDYNVALDYNVAL"
[docs]
class StrSep(RootModel):
root: str
separator: ClassVar = None
[docs]
@classmethod
def merge_behavior(cls):
return f"Concatenate strings separated by `{repr(cls.separator)}`"
[docs]
def merge(self, other):
return self.root + self.separator + other.root
[docs]
class StrLines(StrSep):
separator = "\n"
[docs]
class StrSepSpace(StrSep):
separator = " "
def _newtype(class_name, type_, title, description=None, default=None, examples=None):
return type(
class_name,
(RootModel,),
dict(
__doc__=description,
model_config=ConfigDict(
title=title,
default=default,
json_schema_extra=examples and dict(examples=examples),
),
__annotations__=dict(root=type_),
),
)
def _variant(discrim_val, title, description, discrim_key, discrim_title, fields):
return type(
discrim_val,
(PartialModelMixin, BaseModel),
dict(
model_config=ConfigDict(title=title, extra="forbid"),
__doc__=description,
__annotations__={
discrim_key: Annotated[
Literal[discrim_val],
Field(title=f"{discrim_title} ({title.lower()})"),
],
}
| {key: Annotated[type, field] for key, type, field in fields},
),
)
def _union(class_name, title, description, discrim_key, discrim_title, variants):
variants = tuple(
_variant(discrim_key=discrim_key, discrim_title=discrim_title, **v)
for v in variants
)
return _newtype(
class_name=class_name,
type_=Annotated[Union[variants], Field(discriminator=discrim_key)],
title=title,
description=description,
)
GDBServerProgramPort = _newtype(class_name="GDBServerProgramPort", type_=int, title="Port")
GDBServerPrograms = _newtype(
class_name="GDBServerPrograms",
type_=dict[str, GDBServerProgramPort],
title="Programs to run through gdbserver",
description=" ".join((
"Mapping between names of programs and ports for gdbserver.",
"When a program in this mapping is run,",
"it will start paused with gdbserver attached, listening on the specified port.",
)),
default=dict(),
examples=[dict(), dict(lighttpd=9999)],
)
[docs]
class Core(PartialModelMixin, BaseModel):
"""Core configuration options for this rehosting"""
model_config = ConfigDict(title="Core configuration options", extra="forbid")
arch: Annotated[
Optional[Literal["armel", "aarch64", "mipsel", "mipseb", "mips64el", "mips64eb", "powerpc", "powerpc64", "powerpc64le", "riscv64", "loongarch64", "intel64"]],
Field(
None,
title="Architecture of guest",
examples=["armel", "aarch64", "mipsel", "mipseb", "mips64el", "mips64eb", "intel64"],
),
]
kernel: Annotated[
Optional[str],
Field(
None,
title="Path to kernel image",
examples=[
"/igloo_static/kernels/zImage.armel",
"/igloo_static/kernels/zImage.arm64",
"/igloo_static/kernels/vmlinux.mipsel",
"/igloo_static/kernels/vmlinux.mipseb",
"/igloo_static/kernels/vmlinux.mips64el",
"/igloo_static/kernels/vmlinux.mips64eb",
],
),
]
fs: Annotated[
Optional[str],
Field(
"./base/fs.tar.gz",
title="Project-relative path to filesystem tarball",
examples=["base/fs.tar.gz"],
),
]
plugin_path: Annotated[
str,
Field("/pyplugins", title="Path to search for PyPlugins", examples=["/pyplugins"]),
]
root_shell: Annotated[
bool,
Field(
False,
title="Enable root shell",
description="Whether to enable a root shell into the guest",
examples=[False, True],
),
]
strace: Annotated[
Union[bool, list[str]],
Field(
False,
title="Enable strace",
description=" ".join((
"If true, run strace for entire system starting from init.",
"If names of programs, enable strace only for those programs.",
)),
examples=[False, True, ["lighttpd"]],
),
]
ltrace: Annotated[
Union[bool, list[str]],
Field(
False,
title="Enable ltrace",
description=" ".join((
"If true, run ltrace for entire system starting from init.",
"If names of programs, enable ltrace only for those programs.",
)),
examples=[False, True, ["lighttpd"]],
),
]
gdbserver: Optional[GDBServerPrograms] = None
force_www: Annotated[
bool,
Field(
False,
title="Try to force webserver start",
description="Whether to try forcing webserver start",
examples=[False, True],
),
]
cpu: Annotated[
Optional[str],
Field(
None,
title="CPU model",
description="Specify non-default QEMU CPU model",
),
]
show_output: Annotated[
bool,
Field(
False,
title="Write serial to stdout",
description="Whether to print QEMU serial output to stdout instead of writing to a log file",
examples=[False, True],
),
]
immutable: Annotated[
bool,
Field(
True,
title="Enable immutable mode",
description="Whether to run the guest filesystem in immutable mode",
examples=[False, True],
),
]
network: Annotated[
bool,
Field(
False,
title="Connect guest to network",
description="Whether to connect the guest to the network",
examples=[False, True],
),
]
shared_dir: Annotated[
Optional[str],
Field(
None,
title="Project-relative path of shared directory",
description="Share this directory as /igloo/shared in the guest.",
examples=["my_shared_directory"],
),
]
version: Annotated[
Literal["1.0.0", 2],
Field(
title="Config format version",
description="Version of the config file format",
),
]
auto_patching: Annotated[
bool,
Field(
True,
title="Enable automatic patching",
description="Whether to automatically apply patches named patch_*.yaml or from patches/*.yaml in the project directory",
examples=[False, True],
),
]
guest_cmd: Annotated[
bool,
Field(
False,
title="Enable running commands in the guest",
description="When enabled, starts the guesthopper daemon in the guest that the host can use to run commands over vsock",
examples=[False, True],
),
]
extra_qemu_args: Annotated[
Optional[StrSepSpace],
Field(
None,
title="Extra QEMU arguments",
description="A list of additional QEMU command-line arguments to use when booting the guest",
examples=["-vnc :0 -vga std -device usb-kbd -device usb-tablet"],
),
]
mem: Annotated[
Optional[str],
Field(
"2G",
title="Panda Memory Value",
description="Allows users to customize memory allocation for guest",
examples=["16K", "512M", "1G", "2G"],
),
]
kernel_quiet: Annotated[
bool,
Field(
True,
title="Whether to include quiet flag in kernel command line",
description="If true, the kernel command line will include the quiet flag, otherwise all kernel boot messages will be printed to the console",
examples=[False, True],
),
]
smp: Annotated[
Optional[int],
Field(
1,
title="Number of CPUs",
description="Number of CPUs to emulate in the guest (Warning: This can break things)",
examples=[1, 2, 4],
),
]
graphics: Annotated[
bool,
Field(
False,
title="Enable graphics",
description="Whether to enable graphics in the guest",
examples=[False, True],
),
]
EnvVal = _newtype(
class_name="EnvVal",
type_=str,
title="Value",
description="Value of the environment variable",
examples=[ENV_MAGIC_VAL],
)
Env = _newtype(
class_name="Env",
type_=dict[str, EnvVal],
title="Environment",
description="Environment variables to set in the guest",
examples=[
dict(
VAR1="VAL1",
VAR2="VAL2",
),
dict(
PATH="/bin:/sbin",
TMPDIR="/tmp",
FOO=ENV_MAGIC_VAL,
),
],
)
NetDevs = Field(
default=[],
title="Network devices",
description="Names for guest network interfaces",
examples=[["eth0", "eth1"], ["ens33", "wlp3s0"]],
)
BlockedSignalsField = Field(
default=[],
title="List of blocked signals",
description="Signals numbers to block within the guest. Supported values are 6 (SIGABRT), 9 (SIGKILL), 15 (SIGTERM), and 17 (SIGCHLD).",
example=[[9], [9, 15]],
)
ConstMapVal = _newtype(
class_name="ConstMapVal",
type_=Union[str, tuple[int], tuple[str]],
title="Data to place in the file at an offset",
description="When this is a list of integers, it treated as a byte array. When this is a list of strings, the strings are separated by null bytes.",
)
_const_map_fields = (
("pad", Union[str, int], Field(0, title="Byte for padding file")),
("size", int, Field(0x10000, title="File size", ge=0)),
("vals", dict[int, ConstMapVal], Field(title="Mapping from file offsets to data")),
)
Read = _union(
class_name="Read",
title="Read",
description="How to handle reads from the file",
discrim_key="model",
discrim_title="Read modelling method",
variants=(
dict(
discrim_val="zero",
title="Read a zero",
description=None,
fields=(),
),
dict(
discrim_val="empty",
title="Read empty file",
description=None,
fields=(),
),
dict(
discrim_val="const_buf",
title="Read a constant buffer",
description=None,
fields=(
(
"val",
str,
Field(title="Pseudofile contents"),
),
),
),
dict(
discrim_val="const_map",
title="Read a constant map",
description=None,
fields=_const_map_fields,
),
dict(
discrim_val="const_map_file",
title="Read a constant map with host file",
description=None,
fields=(
(
"filename",
str,
Field(title="Path to host file to store constant map"),
),
)
+ _const_map_fields,
),
dict(
discrim_val="from_file",
title="Read from a host file",
description=None,
fields=(("filename", str, Field(title="Path to host file")),),
),
dict(
discrim_val="from_plugin",
title="Read from a custom PyPlugin",
description=None,
fields=(
("plugin", str, Field(title="Name of the loaded PyPlugin")),
("function", Optional[str], Field(title="Function to call", default="read")),
),
),
dict(
discrim_val="default",
title="Default",
description=None,
fields=(),
),
),
)
Write = _union(
class_name="Write",
title="Write",
description="How to handle writes to the file",
discrim_key="model",
discrim_title="Write modelling method",
variants=(
dict(
discrim_val="to_file",
title="Write to host file",
description=None,
fields=(("filename", str, Field(title="Path to host file")),),
),
dict(
discrim_val="from_plugin",
title="Read from a custom PyPlugin",
description=None,
fields=(
("plugin", str, Field(title="Name of the loaded PyPlugin")),
("function", Optional[str], Field(title="Function to call", default="write")),
),
),
dict(
discrim_val="discard",
title="Discard write",
description=None,
fields=(),
),
dict(
discrim_val="default",
title="Default",
description=None,
fields=(),
),
),
)
IoctlCommand = _union(
class_name="IoctlCommand",
title="Ioctl",
description=None,
discrim_key="model",
discrim_title="ioctl modelling method",
variants=(
dict(
discrim_val="return_const",
title="Return a constant",
description=None,
fields=(("val", int, Field(title="Constant to return")),),
),
dict(
discrim_val="symex",
title="Symbolic execution",
description=None,
fields=(),
),
dict(
discrim_val="from_plugin",
title="ioctl from a custom PyPlugin",
description=None,
fields=(
("plugin", str, Field(title="Name of the loaded PyPlugin")),
("function", Optional[str], Field(title="Function to call", default="ioctl")),
),
)
),
)
Star = Literal["*"]
Ioctls = _newtype(
class_name="Ioctls",
type_=Dict[Union[int, Star], IoctlCommand],
title="ioctl",
description="How to handle ioctl() calls",
default=dict(),
examples=[
{
"*": dict(
model="return_const",
val=0,
),
"1000": dict(
model="return_const",
val=5,
),
},
{
"*": dict(
model="return_const",
),
},
{
"model": "from_plugin",
"plugin": "my_plugin",
"function": "ioctl_handler",
},
],
)
[docs]
class Pseudofile(PartialModelMixin, BaseModel):
"""How to emulate a device file"""
model_config = ConfigDict(title="File emulation spec", extra="forbid")
name: Annotated[
Optional[str],
Field(
None,
title="MTD name",
description="Name of an MTD device (ignored for non-mtd)",
examples=["flash", "uboot"],
),
]
size: Annotated[
Optional[int],
Field(
None,
title="File size",
description="Size of the pseudofile to be reported by stat(). This must be specified for mmap() on the pseudofile to work.",
examples=[1, 0x1000],
),
]
read: Optional[Read] = None
write: Optional[Write] = None
ioctl: Optional[Ioctls] = None
Pseudofiles = _newtype(
class_name="Pseudofiles",
type_=dict[str, Pseudofile],
title="Pseudo-files",
description="Device files to emulate in the guest",
)
Patches = _newtype(
class_name="Patches",
type_=list[str],
title="Patches",
description="List of paths to patch files",
)
NVRAM = _newtype(
class_name="NVRAM",
type_=dict[
str,
_newtype(class_name="NVRAMVal", type_=Union[str, int], title="NVRAM value"),
],
title="NVRAM",
description="NVRAM values to add to the guest",
default=dict(),
)
UBootEnv = _newtype(
class_name="UBootEnv",
type_=dict[
str,
_newtype(
class_name="UBootEnvVal",
type_=str,
title="Value",
description="Value of the U-Boot environment variable",
),
],
title="U-Boot environment",
description="U-Boot environment variables to set in the guest",
default=dict(),
)
LibInjectAliasTarget = _newtype(
class_name="LibInjectAliasTarget",
type_=str,
title="Injected library alias target",
description="This is the name of the target function that the alias points to.",
examples=["nvram_init", "true", "false"],
)
LibInjectAliases = _newtype(
class_name="LibInjectAliases",
type_=dict[str, LibInjectAliasTarget],
title="Injected library aliases",
description="Mapping between names of external library functions and names of functions defined in the injected library. This allows replacing arbitrary library functions with your own code.",
default=dict(),
examples=[
dict(fputs="false", nvram_load="nvram_init"),
],
)
[docs]
class LibInject(PartialModelMixin, BaseModel):
"""Library functions to be intercepted"""
model_config = ConfigDict(title="Injected library configuration", extra="forbid")
aliases: Annotated[
Optional[LibInjectAliases],
Field(
None,
title="Function names to alias to existing library function shims",
descriptions="Mapping between new names (e.g., my_nvram_get) and existing library function shims (e.g., nvram_get)",
),
]
extra: Annotated[
Optional[StrLines],
Field(
None,
title="Extra injected library code",
description="Custom source code for library functions to intercept and model",
),
]
StaticFileAction = _union(
class_name="StaticFileAction",
title="Static filesystem action",
description=None,
discrim_key="type",
discrim_title="Type of file action",
variants=(
dict(
discrim_val="inline_file",
title="Add inline file",
description="Add a file with contents specified inline in this config",
fields=(
("mode", int, Field(title="Permissions of file")),
("contents", str, Field(title="Contents of file")),
),
),
dict(
discrim_val="host_file",
title="Copy host file",
description="Copy a file from the host into the guest",
fields=(
("mode", int, Field(title="Permissions of file")),
("host_path", str, Field(title="Host path")),
),
),
dict(
discrim_val="dir",
title="Add directory",
description=None,
fields=(("mode", int, Field(title="Permissions of directory")),),
),
dict(
discrim_val="symlink",
title="Add symbolic link",
description=None,
fields=(("target", str, Field(title="Target linked path")),),
),
dict(
discrim_val="dev",
title="Add device file",
description=None,
fields=(
(
"devtype",
Literal["char", "block"],
Field(title="Type of device file"),
),
("major", int, Field(title="Major device number")),
("minor", int, Field(title="Minor device number")),
(
"mode",
int,
Field(title="Permissions of device file"),
),
),
),
dict(
discrim_val="delete",
title="Delete file",
description=None,
fields=(),
),
dict(
discrim_val="move",
title="Move file",
description=None,
fields=(
(
"from",
str,
Field(title="File to be moved to the specified location"),
),
("mode", Optional[int], Field(title="Permissions of target file", default=None)),
),
),
dict(
discrim_val="shim",
title="Shim file",
description=None,
fields=(
(
"target",
str,
Field(title="Target file we want the shim to be symlinked to"),
),
),
),
dict(
discrim_val="binary_patch",
title="Patch binary file",
description="Make a patch to a binary file at the specified offset. This can either be arbitrary bytes specified as a hex string, or assembly code that will be automatically assembled in the specified mode.",
fields=(
(
"file_offset",
int,
Field(title="File offset (integer)"),
),
(
"hex_bytes",
Optional[str],
Field(
default=None,
title="Bytes to write at offset (hex string)",
examples=["DEADBEEF", "90 90"],
),
),
(
"asm",
Optional[str],
Field(
default=None,
title="Assembly code to write at offset (runs through keystone)",
examples=["nop", "mov r0, #0xdeadbeef"],
),
),
(
"mode",
Optional[str],
Field(
default=None,
title="Assembly mode",
examples=["arm", "thumb"],
),
),
)
),
),
)
[docs]
class StaticFiles(RootModel):
"""Files to create in the guest filesystem"""
root: dict[str, StaticFileAction]
model_config = ConfigDict(
title="Static files",
json_schema_extra=dict(
examples=[
{},
{
"/path/to/file": dict(
type="file",
contents="Hello world!",
)
},
{
"/path/to/symlink/source": dict(
type="symlink",
target="/path/to/symlink/target",
)
},
{
"/dev/some_device": dict(
type="dev",
devtype="char",
major=1,
minor=2,
mode=0o666,
)
},
]
),
)
[docs]
class Plugin(PartialModelMixin, BaseModel):
model_config = ConfigDict(title="Plugin", extra="allow")
description: Annotated[Optional[str], Field(None, title="Plugin description")]
depends_on: Annotated[Optional[str], Field(None, title="Plugin dependency")]
enabled: Annotated[
bool,
Field(True, title="Enable this plugin (default depends on plugin)"),
]
version: Annotated[Optional[str], Field(None, title="Plugin version")]
[docs]
class ExternalNetwork(PartialModelMixin, BaseModel):
"""Configuration for NAT for external connections"""
model_config = ConfigDict(title="Set up NAT for outgoing connections", extra="forbid")
mac: Optional[str] = Field(
title="MAC Address for external interface",
default="52:54:00:12:34:56",
description="MAC Address for external network interface"
)
# Not supported until QEMU 4.0+
# net: Optional[str] = Field(
# default="10.0.2.0/24",
# description="Net for external interface (e.g., 10.0.2.0/24). Host will accessible via .2"
# )
pcap: Optional[bool] = Field(
title="pcap file name",
default=None,
description="Whether to capture traffic over the external net in a pcap file. The file will be called 'ext.pcap' in the output directory. Capture disabled if unset."
)
[docs]
class Network(PartialModelMixin, BaseModel):
"""Configuration for networks to attach to guest"""
model_config = ConfigDict(title="Network Configuration", extra="forbid")
external: ExternalNetwork = Field(default_factory=ExternalNetwork)
[docs]
class Main(PartialModelMixin, BaseModel):
"""Configuration file for config-file-based rehosting with IGLOO"""
model_config = ConfigDict(title="Penguin Configuration", extra="forbid")
core: Core
patches: Optional[Patches] = None
env: Env
pseudofiles: Pseudofiles
nvram: NVRAM
netdevs: List[str] = NetDevs
uboot_env: Optional[UBootEnv] = None
blocked_signals: List[int] = BlockedSignalsField
lib_inject: LibInject
static_files: StaticFiles
plugins: Annotated[dict[str, Plugin], Field(title="Plugins")]
network: Optional[Network] = None
Patch = create_partial_model(Main, recursive=True)