Viewing File: /opt/cloudlinux/venv/lib/python3.11/site-packages/clcagefslib/domain.py
#!/opt/cloudlinux/venv/bin/python3 -sbb
# -*- coding: utf-8 -*-
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
import logging
import os
import shutil
import subprocess
import tempfile
from collections import defaultdict
from pathlib import Path
from clcommon.clcagefs import setup_mount_dir_cagefs, CAGEFSCTL_TOOL
from clcommon.cpapi import cpusers
from clcommon.cpapi import docroot as get_domain_docroot
from clcommon.cpapi.cpapiexceptions import NoDomain
from .fs import user_exists
from .exceptions import UserNotFoundError
from .webisolation import admin_config, config, jail_utils
from .webisolation.config import DOCROOTS_ISOLATED_BASE
from .webisolation.jail_config_builder import write_jail_mounts_config
from .webisolation.php import reload_processes_with_docroots
from .webisolation.service import start_monitoring_service, stop_monitoring_service
from .webisolation.triggers import trigger_xray_ini_regeneration, trigger_ssa_ini_regeneration
def is_website_isolation_allowed_server_wide():
    """Tell whether the global website-isolation marker file is present.

    Returns:
        ``True`` when ``admin_config.WEBSITE_ISOLATION_MARKER`` is an
        existing regular file, ``False`` otherwise.
    """
    marker = admin_config.WEBSITE_ISOLATION_MARKER
    return os.path.isfile(marker)
def is_website_isolation_feature_available():
    """Tell whether the "feature available" marker file is present.

    Returns:
        ``True`` when ``admin_config.WEBSITE_ISOLATION_AVAILABLE_MARKER``
        is an existing regular file, ``False`` otherwise.
    """
    availability_marker = admin_config.WEBSITE_ISOLATION_AVAILABLE_MARKER
    return os.path.isfile(availability_marker)
def get_isolation_user_mode() -> str | None:
    """Derive the website-isolation user mode from the mode directories.

    The mode is encoded by which of the two directories exists:

    * denied directory present  -> ``"allow_all"`` (the directory lists
      the exceptions that are denied);
    * allowed directory present -> ``"deny_all"`` (the directory lists
      the exceptions that are allowed);
    * neither present           -> ``None`` (not initialised yet).

    When both directories exist the state is inconsistent: a warning is
    logged, the allowed directory is removed (errors ignored) and the
    mode is reported as ``"allow_all"``.  Note that this makes the
    function a getter with a possible filesystem side effect.

    Returns:
        ``"allow_all"``, ``"deny_all"`` or ``None``.
    """
    denied_present = os.path.isdir(admin_config.ISOLATION_DENIED_DIR)
    allowed_present = os.path.isdir(admin_config.ISOLATION_ALLOWED_DIR)

    if denied_present:
        if allowed_present:
            # Inconsistent state: repair it by dropping the allowed directory.
            logging.warning(
                "Both site-isolation.users.allowed and site-isolation.users.denied "
                "directories exist. Removing allowed directory, treating as allow_all mode."
            )
            shutil.rmtree(admin_config.ISOLATION_ALLOWED_DIR, ignore_errors=True)
        return "allow_all"

    return "deny_all" if allowed_present else None
def is_website_isolation_allowed_for_user(user: str) -> bool:
    """Decide whether *user* may use website isolation.

    The global marker file must exist; after that the answer depends on
    the current user mode:

    * ``"allow_all"`` - allowed unless *user* is listed in the denied
      directory;
    * ``"deny_all"`` - allowed only if *user* is listed in the allowed
      directory;
    * not initialised - never allowed.

    Args:
        user: Name of the user to check.

    Returns:
        ``True`` if isolation is permitted for *user*, else ``False``.
    """
    marker_present = os.path.isfile(admin_config.WEBSITE_ISOLATION_MARKER)
    if not marker_present:
        return False

    # The mode is only queried once the marker is known to exist.
    current_mode = get_isolation_user_mode()
    if current_mode == "deny_all":
        return admin_config.user_in_dir(admin_config.ISOLATION_ALLOWED_DIR, user)
    if current_mode == "allow_all":
        is_denied = admin_config.user_in_dir(admin_config.ISOLATION_DENIED_DIR, user)
        return not is_denied
    # Mode directories have not been created yet.
    return False
def _ensure_isolation_mount_and_marker():
    """Perform one-time isolation setup unless the marker already exists.

    When ``admin_config.WEBSITE_ISOLATION_MARKER`` is missing this:

    1. registers ``DOCROOTS_ISOLATED_BASE`` as a CageFS mount directory
       with a synchronous remount (``remount_in_background=False``);
    2. creates the marker file together with its parent directories;
    3. issues ``systemctl try-restart clwpos_monitoring.service``.

    The outcome of the ``systemctl`` call is not inspected.
    """
    if os.path.isfile(admin_config.WEBSITE_ISOLATION_MARKER):
        # Setup has already been done on this server.
        return

    setup_mount_dir_cagefs(
        str(DOCROOTS_ISOLATED_BASE),
        prefix="*",
        remount_cagefs=True,
        remount_in_background=False,
    )

    marker = Path(admin_config.WEBSITE_ISOLATION_MARKER)
    marker.parent.mkdir(parents=True, exist_ok=True)
    marker.touch()

    # The remount kills redis processes, so clwpos_monitoring is restarted
    # (try-restart only acts when the service is currently running).
    restart_command = ["/usr/bin/systemctl", "try-restart", "clwpos_monitoring.service"]
    subprocess.run(restart_command, capture_output=True, text=True)
# File that holds the CageFS proxyexec command registrations.
PROXY_COMMANDS_PATH = "/etc/cagefs/proxy.commands"
# Line appended to PROXY_COMMANDS_PATH to register the cagefsctl-user alias.
CAGEFSCTL_USER_PROXY_ENTRY = "CAGEFSCTL_USER:noproceed=root:/usr/sbin/cagefsctl-user"
# Binaries fed (one per line) to ``cagefsctl --update-list`` after registration.
CAGEFSCTL_USER_BINARIES = ["/usr/sbin/cagefsctl-user"]
def ensure_proxyexec_command():
    """Register the ``cagefsctl-user`` proxyexec alias if not already present.

    Appends the ``CAGEFSCTL_USER`` entry to ``/etc/cagefs/proxy.commands``
    and runs ``cagefsctl --update-list`` to pull the required binaries into
    the CageFS skeleton.  This is a no-op when the file already contains the
    text ``CAGEFSCTL_USER``.

    The file is rewritten atomically: the new content goes to a temporary
    file in the same directory which then replaces the original.  When the
    original file existed, its permission bits are carried over to the
    replacement (a fresh ``mkstemp`` file is otherwise owner-only).

    A non-zero exit status of the ``--update-list`` step is logged as a
    warning but does not raise, so registration stays best-effort.

    Raises:
        OSError: if the proxy commands file cannot be read (for a reason
            other than not existing) or cannot be rewritten.
    """
    existing_mode = None
    try:
        with open(PROXY_COMMANDS_PATH, "r", encoding="utf-8") as f:
            content = f.read()
            # Remember the permission bits so the replacement keeps them.
            existing_mode = os.fstat(f.fileno()).st_mode & 0o7777
    except FileNotFoundError:
        content = ""

    if "CAGEFSCTL_USER" in content:
        return

    logging.info("Registering cagefsctl-user in %s", PROXY_COMMANDS_PATH)

    new_content = content
    if new_content and not new_content.endswith("\n"):
        # Make sure the new entry starts on its own line.
        new_content += "\n"
    new_content += CAGEFSCTL_USER_PROXY_ENTRY + "\n"

    proxy_dir = os.path.dirname(PROXY_COMMANDS_PATH)
    os.makedirs(proxy_dir, exist_ok=True)
    # Same directory as the target so os.replace() stays on one filesystem.
    fd, tmp_path = tempfile.mkstemp(dir=proxy_dir, prefix=".proxy.commands.")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            f.write(new_content)
        if existing_mode is not None:
            os.chmod(tmp_path, existing_mode)
        os.replace(tmp_path, PROXY_COMMANDS_PATH)
    except BaseException:
        # Best-effort cleanup; a cleanup failure must not hide the real error.
        try:
            os.unlink(tmp_path)
        except OSError:
            logging.warning("Unable to remove temporary file %s", tmp_path)
        raise

    update_list = ("\n".join(CAGEFSCTL_USER_BINARIES) + "\n").encode()
    result = subprocess.run(
        [CAGEFSCTL_TOOL, "--wait-lock", "--update-list"],
        input=update_list,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        check=False,
    )
    if result.returncode != 0:
        logging.warning(
            "%s --update-list exited with code %s while registering cagefsctl-user",
            CAGEFSCTL_TOOL,
            result.returncode,
        )
def toggle_isolation_user_mode() -> str:
    """Flip the isolation user mode by swapping the mode directories.

    Transitions:

    * ``allow_all`` -> ``deny_all``
    * ``deny_all`` -> ``allow_all``
    * not initialised -> ``allow_all``

    The directory indicating the new mode is created (if missing) and the
    directory of the previous mode is removed with errors ignored.  Unlike
    :func:`deny_website_isolation_server_wide`, no per-user isolation
    cleanup is performed here.  One-time mount/marker setup is ensured
    first.

    Returns:
        The new mode after toggling (``"allow_all"`` or ``"deny_all"``).
    """
    _ensure_isolation_mount_and_marker()

    if get_isolation_user_mode() == "allow_all":
        new_mode = "deny_all"
        dir_to_create = admin_config.ISOLATION_ALLOWED_DIR
        dir_to_remove = admin_config.ISOLATION_DENIED_DIR
    else:
        # Either deny_all or not initialised: both end up in allow_all.
        new_mode = "allow_all"
        dir_to_create = admin_config.ISOLATION_DENIED_DIR
        dir_to_remove = admin_config.ISOLATION_ALLOWED_DIR

    os.makedirs(dir_to_create, mode=admin_config.DIR_MODE, exist_ok=True)
    shutil.rmtree(dir_to_remove, ignore_errors=True)
    return new_mode
def allow_website_isolation_server_wide():
    """Put the server into *allow_all* mode (users allowed by default).

    Ensures the one-time mount/marker setup and the proxyexec
    registration, then creates the denied-users directory (the
    *allow_all* indicator) and removes the allowed-users directory,
    ignoring removal errors.
    """
    _ensure_isolation_mount_and_marker()
    ensure_proxyexec_command()

    denied_dir = admin_config.ISOLATION_DENIED_DIR
    allowed_dir = admin_config.ISOLATION_ALLOWED_DIR

    # An existing (possibly empty) denied directory signals allow_all mode.
    os.makedirs(denied_dir, mode=admin_config.DIR_MODE, exist_ok=True)
    # The allowed directory only belongs to deny_all mode.
    shutil.rmtree(allowed_dir, ignore_errors=True)
def deny_website_isolation_server_wide():
    """Put the server into *deny_all* mode (users denied by default).

    Domain isolation is first removed for every user that currently has
    it.  Then the one-time mount/marker setup is ensured, the
    allowed-users directory (the *deny_all* indicator) is created and the
    denied-users directory is removed, ignoring removal errors.
    """
    # Cleanup runs before anything else, exactly once per call.
    _cleanup_all_users_isolation()
    _ensure_isolation_mount_and_marker()

    allowed_dir = admin_config.ISOLATION_ALLOWED_DIR
    denied_dir = admin_config.ISOLATION_DENIED_DIR

    # An existing (possibly empty) allowed directory signals deny_all mode.
    os.makedirs(allowed_dir, mode=admin_config.DIR_MODE, exist_ok=True)
    # The denied directory only belongs to allow_all mode.
    shutil.rmtree(denied_dir, ignore_errors=True)
def allow_website_isolation_for_user(username: str):
    """Permit website isolation for *username*, respecting the current mode.

    * **allow_all** - *username* is removed from the denied directory.
    * **deny_all** - *username* is added to the allowed directory.
    * **not initialised** - the allowed directory is created first, which
      starts the server in *deny_all* mode with *username* as the first
      allowed user.

    The one-time mount/marker setup and the proxyexec registration are
    ensured before the mode is evaluated.

    Args:
        username: Name of the user to allow.
    """
    _ensure_isolation_mount_and_marker()
    ensure_proxyexec_command()

    current_mode = get_isolation_user_mode()
    if current_mode == "allow_all":
        admin_config.remove_user_from_dir(admin_config.ISOLATION_DENIED_DIR, username)
        return

    allowed_dir = admin_config.ISOLATION_ALLOWED_DIR
    if current_mode != "deny_all":
        # Not initialised yet: creating the directory selects deny_all mode.
        os.makedirs(allowed_dir, mode=admin_config.DIR_MODE, exist_ok=True)
    admin_config.add_user_to_dir(allowed_dir, username)
def deny_website_isolation_for_user(username: str):
    """Forbid website isolation for *username*, respecting the current mode.

    * **allow_all** - *username* is added to the denied directory.
    * **deny_all** - *username* is removed from the allowed directory.
    * **not initialised** - the exception lists are left untouched.

    In every case the user's existing domain isolation state is cleaned
    up afterwards.

    Args:
        username: Name of the user to deny.
    """
    current_mode = get_isolation_user_mode()
    if current_mode == "deny_all":
        admin_config.remove_user_from_dir(admin_config.ISOLATION_ALLOWED_DIR, username)
    elif current_mode == "allow_all":
        admin_config.add_user_to_dir(admin_config.ISOLATION_DENIED_DIR, username)

    # NOTE(review): unlike disable_website_isolation(), the monitoring
    # service is not stopped here when no isolated users remain - confirm
    # whether that is intended.
    _cleanup_user_isolation(username)
def _cleanup_user_isolation(username: str):
    """Drop all domain isolation state that belongs to one user.

    Nothing happens when the user does not exist or has no enabled
    websites.  Otherwise the document roots of the enabled websites are
    resolved up front, the user configuration and the jail mounts
    configuration are rewritten with ``None``, processes matching those
    document roots are reloaded and the per-website token directories are
    removed.  A website whose document root cannot be resolved is
    reported via ``logging.error`` and its token directory is skipped.

    Args:
        username: Name of the user to clean up.
    """
    if not user_exists(username):
        return

    enabled_websites = config.load_user_config(username).enabled_websites
    if not enabled_websites:
        return

    # Resolve docroots before the configuration is rewritten below.
    docroot_by_website = {}
    for website in enabled_websites:
        docroot_by_website[website] = _get_docroot_or_none(website)

    config.save_user_config(username, config=None)
    write_jail_mounts_config(username, user_config=None)

    # Unresolved (None) docroots are passed through to the reload as well.
    affected_docroots = list(docroot_by_website.values())
    reload_processes_with_docroots(username, filter_by_docroots=affected_docroots)

    for website, docroot in docroot_by_website.items():
        if docroot is not None:
            jail_utils.remove_website_token_directory(username, docroot)
        else:
            logging.error(
                "Unable to detect document root for domain %s, "
                "configuration cleanup failed. Contact CloudLinux support "
                "if the error repeats.",
                website,
            )
def _cleanup_all_users_isolation():
    """Drop domain isolation state for every user that currently has it.

    A failure for one user is logged with its traceback and does not
    prevent the remaining users from being processed.  When, after the
    pass, no user has isolated websites left, the monitoring service is
    stopped.
    """
    isolated_users = users_with_enabled_domain_isolation()
    for username in tuple(isolated_users):
        try:
            _cleanup_user_isolation(username)
        except Exception:
            # Deliberately broad: one broken user must not block the rest.
            logging.exception(
                "Unable to disable website isolation for user %s, skipping.",
                username,
            )

    # Re-query so that users whose cleanup failed are taken into account.
    if not users_with_enabled_domain_isolation():
        stop_monitoring_service()
def _get_docroot_or_none(domain: str):
    """Look up the document root of *domain* without raising.

    Args:
        domain: Domain name to resolve.

    Returns:
        The first element of the ``get_domain_docroot`` result, or
        ``None`` when the lookup raises ``NoDomain`` or the result has no
        first element (``IndexError``).
    """
    try:
        docroot_info = get_domain_docroot(domain)
        return docroot_info[0]
    except (IndexError, NoDomain):
        return None
def is_isolation_enabled(user):
    """Tell whether a jail configuration exists for *user*.

    Args:
        user: Name of the user to check.

    Returns:
        ``False`` when website isolation is not allowed server-wide or
        the jail configuration path lookup raises ``UserNotFoundError``;
        otherwise whether the user's jail configuration path exists.
    """
    if not is_website_isolation_allowed_server_wide():
        return False

    try:
        jail_config_path = jail_utils.get_jail_config_path(user)
    except UserNotFoundError:
        return False
    else:
        return os.path.exists(jail_config_path)
def users_with_enabled_domain_isolation() -> dict:
    """Collect users that have at least one website with isolation enabled.

    Only control-panel users that exist and for which
    :func:`is_isolation_enabled` is true are considered; users whose list
    of enabled websites is empty are left out.

    Returns:
        Mapping ``{username: enabled_websites}``.
    """
    candidates = [
        name for name in cpusers()
        if user_exists(name) and is_isolation_enabled(name)
    ]
    websites_per_user = (
        (name, get_websites_with_enabled_isolation(name)) for name in candidates
    )
    return {name: websites for name, websites in websites_per_user if websites}
def get_websites_with_enabled_isolation(user: str):
    """Return the websites of *user* that have isolation enabled.

    Args:
        user: Name of the user to inspect.

    Returns:
        The ``enabled_websites`` value of the user's loaded configuration,
        or an empty list (after logging a warning) when the user does not
        exist.
    """
    if user_exists(user):
        return config.load_user_config(user).enabled_websites

    logging.warning("User %s not found, cannot get websites with enabled isolation", user)
    return []
def get_docroots_of_isolated_websites() -> dict:
    """Map every user with isolated websites to the docroots of those sites.

    Document roots are resolved at call time rather than read from stored
    data; per the original note, the monitoring service uses this to
    watch for docroot changes.  Websites whose document root cannot be
    resolved (``NoDomain`` or ``IndexError``) are skipped, and a user for
    whom nothing resolves gets no entry.

    Returns:
        A ``defaultdict(set)`` of ``{username: {docroot, ...}}``.
    """
    docroots_by_user = defaultdict(set)
    isolated = users_with_enabled_domain_isolation()
    for username, websites in isolated.items():
        for website in websites:
            try:
                docroot = get_domain_docroot(website)[0]
            except (NoDomain, IndexError):
                # Unresolvable website: leave it out of the result.
                continue
            else:
                docroots_by_user[username].add(docroot)
    return docroots_by_user
def enable_website_isolation(user, domain):
    """Enable website isolation for *domain* owned by *user*.

    Steps, in order:

    1. add *domain* to the user's enabled websites (only if absent);
    2. resolve the domain's document root;
    3. create the website token directory and overlay storage directory;
    4. save the user configuration;
    5. rebuild the alt-php ini configuration for the domain;
    6. write the jail mounts configuration and reload processes that
       match the document root;
    7. start the monitoring service and trigger xray/ssa ini
       regeneration.

    When *user* does not exist a warning is logged and nothing is done.

    Args:
        user: Name of the user that owns the domain.
        domain: Domain name to isolate.

    Raises:
        NoDomain: if the document root lookup fails for *domain*.
        IndexError: if the document root lookup result is empty.
        subprocess.CalledProcessError: if the alt-php ini rebuild exits
            with a non-zero status.
    """
    if not user_exists(user):
        logging.warning(
            "User %s not found, cannot enable website isolation", user)
        return

    user_config = config.load_user_config(user)
    if domain not in user_config.enabled_websites:
        user_config.enabled_websites.append(domain)

    # If this raises, just let the command fail with the NoDomain
    # exception; it should be a very rare case because the input domain
    # name is validated in cagefsctl.py.
    document_root = get_domain_docroot(domain)[0]

    # Create website token directory and overlay storage.
    jail_utils.create_website_token_directory(user, document_root)
    jail_utils.create_overlay_storage_directory(user, document_root)

    config.save_user_config(user, user_config)

    # Regenerate alt-php ini configuration for selector for this domain.
    # CAGEFSCTL_TOOL is used instead of a bare "cagefsctl" so the binary is
    # not resolved through PATH, matching ensure_proxyexec_command().
    subprocess.run(
        [CAGEFSCTL_TOOL, "--rebuild-alt-php-ini", "--domain", domain],
        check=True,
    )

    write_jail_mounts_config(user, user_config)
    # Reuse the docroot resolved above: no second lookup, never None.
    reload_processes_with_docroots(user, filter_by_docroots=[document_root])
    start_monitoring_service()

    # Trigger xray/ssa ini regeneration for per-domain PHP selector.
    trigger_xray_ini_regeneration(user, domain)
    trigger_ssa_ini_regeneration(user)
def regenerate_isolation_configuration(user):
    """Rebuild the isolation configuration and runtime state of *user*.

    The jail mounts configuration is rewritten from the stored user
    configuration.  For every enabled website whose document root can be
    resolved, the token directory and overlay storage directory are
    recreated; a failure there is logged and does not stop the loop.
    Finally processes matching the resolved document roots are reloaded.

    When *user* does not exist a warning is logged and nothing is done.

    Args:
        user: Name of the user to regenerate the configuration for.
    """
    if not user_exists(user):
        logging.warning(
            "User %s not found, cannot regenerate website isolation configuration", user)
        return

    user_config = config.load_user_config(user)
    write_jail_mounts_config(user, user_config)

    resolved_docroots = []
    for website in user_config.enabled_websites:
        docroot = _get_docroot_or_none(website)
        if docroot is None:
            logging.warning(
                "Unable to find document root for domain %s, "
                "please contact CloudLinux support if the issue persists.",
                website,
            )
            continue

        # The docroot is reloaded even if recreating its directories fails.
        resolved_docroots.append(docroot)
        try:
            # Recreate tokens and storage, e.g. when the username changes.
            jail_utils.create_website_token_directory(user, docroot)
            jail_utils.create_overlay_storage_directory(user, docroot)
        except Exception as err:
            logging.error("Unable to recreate token/storage for domain=%s, Error=%s", website, err)

    reload_processes_with_docroots(user, filter_by_docroots=resolved_docroots)
def disable_website_isolation(user: str, domain: str | None = None):
    """Disable website isolation for one domain or for all of them.

    With ``domain=None`` every enabled website of *user* is disabled;
    otherwise only *domain* is removed, and only if it is currently
    enabled.  The user configuration and the jail mounts configuration
    are always rewritten.  When something was disabled, matching
    processes are reloaded and the token directories of the resolvable
    document roots are removed.  The monitoring service is stopped when
    no user has isolated websites left.

    When *user* does not exist a warning is logged and nothing is done.

    Args:
        user: Name of the user that owns the website(s).
        domain: Domain to disable, or ``None`` for all enabled websites.
    """
    if not user_exists(user):
        logging.warning(
            "User %s not found, cannot disable website isolation", user)
        return

    user_config = config.load_user_config(user)

    # Docroots are resolved before the websites leave the configuration.
    affected_docroots = None
    if domain is None:
        affected_docroots = [
            _get_docroot_or_none(site) for site in user_config.enabled_websites
        ]
        user_config.enabled_websites = []
    elif domain in user_config.enabled_websites:
        affected_docroots = [_get_docroot_or_none(domain)]
        user_config.enabled_websites.remove(domain)

    config.save_user_config(user, user_config)
    write_jail_mounts_config(user, user_config)

    if affected_docroots:
        reload_processes_with_docroots(user, filter_by_docroots=affected_docroots)
        for docroot in affected_docroots:
            if docroot is not None:
                jail_utils.remove_website_token_directory(user, docroot)

    # Check the actual state of all users before stopping the monitor.
    if not users_with_enabled_domain_isolation():
        stop_monitoring_service()
Back to Directory
File Manager