VaKeR CYBER ARMY
Logo of a company Server : Apache
System : Linux host44.registrar-servers.com 4.18.0-513.18.1.lve.2.el8.x86_64 #1 SMP Sat Mar 30 15:36:11 UTC 2024 x86_64
User : vapecompany ( 2719)
PHP Version : 7.4.33
Disable Function : NONE
Directory :  /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/wordpress/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/wordpress/utils.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import logging
import os
import shlex
import subprocess
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Optional

from clcommon.cpapi import (
    get_domains_php_info,
    get_installed_php_versions,
)

from defence360agent.contracts.config import (
    MalwareScanSchedule,
    MalwareScanScheduleInterval as Interval,
)

from imav.malwarelib.model import MalwareHit
from imav.malwarelib.scan.queue_supervisor_sync import (
    QueueSupervisorSync as ScanQueue,
)
from imav.malwarelib.utils import user_list

from imav.wordpress import (
    WP_CLI_WRAPPER_PATH,
)
from imav.wordpress.exception import PHPError

# Executable used to run a command line as a given user
# (invoked as: <path> --no-io-and-memory-limit <username> <args...>).
CAGEFS_ENTER_PATH = "/usr/sbin/cagefs_enter_user"
# Executable queried with `--list-enabled` to learn which users have CageFS enabled.
CAGEFS_CTL_PATH = "/usr/sbin/cagefsctl"

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


def wp_wrapper(php_path: str, docroot: str) -> list:
    """
    Assemble the leading part of a WP-CLI command line.

    The result is a three-element list: the wrapper script path (converted
    to ``str``), then ``php_path``, then ``docroot``.
    """
    command_prefix = [str(WP_CLI_WRAPPER_PATH)]
    return command_prefix + [php_path, docroot]


@lru_cache(maxsize=1)
def get_cagefs_enabled_users() -> set:
    """
    Return the set of usernames reported by ``cagefsctl --list-enabled``.

    An empty set is returned when the cagefsctl binary is missing or not
    executable, or when the command exits with a non-zero status.
    The result is memoized; see ``cache_clear()`` on this function to reset.
    """
    ctl_usable = os.path.isfile(CAGEFS_CTL_PATH) and os.access(
        CAGEFS_CTL_PATH, os.X_OK
    )
    if not ctl_usable:
        return set()

    argv = [CAGEFS_CTL_PATH, "--list-enabled"]
    completed = subprocess.run(argv, capture_output=True, text=True)
    if completed.returncode != 0:
        return set()

    # The first output line is a summary, everything after it is a username.
    _summary, *usernames = completed.stdout.strip().split("\n")
    return set(usernames)


def clear_get_cagefs_enabled_users_cache():
    """Discard the memoized result of ``get_cagefs_enabled_users``."""
    cached_lookup = get_cagefs_enabled_users
    cached_lookup.cache_clear()


def build_command_for_user(username: str, args: list) -> list:
    """
    Wrap ``args`` so that the command line is executed as ``username``.

    When the user is listed as CageFS-enabled and the ``cagefs_enter_user``
    binary exists and is executable, the command is prefixed with that binary.
    Otherwise the command is shell-quoted and passed to ``su -s /bin/bash``.
    """
    enter_via_cagefs = (
        username in get_cagefs_enabled_users()
        and os.path.isfile(CAGEFS_ENTER_PATH)
        and os.access(CAGEFS_ENTER_PATH, os.X_OK)
    )

    if not enter_via_cagefs:
        # Fallback: `su` takes a single shell string, so quote the argv.
        quoted_cmdline = shlex.join(args)
        return ["su", "-s", "/bin/bash", username, "-c", quoted_cmdline]

    return [CAGEFS_ENTER_PATH, "--no-io-and-memory-limit", username, *args]


def get_php_binary_path(domain: str, username: str) -> Optional[str]:
    """
    Resolve the PHP binary configured for ``domain`` owned by ``username``.

    Raises PHPError when the domain is unknown or belongs to another user,
    when the domain has no "display_version", or when no installed PHP
    version with a matching "identifier" provides a "bin" path.
    """
    logger.debug(
        "Get php binary path for domain: %s, username: %s", domain, username
    )

    # Look the domain up and make sure it belongs to the requested user.
    info = get_domains_php_info().get(domain)
    if not (info and info.get("username") == username):
        raise PHPError(
            f"Unable to get PHP version for domain: {domain}, username:"
            f" {username}"
        )

    wanted_version = info.get("display_version")
    if not wanted_version:
        raise PHPError(
            f"PHP binary was not identified for domain: {domain}, username:"
            f" {username}"
        )

    # Take the "bin" of the first installed version whose identifier matches.
    binary = next(
        (
            candidate.get("bin")
            for candidate in get_installed_php_versions()
            if candidate.get("identifier") == wanted_version
        ),
        None,
    )
    if not binary:
        raise PHPError(
            f"PHP binary was not identified for domain: {domain}, username:"
            f" {username}"
        )

    return binary


def get_malware_history(username: str) -> list:
    """
    Return the malicious hits recorded for ``username``.

    Equivalent to running
    `imunify360-agent malware history list --user {username}`.
    Only the hits are returned; the count that accompanies them is dropped.
    """
    _count, user_hits = MalwareHit.malicious_list(user=username)
    return user_hits


async def get_last_scan(sink, username: str) -> dict:
    """
    Return the most recent scan entry for ``username``.

    Equivalent to running
    `imunify360-agent malware user list --user {username}`.
    An empty dict is returned when nothing is found for the user.
    """
    scan_queue = ScanQueue(sink)
    fetched = await user_list.fetch_user_list(
        scan_queue.get_scans_from_paths, match={username}
    )
    _, entries = fetched

    if entries:
        # Newest scan first, then take the head of the list.
        newest_first = user_list.sort(entries, "scan_date", desc=True)
        return newest_first[0]
    return {}


def _next_monthly_scan(now: datetime, day_of_month: int, hour: int) -> datetime:
    """Return the first `day_of_month` at `hour`:00:00 strictly after `now`, skipping months that lack that day."""
    year, month = now.year, now.month
    # 12 candidate months is more than enough: any day 1..31 exists in at
    # least one of any three consecutive months.
    for _ in range(12):
        try:
            candidate = now.replace(
                year=year,
                month=month,
                day=day_of_month,
                hour=hour,
                minute=0,
                second=0,
                microsecond=0,
            )
        except ValueError:
            # The day does not exist in this month (e.g. 31 in April).
            candidate = None
        if candidate is not None and candidate > now:
            return candidate
        if month == 12:
            year, month = year + 1, 1
        else:
            month += 1
    raise ValueError(
        "cannot schedule monthly scan for day {} at hour {}".format(
            day_of_month, hour
        )
    )


def calculate_next_scan_timestamp() -> Optional[float]:
    """
    Compute the timestamp of the next scheduled malware scan.

    The schedule is read from ``MalwareScanSchedule`` (INTERVAL, HOUR,
    DAY_OF_WEEK, DAY_OF_MONTH). The scan time is always on the hour
    (minute, second and microsecond are zeroed).

    Returns:
        The value of ``datetime.timestamp()`` for the next scan, or ``None``
        when INTERVAL is not one of DAY, WEEK or MONTH.

    Raises:
        ValueError: if the configured hour or day of month cannot form a
            valid date.
    """
    # NOTE(review): `today` is a naive datetime holding UTC wall-clock time,
    # while `.timestamp()` on a naive datetime interprets it as local time.
    # The two only agree when the host runs in UTC - confirm which timezone
    # the schedule is expressed in before changing this.
    today = datetime.utcnow()

    if MalwareScanSchedule.INTERVAL == Interval.DAY:
        next_scan = today.replace(
            hour=MalwareScanSchedule.HOUR,
            minute=0,
            second=0,
            microsecond=0,
        )
        if today >= next_scan:
            # Today's slot has already passed, schedule for tomorrow.
            next_scan += timedelta(days=1)
        return next_scan.timestamp()

    if MalwareScanSchedule.INTERVAL == Interval.WEEK:
        # today.weekday() returns 0 for Monday, 6 for Sunday, but MalwareScanSchedule.DAY_OF_WEEK uses 0 for Sunday,
        # 1 for Monday, ..., 6 for Saturday. So we need to adjust the calculation.
        days_ahead = (
            MalwareScanSchedule.DAY_OF_WEEK - (today.weekday() + 1) % 7 + 7
        ) % 7
        if days_ahead == 0 and today.hour >= MalwareScanSchedule.HOUR:
            # Scheduled weekday is today but the hour has passed: next week.
            days_ahead = 7
        next_scan_date = today + timedelta(days=days_ahead)
        return next_scan_date.replace(
            hour=MalwareScanSchedule.HOUR, minute=0, second=0, microsecond=0
        ).timestamp()

    if MalwareScanSchedule.INTERVAL == Interval.MONTH:
        # Building the date with a plain `replace(day=...)` raises ValueError
        # when the configured day does not exist in the current or following
        # month (e.g. day 31 in a 30-day month), so search forward for the
        # first month that actually contains it.
        # NOTE(review): skipping short months mirrors cron-style day-of-month
        # matching - confirm this is how the scheduler behaves.
        return _next_monthly_scan(
            today,
            MalwareScanSchedule.DAY_OF_MONTH,
            MalwareScanSchedule.HOUR,
        ).timestamp()

    # No periodic schedule configured for any other interval value.
    return None

VaKeR 2022