![]() System : Linux host44.registrar-servers.com 4.18.0-513.18.1.lve.2.el8.x86_64 #1 SMP Sat Mar 30 15:36:11 UTC 2024 x86_64 User : vapecompany ( 2719) PHP Version : 7.4.33 Disable Function : NONE Directory : /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/wordpress/ |
Upload File : |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import logging import os import shlex import subprocess from datetime import datetime, timedelta from functools import lru_cache from typing import Optional from clcommon.cpapi import ( get_domains_php_info, get_installed_php_versions, ) from defence360agent.contracts.config import ( MalwareScanSchedule, MalwareScanScheduleInterval as Interval, ) from imav.malwarelib.model import MalwareHit from imav.malwarelib.scan.queue_supervisor_sync import ( QueueSupervisorSync as ScanQueue, ) from imav.malwarelib.utils import user_list from imav.wordpress import ( WP_CLI_WRAPPER_PATH, ) from imav.wordpress.exception import PHPError CAGEFS_ENTER_PATH = "/usr/sbin/cagefs_enter_user" CAGEFS_CTL_PATH = "/usr/sbin/cagefsctl" logger = logging.getLogger(__name__) def wp_wrapper(php_path: str, docroot: str) -> list: """Get wp cli common command list""" return [str(WP_CLI_WRAPPER_PATH), php_path, docroot] @lru_cache(maxsize=1) def get_cagefs_enabled_users() -> set: """Get the list of users enabled for CageFS.""" if not os.path.isfile(CAGEFS_CTL_PATH) or not os.access( CAGEFS_CTL_PATH, os.X_OK ): return set() result = subprocess.run( [CAGEFS_CTL_PATH, "--list-enabled"], capture_output=True, text=True ) if result.returncode != 0: return set() lines = result.stdout.strip().split("\n") return set(lines[1:]) # Skip the first line which is a summary def clear_get_cagefs_enabled_users_cache(): get_cagefs_enabled_users.cache_clear() def build_command_for_user(username: str, args: list) -> list: """Build the necessary command to run the given cmdline args with specified user.""" if username in get_cagefs_enabled_users(): if os.path.isfile(CAGEFS_ENTER_PATH) and os.access( CAGEFS_ENTER_PATH, os.X_OK ): return [ CAGEFS_ENTER_PATH, "--no-io-and-memory-limit", username, *args, ] return [ "su", "-s", "/bin/bash", username, "-c", shlex.join(args), ] def get_php_binary_path(domain: str, username: str) -> Optional[str]: """Determine PHP binary path for the given domain and username.""" logger.debug( "Get php binary path for domain: %s, username: %s", domain, username ) # Get the PHP version identifier for the given domain and username domains_php_info = get_domains_php_info() domain_info = domains_php_info.get(domain) if not domain_info or domain_info.get("username") != username: raise PHPError( "Unable to get PHP version for domain: {domain}, username:" " {username}".format(domain=domain, username=username) ) php_display_version = domain_info.get("display_version") if not php_display_version: raise PHPError( "PHP binary was not identified for domain: {domain}, username:" " {username}".format(domain=domain, username=username) ) # Get the PHP binary path from the installed PHP versions installed_php_versions = get_installed_php_versions() php_binary_path = None for php_version in installed_php_versions: if php_version.get("identifier") == php_display_version: php_binary_path = php_version.get("bin") break if not php_binary_path: raise PHPError( "PHP binary was not identified for domain: {domain}, username:" " {username}".format(domain=domain, username=username) ) return php_binary_path def get_malware_history(username: str) -> list: """ Get malware history for the specified user. This is an equivalent of calling `imunify360-agent malware history list --user {username}`. `` """ (max_count, hits) = MalwareHit.malicious_list(user=username) return hits async def get_last_scan(sink, username: str) -> dict: """ Get the last scan for the specified user. This is an equivalent of calling `imunify360-agent malware user list --user {username}`. """ queue = ScanQueue(sink) _, users = await user_list.fetch_user_list( queue.get_scans_from_paths, match={username} ) if not users: return {} users = user_list.sort(users, "scan_date", desc=True) return users[0] def calculate_next_scan_timestamp(): today = datetime.utcnow() if MalwareScanSchedule.INTERVAL == Interval.DAY: next_scan = today.replace( hour=MalwareScanSchedule.HOUR, minute=0, second=0, microsecond=0, ) if today >= next_scan: next_scan += timedelta(days=1) return next_scan.timestamp() if MalwareScanSchedule.INTERVAL == Interval.WEEK: # today.weekday() returns 0 for Monday, 6 for Sunday, but MalwareScanSchedule.DAY_OF_WEEK uses 0 for Sunday, # 1 for Monday, ..., 6 for Saturday. So we need to adjust the calculation. days_ahead = ( MalwareScanSchedule.DAY_OF_WEEK - (today.weekday() + 1) % 7 + 7 ) % 7 if days_ahead == 0 and today.hour >= MalwareScanSchedule.HOUR: days_ahead = 7 next_scan_date = today + timedelta(days=days_ahead) return next_scan_date.replace( hour=MalwareScanSchedule.HOUR, minute=0, second=0, microsecond=0 ).timestamp() if MalwareScanSchedule.INTERVAL == Interval.MONTH: next_scan_date = today.replace( day=MalwareScanSchedule.DAY_OF_MONTH, hour=MalwareScanSchedule.HOUR, minute=0, second=0, microsecond=0, ) if today.day > MalwareScanSchedule.DAY_OF_MONTH or ( today.day == MalwareScanSchedule.DAY_OF_MONTH and today.hour >= MalwareScanSchedule.HOUR ): next_month = today.month + 1 if today.month < 12 else 1 next_scan_date = next_scan_date.replace(month=next_month) if next_month == 1: # Handle year change next_scan_date = next_scan_date.replace(year=today.year + 1) return next_scan_date.timestamp()