D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib64
/
python3.11
/
site-packages
/
clcagefslib
/
webisolation
/
crontab
/
Filename :
processor.py
back
Copy
# -*- coding: utf-8 -*- # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2025 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # """Processing functions for crontab operations.""" import os import pwd import shlex import subprocess import sys from typing import BinaryIO, Callable, Optional from clcagefslib.domain import is_isolation_enabled from .constants import ISOLATION_WRAPPER from .parser import ( parse_crontab_structure, write_crontab_structure, entries_to_str_list, ) from .structure import ParsedCrontabLine from .utils import get_document_root CRONTAB_BIN = "/usr/bin/crontab" def process_list(stdout: Optional[BinaryIO] = None, stderr: Optional[BinaryIO] = None) -> int: """ Process CRONTAB_LIST command. Runs 'crontab -l' to get current crontab entries. When isolation is active (PROXYEXEC_DOCUMENT_ROOT is set), only shows entries for the current document root. Removes isolation prefixes from output. Args: stdout: Output stream buffer (defaults to sys.stdout.buffer) stderr: Error stream buffer (defaults to sys.stderr.buffer) Returns: int: Exit code from crontab command, or 1 on error """ stdout = stdout or sys.stdout.buffer stderr = stderr or sys.stderr.buffer result = subprocess.run( [CRONTAB_BIN, "-l"], capture_output=True, ) # early exit in case site isolation is not turned on if not is_isolation_enabled(pwd.getpwuid(os.getuid()).pw_name): stdout.write(result.stdout) stderr.write(result.stderr) return result.returncode if result.returncode != 0: # Pass through stderr from crontab stderr.write(result.stderr) return result.returncode document_root = get_document_root() # Parse structure and pick entries to show structure = parse_crontab_structure(result.stdout) if document_root is not None: entries_to_show = structure.docroot_sections.get(document_root, []) else: entries_to_show = structure.global_records # Convert selected entries to bytes, removing wrapper prefixes if isolation is active result_parts = entries_to_str_list(entries_to_show, without_wrapper=bool(document_root)) output_data = b"".join(result_parts) stdout.write(output_data) return 0 def process_save( stdin: Optional[BinaryIO] = None, stdout: Optional[BinaryIO] = None, stderr: Optional[BinaryIO] = None, run_func: Optional[Callable] = None, ) -> int: """ Process CRONTAB_SAVE command. Reads crontab entries from stdin. If isolation is active: 1. Gets the current full crontab 2. Removes entries for the current document root 3. Adds new entries to the current document root section 4. Merges and saves the result in new format If isolation is not active: 1. Gets the current full crontab 2. Replaces user records section with new entries 3. Preserves all docroot sections 4. Saves in new format This ensures entries for other document roots are preserved. Args: stdin: Input stream buffer (defaults to sys.stdin.buffer) stdout: Output stream buffer (defaults to sys.stdout.buffer) stderr: Error stream buffer (defaults to sys.stderr.buffer) run_func: Function to run subprocess (defaults to subprocess.run) Returns: int: Exit code from crontab command, or 1 on error """ stdin = stdin or sys.stdin.buffer stdout = stdout or sys.stdout.buffer stderr = stderr or sys.stderr.buffer run_func = run_func or subprocess.run input_data = stdin.read() document_root = get_document_root() if is_isolation_enabled(pwd.getpwuid(os.getuid()).pw_name): # Get current crontab to preserve entries list_result = run_func( [CRONTAB_BIN, "-l"], capture_output=True, ) # No existing crontab or error - start fresh # Note: returncode 1 typically means "no crontab for user", which is acceptable existing_data = list_result.stdout if list_result.returncode == 0 else b"" # Parse existing crontab into structure existing_structure = parse_crontab_structure(existing_data) # Parse new input (filtered data, no section markers) # input_data is already bytes from stdin.read() new_structure = parse_crontab_structure(input_data) # Extract entries and add wrapper prefixes for docroot sections parsed_entries = [] for entry in new_structure.global_records: if isinstance(entry, ParsedCrontabLine) and document_root: # Decode command to string for shlex.quote() command_bytes = entry.command has_newline = command_bytes.endswith(b"\n") command_str = command_bytes.rstrip(b"\n").decode("utf-8", errors="replace") # Use shlex.quote() to properly quote the command for bash -c quoted_command = shlex.quote(command_str) # Build the wrapped command: wrapper path docroot bash -c "quoted_command" # Use shlex.join() for the prefix to handle paths with spaces, then add quoted command prefix_parts = shlex.join([ISOLATION_WRAPPER, document_root, "bash", "-c"]) wrapped_command_str = f"{prefix_parts} {quoted_command}" wrapped_command = wrapped_command_str.encode("utf-8") if has_newline: wrapped_command += b"\n" parsed_entries.append( ParsedCrontabLine(schedule=entry.schedule, command=wrapped_command) ) else: # Keep as-is for comments and when document_root is None (may have wrapper prefixes) parsed_entries.append(entry) if document_root: # Isolation active: replace docroot section with new entries existing_structure.docroot_sections[document_root] = parsed_entries else: existing_structure.global_records = parsed_entries # Write in new format modified_data = write_crontab_structure(existing_structure) else: # pass through modified_data = input_data result = run_func( [CRONTAB_BIN, "-"], input=modified_data, capture_output=True, ) # Pass through any output from crontab if result.stdout: stdout.write(result.stdout) if result.stderr: stderr.write(result.stderr) return result.returncode