"""
LiveImage Plugin
================
Generates a guest filesystem live by creating a setup script that runs on boot.
This version uses a tarball for file deployment and a single, batched hypercall for patching.
Overview
--------
- Stages files and directories for deployment using a tarball.
- Supports various file actions: delete, move, shim, dir, host_file, inline_file, binary_patch, dev, symlink.
- Efficient setup script generation using hyp_file_op for file transfer.
- Batch processing of binary patches via a single hypercall.
- Handles permissions, symlinks, device nodes, and error reporting.
Usage
-----
The plugin is loaded by the Penguin framework and manages guest filesystem setup and patching.
Arguments
---------
- ``proj_dir``: Project directory for resolving host files.
- ``conf``: Configuration dictionary specifying static files and actions.
Classes
-------
- LiveImage: Main plugin class for guest filesystem setup and patching.
"""
from penguin import Plugin, plugins
from penguin.plugin_manager import resolve_bound_method_from_class
import shutil
import os
from typing import Iterator, Dict, Tuple, Optional, Callable
from pathlib import Path
import tempfile
import keystone
import tarfile
import shlex
GEN_LIVE_IMAGE_ACTION_FINISHED = 0xf113c1df  # guest signals the setup script ran to completion
# Hypercall to patch all files at once
BATCH_PATCH_FILES_ACTION_MAGIC = 0xf113c0e0
MAGIC_GET = 0xf113c0e1  # guest pulls a file (or its size) from the host staging dir
MAGIC_PUT = 0xf113c0e2  # guest pushes a file chunk into the host staging dir
MAGIC_GET_PERM = 0xf113c0e3  # guest queries permission bits of a staged file
MAGIC_GETSIZE = 0xf113c0e4  # guest queries size of a staged file
REPORT_ERROR = 0xfa14487  # guest reports a failed setup-script command (line number arg)
class LiveImage(Plugin):
    """
    Generates a guest filesystem live by creating a setup script that runs on boot.
    This version uses a tarball for file deployment and a single, batched hypercall for patching.

    The guest script is generated on demand (see ``portalcall_get``) and the
    file-transfer/patching handshake is driven by the MAGIC_* portalcalls
    defined at module level.
    """
def __init__(self) -> None:
    # Host-side staging area for files to be deployed; lives as long as the plugin.
    self._staging_dir_obj = tempfile.TemporaryDirectory()
    self.staged_dir = self._staging_dir_obj.name
    self.proj_dir = Path(self.get_arg("proj_dir")).resolve()
    self.script_generated = False  # NOTE(review): never set in this file — confirm external use
    self.fs_generated = False  # flipped to True by _on_live_image_finished
    self.patch_queue = []  # (guest_path, action) pairs, populated by _generate_setup_script
    self.config = self.get_arg("conf")
    core_config = self.config.get("core", {})
    self.arch = core_config.get("arch", "intel64")  # assembly target for binary_patch 'asm'
    # No-op placeholder; presumably replaced by the framework — TODO confirm
    self.ensure_init = lambda *args: None
    self._init_callbacks = []  # callbacks registered via fs_init()
def fs_init(self, func: Optional[Callable] = None):
    """
    Register a filesystem-init callback.

    Works both as a bare decorator (``@plugins.live_image.fs_init``) and
    as a decorator factory (``@plugins.live_image.fs_init()``); callbacks
    fire once the guest reports the live image is ready.
    """
    def register(callback):
        # Rebind rather than mutate so prior references to the list stay untouched.
        self._init_callbacks = [*self._init_callbacks, callback]
        return callback
    # Bare-decorator form: register immediately; otherwise hand back the registrar.
    return register(func) if func is not None else register
def _plan_actions(self) -> Iterator[Tuple[str, Dict]]:
    """
    Yield (path, action) pairs from 'static_files' in a safe execution order.

    Deletes come first, then moves, shims, directories, file writes,
    binary patches, device nodes, and symlinks. No validation is done here.
    """
    ordered_types = ("delete", "move", "shim", "dir", "host_file",
                     "inline_file", "binary_patch", "dev", "symlink")
    buckets: Dict[str, list] = {t: [] for t in ordered_types}
    for path, action in self.get_arg("conf").get("static_files", {}).items():
        kind = action.get("type")
        # Unknown action types are silently dropped, matching prior behavior.
        if kind in buckets:
            buckets[kind].append((path, action))
    for kind in ordered_types:
        # Deletes run longest-path-first so children are removed before parents.
        deepest_first = kind == "delete"
        yield from sorted(buckets[kind],
                          key=lambda entry: len(entry[0]),
                          reverse=deepest_first)
def _generate_setup_script(self) -> str:
    """Builds an efficient setup script using hyp_file_op for file transfer, not shared_dir.

    Phases:
      1. Stage dir/host_file/inline_file actions into the host staging dir
         and queue post-extraction shell commands for the rest.
      2. Make staged files readable, then tar the staging dir.
      3. Emit the guest shell script: helpers, deletes, tarball fetch +
         extract, batched binary-patch handshake, post-tar commands, and
         chmod batches (with /igloo/<folder> wildcard compression).
    Returns the full script text, newline-terminated.
    """
    post_tar_commands = []  # shell commands to run after tar extraction
    paths_to_delete = []  # guest paths removed before extraction
    self.patch_queue = []  # Reset for this run
    move_sources_to_remove = []  # 'move' sources to rm -f at the end
    staging_dir = Path(self.staged_dir)
    from collections import defaultdict
    mode_groups = defaultdict(list)  # mode(str without leading 0) -> [paths]
    needed_parent_dirs = set()
    path_modes = {}  # track mode per path for later glob compression

    def record_mode(path: str, mode_val: int):
        # Remember a requested chmod mode for a guest path (None = no-op).
        if mode_val is None:
            return
        mode_str = oct(mode_val)[2:]
        mode_groups[mode_str].append(path)
        path_modes[path] = mode_str

    def record_parent_dirs(p: str):
        # Record the immediate parent; deeper parents will be recorded as we see their children.
        parent = str(Path(p).parent)
        if parent != '/' and parent != '.':
            needed_parent_dirs.add(parent)

    shim_orig_names = set()  # .orig destinations already claimed by shims
    shim_orig_counts = {}
    # Always stage /igloo/libnvram so it exists in the guest image.
    libnvram = staging_dir / "igloo" / "libnvram"
    libnvram.mkdir(parents=True, exist_ok=True)
    # --- Phase 1: Plan and Stage for Tarball ---
    for file_path, action in self._plan_actions():
        action_type = action.get("type")
        # NOTE(review): this /dev check is a no-op — `pass` then fall-through
        # into the chain below; dev nodes are actually handled by the "dev"
        # branch. Confirm whether a `continue` was intended.
        if file_path.startswith("/dev/") or file_path == "/dev":
            pass  # These actions are handled post-tar
        if action_type in ["dir", "host_file", "inline_file"]:
            record_parent_dirs(file_path)
            staged_path = staging_dir / file_path.lstrip('/')
            staged_path.parent.mkdir(parents=True, exist_ok=True)
            if action_type == "dir":
                staged_path.mkdir(exist_ok=True)
                if 'mode' in action:
                    record_mode(file_path, action['mode'])
            elif action_type == "inline_file":
                # Contents may be given as bytes or as text.
                if isinstance(action['contents'], bytes):
                    staged_path.write_bytes(action['contents'])
                else:
                    staged_path.write_text(action['contents'])
                if 'mode' in action:
                    record_mode(file_path, action['mode'])
            elif action_type == "host_file":
                host_path_str = action['host_path']
                dest_path_str = file_path
                # Path resolution: relative to proj_dir if not absolute
                if not os.path.isabs(host_path_str):
                    source_path_pattern = self.proj_dir / host_path_str
                else:
                    source_path_pattern = Path(host_path_str)
                dest_path = Path(dest_path_str)
                # If the destination contains a wildcard, treat it as a directory and copy each source file into it
                if '*' in dest_path.name or '?' in dest_path.name:
                    dest_dir = dest_path.parent
                    if dest_dir.is_absolute():
                        dest_dir_path = staging_dir / \
                            dest_dir.relative_to('/')
                    else:
                        dest_dir_path = staging_dir / dest_dir
                    dest_dir_path.mkdir(parents=True, exist_ok=True)
                    self.logger.debug(
                        f"Globbing for host_file: {source_path_pattern.parent}/{source_path_pattern.name}")
                    glob_matches = list(
                        source_path_pattern.parent.glob(source_path_pattern.name))
                    self.logger.debug(
                        f"Found {len(glob_matches)} files for host_file source glob: {source_path_pattern}")
                    if not glob_matches:
                        self.logger.warning(
                            f"No files matched for host_file source glob: {source_path_pattern}")
                    for host_src in glob_matches:
                        if host_src.is_file():
                            staged_file_path = dest_dir_path / host_src.name
                            shutil.copy(host_src, staged_file_path)
                            if 'mode' in action:
                                # Both ternary branches produce the same joined path; dest_dir is a Path already.
                                record_mode(str(Path(dest_dir) / host_src.name if dest_dir.is_absolute() else (dest_dir / host_src.name)), action['mode'])
                # Only expand globs in the source, never in the destination
                elif '*' in source_path_pattern.name or '?' in source_path_pattern.name:
                    self.logger.debug(
                        f"Globbing for host_file: {source_path_pattern.parent}/{source_path_pattern.name}")
                    glob_matches = list(
                        source_path_pattern.parent.glob(source_path_pattern.name))
                    self.logger.debug(
                        f"Found {len(glob_matches)} files for host_file source glob: {source_path_pattern}")
                    if not glob_matches:
                        self.logger.warning(
                            f"No files matched for host_file source glob: {source_path_pattern}")
                    for host_src in glob_matches:
                        if host_src.is_file():
                            # Destination is treated as a directory; each match lands under it.
                            staged_file_path = staged_path / host_src.name
                            staged_file_path.parent.mkdir(
                                parents=True, exist_ok=True)
                            shutil.copy(host_src, staged_file_path)
                            if 'mode' in action:
                                record_mode(str(Path(file_path) / host_src.name), action['mode'])
                else:  # Handle single file
                    host_src = source_path_pattern
                    if not host_src.exists():
                        self.logger.error(
                            f"Host file not found: {host_src} (static_files entry: {file_path} -> {action})")
                        raise FileNotFoundError(
                            f"Host file not found: {host_src} (static_files entry: {file_path} -> {action})")
                    staged_path.parent.mkdir(
                        parents=True, exist_ok=True)
                    shutil.copy(host_src, staged_path)
                    if 'mode' in action:
                        record_mode(file_path, action['mode'])
        # Queue or generate commands for post-tar execution
        elif action_type == "binary_patch":
            self.patch_queue.append((file_path, action))
        elif action_type == "delete":
            paths_to_delete.append(file_path)
        elif action_type == "symlink":
            post_tar_commands.append(
                f"ln -sf {shlex.quote(action['target'])} {shlex.quote(file_path)}")
        elif action_type == "shim":
            # Place .orig in /igloo/shims and ensure unique name
            base_name = Path(file_path).name
            orig_dir = "/igloo/shims"
            orig_base = f"{base_name}.orig"
            orig_path = f"{orig_dir}/{orig_base}"
            if orig_path in shim_orig_names:
                # Collision: probe numbered suffixes until a free name is found.
                count = shim_orig_counts.get(orig_path, 1)
                while f"{orig_dir}/{base_name}_{count}.orig" in shim_orig_names:
                    count += 1
                orig_path = f"{orig_dir}/{base_name}_{count}.orig"
                shim_orig_counts[orig_path] = count
            shim_orig_names.add(orig_path)
            post_tar_commands.append(
                f"mv {shlex.quote(file_path)} {shlex.quote(orig_path)}")
            # Symlink to the target specified in configuration
            post_tar_commands.append(
                f"ln -sf {shlex.quote(action['target'])} {shlex.quote(file_path)}")
        elif action_type == "move":
            # 'move' is implemented as cp + deferred rm of the source.
            post_tar_commands.append(
                f"cp {shlex.quote(action['from'])} {shlex.quote(file_path)}")
            if action.get('mode') is not None:
                record_mode(file_path, action['mode'])
            move_sources_to_remove.append(action['from'])
        elif action_type == "dev":
            # Ensure parent directory exists in the tarball
            parent_dir = Path(file_path).parent
            if str(parent_dir) != '.':
                staged_parent_dir = staging_dir / \
                    str(parent_dir).lstrip('/')
                staged_parent_dir.mkdir(parents=True, exist_ok=True)
            dev_char = 'c' if action['devtype'] == 'char' else 'b'
            # Remove any pre-existing node, then recreate it with mknod post-tar.
            paths_to_delete.append(file_path)
            post_tar_commands.append(
                f"mknod {shlex.quote(file_path)} {dev_char} {action['major']} {action['minor']}")
            if action.get('mode') is not None:
                record_mode(file_path, action['mode'])
    # /igloo/shims is guaranteed to exist in the base image
    igloo_shims = staging_dir / "igloo" / "shims"
    igloo_shims.mkdir(parents=True, exist_ok=True)
    # --- Phase 2: Ensure all staged files are readable before creating the tarball ---
    for root, dirs, files in os.walk(staging_dir):
        for name in files:
            fpath = os.path.join(root, name)
            try:
                st = os.stat(fpath)
                # Only the owner-read bit (0o400) is needed for tar to read the file.
                if not (st.st_mode & 0o400):
                    os.chmod(fpath, st.st_mode | 0o400)
            except Exception as e:
                self.logger.warning(
                    f"Could not set read permission on {fpath}: {e}")
        for name in dirs:
            dpath = os.path.join(root, name)
            try:
                st = os.stat(dpath)
                if not (st.st_mode & 0o400):
                    os.chmod(dpath, st.st_mode | 0o400)
            except Exception as e:
                self.logger.warning(
                    f"Could not set read permission on {dpath}: {e}")
    # --- Phase 2: Create and copy the tarball ---
    # NOTE(review): the tarball is created inside staging_dir and so is itself
    # a candidate for inclusion in the archive — confirm this is benign.
    tarball_host_path = Path(staging_dir) / "filesystem.tar"
    self.logger.info(
        f"Creating filesystem tarball at {tarball_host_path}...")
    with tarfile.open(tarball_host_path, "w") as tar:
        tar.add(staging_dir, arcname='.')
    self.logger.info(
        f"Filesystem tarball created at {tarball_host_path}")
    # --- Phase 3: Assemble the final guest script ---
    script_lines = [
        "#!/igloo/boot/busybox sh",
        "export PATH=/igloo/boot:$PATH",
        "exec > /igloo/boot/live_image_guest.log 2>&1",
        "for util in chmod echo cp mkdir rm ln mknod tar mv time stat readlink dirname sh; do /igloo/boot/busybox ln -sf /igloo/boot/busybox /igloo/boot/$util; done",
        "",
        "run_or_report() {",
        " err_line=$1; shift",
        " hex_line=$(printf '%x' \"$err_line\")",
        " \"$@\"",
        " rc=$?",
        " if [ $rc -ne 0 ]; then",
        " /igloo/boot/hyp_file_op put /igloo/boot/live_image_guest.log live_image_guest.log",
        f" /igloo/boot/send_portalcall {REPORT_ERROR:#x} $hex_line",
        " echo \"ERROR: Command failed: $* (exit $rc) at line $err_line ($hex_line)\" >&2",
        " exit $rc",
        " fi",
        "}",
        "",
        "ensure_dir() {",
        " d=\"$1\"",
        " if [ -L \"$d\" ]; then",
        " t=$(readlink \"$d\")",
        " case \"$t\" in /*) tgt=\"$t\" ;; *) tgt=\"$(dirname \"$d\")/$t\" ;; esac",
        " mkdir -p \"$tgt\"",
        " else",
        " mkdir -p \"$d\" || true",
        " fi",
        "}",
        ""
    ]

    def add_run_or_report(cmd):
        # Wrap cmd so a failure reports this script line number via hypercall.
        line_num = len(script_lines) + 1
        script_lines.append(f"run_or_report {line_num} {cmd}")

    if paths_to_delete:
        add_run_or_report(f"rm -rf {' '.join([shlex.quote(p) for p in paths_to_delete])}")
    # Use hyp_file_op to get the tarball and extract
    add_run_or_report("/igloo/boot/hyp_file_op get filesystem.tar /igloo/boot/filesystem.tar")
    # Replace inline symlink logic with ensure_dir calls
    for d in sorted(needed_parent_dirs):
        add_run_or_report(f"ensure_dir {shlex.quote(d)}")
    add_run_or_report("tar x -om -f /igloo/boot/filesystem.tar -C / --no-same-permissions")
    # --- Phase 4: Batch-process binary patches ---
    if self.patch_queue:
        patch_staging_cmds = []
        patch_return_cmds = []
        for i, (file_path, _) in enumerate(self.patch_queue):
            shared_file_name = f"patch_{i}"
            patch_staging_cmds.append(
                f"/igloo/boot/hyp_file_op put {shlex.quote(file_path)} {shlex.quote(os.path.basename(shared_file_name))}")
        script_lines.append("\n# Staging all files for patching")
        for cmd in patch_staging_cmds:
            add_run_or_report(cmd)
        script_lines.append("\n# Triggering single batched patch hypercall")
        add_run_or_report(f"/igloo/boot/send_portalcall {BATCH_PATCH_FILES_ACTION_MAGIC:#x}")
        for i, (file_path, _) in enumerate(self.patch_queue):
            shared_file_name = f"patch_{i}"
            patch_return_cmds.append(
                f"/igloo/boot/hyp_file_op get {shlex.quote(shared_file_name)} {shlex.quote(file_path)}")
        script_lines.append("\n# Moving all patched files back")
        for cmd in patch_return_cmds:
            add_run_or_report(cmd)
    for cmd in post_tar_commands:
        add_run_or_report(cmd)
    # Add grouped chmod commands with /igloo/<folder>/* compression
    # Build map folder -> set of modes for its descendant paths
    igloo_folder_modes = defaultdict(set)  # /igloo/<folder> -> {mode_str}
    igloo_folder_paths = defaultdict(list)  # /igloo/<folder> -> [paths]
    for p, m in path_modes.items():
        if p.startswith('/igloo/'):
            parts = p.split('/')
            if len(parts) >= 3:  # / igloo <folder> ...
                folder_root = '/'.join(parts[:3])  # /igloo/<folder>
                # Consider only descendants, not the folder root itself for wildcard scope
                if p != folder_root:
                    igloo_folder_modes[folder_root].add(m)
                    igloo_folder_paths[folder_root].append(p)
    # Determine compressible folders
    compress_globs = []  # (mode_str, glob_pattern)
    compressed_paths = set()
    MIN_COMPRESS_COUNT = 10
    for folder, modeset in igloo_folder_modes.items():
        # Build per-mode lists for this folder
        per_mode_paths = defaultdict(list)
        for p in igloo_folder_paths[folder]:
            per_mode_paths[path_modes[p]].append(p)
        # Case 1: all descendants same mode -> single wildcard
        if len(per_mode_paths) == 1:
            mode_str = next(iter(per_mode_paths.keys()))
            compress_globs.append((mode_str, f"{folder}"))
            compressed_paths.update(per_mode_paths[mode_str])
        else:
            # Case 2: multiple modes; compress any mode with large count
            for mode_str, plist in per_mode_paths.items():
                if len(plist) >= MIN_COMPRESS_COUNT:
                    compress_globs.append((mode_str, f"{folder}"))
                    compressed_paths.update(plist)
    # Remove compressed paths from mode_groups
    for mode_str, paths in list(mode_groups.items()):
        if paths:
            mode_groups[mode_str] = [p for p in paths if p not in compressed_paths]
    # Emit compressed glob chmods first per mode
    by_mode_globs = defaultdict(list)
    for mode_str, glob_pat in compress_globs:
        by_mode_globs[mode_str].append(glob_pat)
    for mode_str, globs in by_mode_globs.items():
        # combine globs in chunks
        chunk = []
        for g in globs:
            chunk.append(g)
            if len(chunk) >= 20:
                add_run_or_report(f"chmod -R {mode_str} {' '.join(chunk)}")
                chunk = []
        if chunk:
            add_run_or_report(f"chmod -R {mode_str} {' '.join(chunk)}")
    # Emit remaining explicit paths
    for mode_str, paths in mode_groups.items():
        if not paths:
            continue
        chunk = []
        for p in paths:
            chunk.append(shlex.quote(p))
            if len(chunk) >= 50:
                add_run_or_report(f"chmod -R {mode_str} {' '.join(chunk)}")
                chunk = []
        if chunk:
            add_run_or_report(f"chmod -R {mode_str} {' '.join(chunk)}")
    if move_sources_to_remove:
        add_run_or_report(f"rm -f {' '.join([shlex.quote(p) for p in move_sources_to_remove])}")
    # Final hypercall tells the host the script finished successfully.
    add_run_or_report(f"/igloo/boot/send_portalcall {GEN_LIVE_IMAGE_ACTION_FINISHED:#x}")
    return "\n".join(script_lines) + "\n"
def _apply_patch_to_file_content(self, original_content: bytes, action: Dict) -> Optional[bytes]:
    """
    Splice patch bytes into *original_content* at ``action['file_offset']``.

    Exactly one of ``hex_bytes`` or ``asm`` must be present in *action*.
    Returns the patched byte string, or None when the action is malformed
    or assembly fails.
    """
    has_hex = bool(action.get('hex_bytes'))
    has_asm = bool(action.get('asm'))
    if has_hex == has_asm:
        # Either neither or both payload sources were supplied — reject.
        self.logger.error(
            "Binary patch must have exactly one of 'hex_bytes' or 'asm'.")
        return None
    if has_asm:
        payload = self._gen_asm_patch_bytes(action['asm'], action.get('mode'))
        if payload is None:
            return None
    else:
        payload = bytes.fromhex(action['hex_bytes'].replace(" ", ""))
    start = action['file_offset']
    end = start + len(payload)
    # Overwrite in place; bytes past the patched region are preserved.
    return original_content[:start] + payload + original_content[end:]
def _gen_asm_patch_bytes(self, asm_code: str, user_mode: Optional[str]) -> Optional[bytes]:
    """Assembles assembly code into bytes using Keystone.

    :param asm_code: assembly source text to assemble for ``self.arch``.
    :param user_mode: for armel only, ``"thumb"`` selects Thumb mode;
        anything else (or None) selects ARM mode. Ignored otherwise.
    :returns: encoded machine bytes, or None on any failure.
    """
    # NOTE(review): `keystone` is imported unconditionally at module top, so
    # this None check can never fire as written — presumably a leftover from
    # a guarded import; confirm.
    if keystone is None:
        self.logger.error(
            "Cannot assemble code because keystone-engine is not installed.")
        return None
    # Map the plugin's arch string onto Keystone arch/mode constants.
    arch_map = {"armel": keystone.KS_ARCH_ARM, "aarch64": keystone.KS_ARCH_ARM64,
                "mipsel": keystone.KS_ARCH_MIPS, "mipseb": keystone.KS_ARCH_MIPS, "intel64": keystone.KS_ARCH_X86}
    mode_map = {"aarch64": keystone.KS_MODE_LITTLE_ENDIAN, "mipsel": keystone.KS_MODE_MIPS32 | keystone.KS_MODE_LITTLE_ENDIAN,
                "mipseb": keystone.KS_MODE_MIPS32 | keystone.KS_MODE_BIG_ENDIAN, "intel64": keystone.KS_MODE_64}
    ks_arch = arch_map.get(self.arch)
    if self.arch == "armel":
        # ARM may be ARM or Thumb; honor the per-patch mode override.
        arm_mode = user_mode or "arm"
        ks_mode = keystone.KS_MODE_THUMB if arm_mode == "thumb" else keystone.KS_MODE_ARM
        ks_mode |= keystone.KS_MODE_LITTLE_ENDIAN
    else:
        ks_mode = mode_map.get(self.arch)
    if ks_arch is None or ks_mode is None:
        self.logger.error(
            f"Unsupported architecture for assembly: {self.arch}")
        return None
    try:
        ks = keystone.Ks(ks_arch, ks_mode)
        encoding, _ = ks.asm(asm_code)
        return bytes(encoding)
    except Exception as e:
        self.logger.error(
            f"Keystone assembly failed for arch {self.arch}: {e}")
        return None
@plugins.portalcall.portalcall(BATCH_PATCH_FILES_ACTION_MAGIC)
def _on_batch_patch_hypercall(self):
    """
    Guest-triggered hook: apply every queued binary patch to its staged
    ``patch_<i>`` file in place. Returns 0 on success, -1 on first failure.
    """
    self.logger.info(f"Batch patching {len(self.patch_queue)} files...")
    base_dir = getattr(self, "staged_dir", None)
    if base_dir is None:
        base_dir = tempfile.gettempdir()
    for index, (guest_path, action) in enumerate(self.patch_queue):
        # Staged name matches what the guest script uploaded via hyp_file_op.
        staged_file = Path(base_dir) / f"patch_{index}"
        self.logger.debug(
            f"Patching guest file '{guest_path}' via staged file '{staged_file}'")
        try:
            patched = self._apply_patch_to_file_content(
                staged_file.read_bytes(), action)
            if patched is None:
                self.logger.error(
                    f"Failed to generate patch for {guest_path}")
                return -1
            staged_file.write_bytes(patched)
        except Exception as e:
            self.logger.error(
                f"Error during file patch of {staged_file}: {e}", exc_info=True)
            return -1
    self.logger.info("Batch patching completed successfully.")
    return 0
@plugins.portalcall.portalcall(GEN_LIVE_IMAGE_ACTION_FINISHED)
def _on_live_image_finished(self):
    """Mark the filesystem as ready and fire every registered fs_init callback."""
    self.fs_generated = True
    for registered in self._init_callbacks:
        # Callbacks may be class-level functions; resolve to a bound method first.
        resolve_bound_method_from_class(registered)()
    return 0
@plugins.portalcall.portalcall(REPORT_ERROR)
def _on_report_error(self, line_num):
    """Handles guest error reporting via hypercall."""
    self.logger.error("LiveImage guest error reported via hypercall.")
    self.logger.error(
        f"Error reported at line {line_num} in guest script.")
    # Dump the generated setup script with line numbers for cross-reference.
    script_text = self._get_gen_live_image_script_bytes().decode(
        "utf-8", errors="replace")
    numbered = "".join(
        f"{idx+1}\t| {text.rstrip()}\n"
        for idx, text in enumerate(script_text.splitlines()))
    self.logger.error("gen_live_image.sh contents:\n" + numbered)
    # Dump whatever the guest managed to log before failing.
    log_path = Path(self.staged_dir) / "live_image_guest.log"
    try:
        with open(log_path, "r") as f:
            tail = "".join(f"{entry.rstrip()}\n" for entry in f.readlines())
        self.logger.error("live_image_guest.log contents:\n" + tail)
    except Exception as e:
        self.logger.error(f"Could not read guest log: {e}")
    # A failed setup command is fatal: stop emulation.
    self.logger.error("Halting system due to guest error.")
    self.panda.end_analysis()
def _get_gen_live_image_script_bytes(self):
    """Lazily build and cache the UTF-8 bytes of the guest setup script."""
    cached = getattr(self, '_gen_live_image_script_bytes', None)
    if cached is None:
        cached = self._generate_setup_script().encode()
        self._gen_live_image_script_bytes = cached
    return cached
@plugins.portalcall.portalcall(MAGIC_GET)
def portalcall_get(self, path_ptr, offset, chunk_size, buffer_ptr):
    """
    Serve staged file bytes to the guest.

    Special cases: ``gen_live_image.sh`` is generated on demand; a request
    with ``offset == chunk_size == 0`` returns the file size instead of data.
    Returns the number of bytes written (or the size), or -1 on a missing
    file or a path that escapes the staging directory.
    """
    path = yield from plugins.mem.read_str(path_ptr)
    self.logger.debug(
        f"portalcall_get: path={path}, offset={offset}, chunk_size={chunk_size}, buffer_ptr={buffer_ptr}")
    # On-demand generation for gen_live_image.sh
    if path == "gen_live_image.sh":
        script_bytes = self._get_gen_live_image_script_bytes()
        script_len = len(script_bytes)
        if offset >= script_len:
            self.logger.debug(
                f"portalcall_get: offset {offset} >= script_len {script_len}, returning 0 bytes")
            return 0
        end = min(offset + chunk_size, script_len)
        data = script_bytes[offset:end]
        self.logger.debug(
            f"portalcall_get: gen_live_image.sh, writing {len(data)} bytes")
        yield from plugins.mem.write_bytes(buffer_ptr, data)
        return len(data)
    staged_dir = getattr(self, "staged_dir", None)
    if staged_dir is None:
        staged_dir = tempfile.gettempdir()
    base_dir = Path(staged_dir).resolve()
    file_path = (base_dir / path.lstrip("/")).resolve()
    # Security: `path` is guest-controlled; reject anything that resolves
    # outside the staging directory (e.g. ".." components or symlinks).
    try:
        file_path.relative_to(base_dir)
    except ValueError:
        self.logger.warning(
            f"portalcall_get: rejected path escaping staging dir: {path}")
        return -1
    # Size-query convention: offset == chunk_size == 0.
    if offset == 0 and chunk_size == 0:
        if not file_path.exists():
            self.logger.debug(
                f"portalcall_get: file not found {file_path}")
            return -1
        size = file_path.stat().st_size
        self.logger.debug(f"portalcall_get: file size {size}")
        return size
    if not file_path.exists():
        self.logger.debug(f"portalcall_get: file not found {file_path}")
        return -1
    # Read the requested chunk, then write it into guest memory.
    with open(file_path, "rb") as f:
        f.seek(offset)
        data = f.read(chunk_size)
    self.logger.debug(
        f"portalcall_get: writing {len(data)} bytes from file {file_path}")
    yield from plugins.mem.write_bytes(buffer_ptr, data)
    return len(data)
@plugins.portalcall.portalcall(MAGIC_PUT)
def portalcall_put(self, path_ptr, offset, chunk_size, buffer_ptr):
    """
    Receive a chunk of guest data and write it into the staging directory
    at *offset*. Returns the number of bytes written, or -1 for paths that
    escape the staging directory.
    """
    path = yield from plugins.mem.read_str(path_ptr)
    staged_dir = getattr(self, "staged_dir", None)
    if staged_dir is None:
        staged_dir = tempfile.gettempdir()
    base_dir = Path(staged_dir).resolve()
    file_path = (base_dir / path.lstrip("/")).resolve()
    # Security: `path` is guest-controlled and this is a host write
    # primitive; reject anything resolving outside the staging directory
    # (e.g. ".." components or symlinks).
    try:
        file_path.relative_to(base_dir)
    except ValueError:
        self.logger.warning(
            f"portalcall_put: rejected path escaping staging dir: {path}")
        return -1
    data = yield from plugins.mem.read_bytes(buffer_ptr, chunk_size)
    # "r+b" preserves existing content so chunks can arrive out of order.
    with open(file_path, "r+b" if file_path.exists() else "wb") as f:
        f.seek(offset)
        f.write(data)
    return len(data)
@plugins.portalcall.portalcall(MAGIC_GET_PERM)
def portalcall_get_perm(self, path_ptr):
    """Return the permission bits (mode & 0o7777) of a staged file, or -1."""
    path = yield from plugins.mem.read_str(path_ptr)
    base = getattr(self, "staged_dir", None)
    if base is None:
        base = tempfile.gettempdir()
    target = Path(base) / path.lstrip("/")
    if not target.exists():
        return -1
    try:
        # Mask off the file-type bits; keep permission + setuid/sticky bits.
        return target.stat().st_mode & 0o7777
    except Exception:
        return -1
@plugins.portalcall.portalcall(MAGIC_GETSIZE)
def portalcall_getsize(self, path_ptr):
    """Return the size in bytes of a staged file (or the generated script), or -1."""
    path = yield from plugins.mem.read_str(path_ptr)
    # The setup script is generated on demand, never staged on disk.
    if path == "gen_live_image.sh":
        return len(self._get_gen_live_image_script_bytes())
    base = getattr(self, "staged_dir", None)
    if base is None:
        base = tempfile.gettempdir()
    target = Path(base) / path.lstrip("/")
    if not target.exists():
        return -1
    try:
        return target.stat().st_size
    except Exception:
        return -1