VaKeR CYBER ARMY
Logo of a company Server : Apache
System : Linux host44.registrar-servers.com 4.18.0-513.18.1.lve.2.el8.x86_64 #1 SMP Sat Mar 30 15:36:11 UTC 2024 x86_64
User : vapecompany ( 2719)
PHP Version : 7.4.33
Disable Function : NONE
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/clconfigure/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/cloudlinux/venv/lib64/python3.11/site-packages/clconfigure/limits.py
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# https://cloudlinux.com/docs/LICENCE.TXT
#
import logging
import pathlib

from clconfigure import run, task
from clconfigure.packages import install_package
from clconfigure.services import (
    STATE_FAILED,
    STATE_MASKED,
    STATE_RUNNING,
    STATE_STOPPED,
    STATE_UNMASKED,
    STATE_ENABLED,
    STATE_DISABLED,
    get_service_state,
    set_service_state
)


def initialize_lvestats():
    # NOTE(vlebedev): We need to trigger 'lve-stats' scriptlets to properly initialized it's database stuff.
    install_package('lve-stats', reinstall=True)
    enable_lve_services()


@task("Changing default limits state to '{desired_state}'")
def set_default_limits_state(desired_state):
    """
    Brings default limits to given state (unlimited | default).
    May be executed more that once, does't crash on future calls
    """
    if desired_state == 'unlimited':
        run(['lvectl', 'set', 'default', 'unlimited'])
    else:
        raise RuntimeError('Another states temporary unavailable')


def disable_lve_services():
    """
    Turn off all lve-related services.
    """
    for state in [STATE_STOPPED, STATE_DISABLED, STATE_MASKED]:
        for service in ['lve', 'lve_namespaces', 'lvestats', 'lvectl']:
            set_service_state(state, service)


def enable_lve_services():
    """
    Turn on all lve-related services.
    """
    for state in [STATE_UNMASKED, STATE_RUNNING, STATE_ENABLED]:
        for service in ['lve', 'lve_namespaces', 'lvestats', 'lvectl']:
            set_service_state(state, service)


@task("Unloading lve module")
def unload_lve_module():
    """
    Restart services that still use lve device and unload lve module then
    """
    # Check if lve device is held by any process
    res = run(['lsof', '/dev/lve'])
    res_stdout_lines = res.stdout.split() if res.stdout is not None else []

    services = [
        service for service in ['mysqld', 'mariadb', 'httpd']
        if any(line.startswith(service) for line in res_stdout_lines)
    ]

    if services:
        run(['systemctl', 'restart', *services])

    res = run(['modprobe', '-rf', 'kmodlve'])
    if pathlib.Path('/sys/module/kmodlve').exists():
        logging.warning('Failed to unload the lve module. Please reboot the server.')


def apply_workaround_lve_failed():
    """
    Apply workaround for the case
    When `systemctl stop lve` makes it transition to "failed" state instead of "inactive"
    """
    if get_service_state('lve') != STATE_FAILED:
        return
    set_service_state(STATE_UNMASKED, 'lve')
    set_service_state(STATE_RUNNING, 'lve')
    set_service_state(STATE_STOPPED, 'lve')
    set_service_state(STATE_MASKED, 'lve')

VaKeR 2022