D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
opt
/
cloudlinux
/
venv
/
lib
/
python3.11
/
site-packages
/
clcagefslib
/
webisolation
/
crontab
/
Filename :
parser.py
back
Copy
# -*- coding: utf-8 -*-
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2025 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
"""Parsing and serialization functions for crontab entries."""

from .constants import (
    CRON_SCHEDULE_PATTERN,
    WEBSITE_CRON_BEGIN_PATTERN,
    WEBSITE_CRON_END_PATTERN,
)
from .structure import (
    BeginWebsite,
    CommentLine,
    CrontabEntry,
    CrontabStructure,
    EndWebsite,
    ParsedCrontabLine,
)


def parse_crontab_line(line: bytes) -> CrontabEntry:
    """
    Parse a crontab line into a cron entry, comment, or website marker.

    The checks run in a fixed order: website begin marker, website end
    marker, blank/comment line, then schedule + command. Anything that
    matches none of the patterns is kept verbatim as a ``CommentLine`` so
    that no input is lost when the crontab is written back.

    Args:
        line: A single line from crontab output (bytes), with or without
            its trailing newline.

    Returns:
        ``BeginWebsite`` / ``EndWebsite`` for section markers,
        ``ParsedCrontabLine`` for a line matching the schedule pattern,
        otherwise ``CommentLine`` carrying the original, unstripped line.
    """
    stripped = line.strip()

    # Markers are checked before the generic "#" comment test because the
    # markers written by this module themselves start with "##".
    begin_match = WEBSITE_CRON_BEGIN_PATTERN.match(stripped)
    if begin_match:
        docroot_bytes = begin_match.group(1).strip()
        # Undecodable bytes are replaced rather than raising.
        docroot = docroot_bytes.decode("utf-8", errors="replace")
        return BeginWebsite(docroot=docroot)

    if WEBSITE_CRON_END_PATTERN.match(stripped):
        return EndWebsite()

    # Blank lines and comments are preserved with their original content.
    if not stripped or stripped.startswith(b"#"):
        return CommentLine(content=line)

    schedule_match = CRON_SCHEDULE_PATTERN.match(stripped)
    if not schedule_match:
        # Not a schedule line: keep it untouched.
        return CommentLine(content=line)

    schedule = schedule_match.group(1)
    command = schedule_match.group(2)
    # The match ran on the stripped line, so restore the newline the
    # original line carried.
    if line.endswith(b"\n"):
        command += b"\n"

    return ParsedCrontabLine(
        schedule=schedule,
        command=command,
    )


def parse_crontab_structure(input_data: bytes) -> CrontabStructure:
    """
    Parse crontab file into structured format with user records and docroot sections.

    Supports both old format (with isolation wrapper prefixes) and new
    format (with section markers).

    Lines between a begin marker and the following end marker are collected
    under that marker's docroot; every other line goes to the global
    records. The marker lines themselves are not stored. Sections that
    repeat the same docroot are merged in order of appearance.

    Args:
        input_data: bytes containing crontab entries

    Returns:
        CrontabStructure: Structure containing global_records and docroot_sections
    """
    crontab_records: dict[
        str | None,
        list[CrontabEntry],
    ] = {}
    # None is the key for records outside any website section.
    current_docroot: str | None = None

    # keepends=True so each entry keeps its own line terminator.
    for line in input_data.splitlines(keepends=True):
        parsed = parse_crontab_line(line)
        if isinstance(parsed, BeginWebsite):
            current_docroot = parsed.docroot
            continue
        if isinstance(parsed, EndWebsite):
            current_docroot = None
            continue
        crontab_records.setdefault(current_docroot, []).append(parsed)

    global_records = crontab_records.pop(None, [])
    return CrontabStructure(
        global_records=global_records,
        docroot_sections=crontab_records,
    )


def _terminate_open_line(parts: list[bytes]) -> None:
    """Add a newline to the last non-empty chunk in *parts* if it lacks one."""
    for index in range(len(parts) - 1, -1, -1):
        if parts[index]:
            if not parts[index].endswith(b"\n"):
                # Rebinds the list slot only; the source entry is untouched.
                parts[index] += b"\n"
            return


def write_crontab_structure(structure: CrontabStructure) -> bytes:
    """
    Write crontab structure to bytes in new format.

    Format:
    - Global records at the top
    - Then docroot sections with markers:
      ## WEBSITE CRON BEGIN /path/to/docroot
      ... entries ...
      ## WEBSITE CRON END

    Sections without entries are omitted. Section markers always start on
    a fresh line, even when the preceding entry (for example the last line
    of a crontab without a trailing newline) is unterminated.

    Args:
        structure: CrontabStructure with global_records and docroot_sections

    Returns:
        bytes: Crontab content in new format
    """
    result_parts = entries_to_str_list(structure.global_records)

    # Add docroot sections (dicts preserve insertion order)
    for docroot, entries in structure.docroot_sections.items():
        if not entries:
            continue
        # Without this, a marker could be glued onto an unterminated line
        # and would no longer be recognised when the crontab is parsed.
        _terminate_open_line(result_parts)
        result_parts.append(f"## WEBSITE CRON BEGIN {docroot}\n".encode("utf-8"))
        result_parts.extend(entries_to_str_list(entries))
        _terminate_open_line(result_parts)
        result_parts.append(b"## WEBSITE CRON END\n")

    return b"".join(result_parts)


def entries_to_str_list(entries: list[CrontabEntry], without_wrapper: bool = False) -> list[bytes]:
    """
    Convert parsed crontab entries into a list of crontab lines (bytes).

    ``ParsedCrontabLine`` entries are rendered as ``schedule + b" " + command``
    and always end with a newline. ``CommentLine`` entries are emitted with
    their stored content. Entries of any other type are skipped.

    An unterminated line gets a newline appended as soon as another
    non-empty line follows it, so two entries never share one physical
    line. The final line is left exactly as stored.

    Args:
        entries: Entries to serialize, in output order.
        without_wrapper: When true, use ``entry.get_clean_command()``
            instead of ``entry.command`` (presumably the command without
            the isolation wrapper prefix; see ParsedCrontabLine).

    Returns:
        One bytes object per emitted entry.
    """
    lines: list[bytes] = []
    for entry in entries:
        if isinstance(entry, ParsedCrontabLine):
            command = entry.get_clean_command() if without_wrapper else entry.command
            line = entry.schedule + b" " + command
            if not line.endswith(b"\n"):
                line += b"\n"
        elif isinstance(entry, CommentLine):
            line = entry.content
        else:
            continue

        if line:
            # Close the previous line before starting a new one.
            _terminate_open_line(lines)
        lines.append(line)
    return lines