D7net
Home
Console
Upload
information
Create File
Create Folder
About
Tools
:
/
proc
/
self
/
root
/
opt
/
imunify360
/
venv
/
share
/
imunify360
/
scripts
/
migrate_csf
/
Filename :
country.py
back
Copy
"""Migrate CSF country-code settings (CC_ALLOW / CC_DENY) into Imunify360 lists."""

import shlex
import subprocess
from typing import Literal, Optional

from csf_conf import get_csf_config_value
from logger_config import get_logger, capture_exception

# Comment attached to every country entry created by this migration.
# The command is executed without a shell, so the value must NOT carry its
# own quote characters -- they would be stored literally in the comment.
_MIGRATION_COMMENT = "migrated from csf"


def _parse_country_codes(raw: Optional[str]) -> list[str]:
    """Split a comma-separated CSF country-code value into clean codes.

    Surrounding whitespace is stripped, empty entries (e.g. from a trailing
    comma) are dropped and codes are upper-cased so later comparisons such
    as ``"US" in denied`` are reliable.

    Args:
        raw: Raw config value; ``None`` or an empty string means "not set".

    Returns:
        The list of country codes, possibly empty.
    """
    if not raw:
        return []
    return [code.strip().upper() for code in raw.split(",") if code.strip()]


def migrate_country_codes() -> None:
    """Copy the CSF CC_ALLOW / CC_DENY country codes into Imunify360 lists.

    Reads both settings via ``get_csf_config_value``, adds the allowed codes
    to the whitelist and the denied codes to the blacklist, then logs the
    resulting lists so they can be verified.

    Any exception is logged and reported through ``capture_exception``
    instead of being propagated, so this function never raises.
    """
    logger = get_logger()
    try:
        warn_cc_allow_fitler_is_set()

        allowed = _parse_country_codes(get_csf_config_value("CC_ALLOW"))
        denied = _parse_country_codes(get_csf_config_value("CC_DENY"))
        logger.info(f"detected {allowed} allowed country codes")
        logger.info(f"detected {denied} denied country codes")

        if "US" in denied:
            logger.warning(
                "Detect US in denied list, if this was unintentional, "
                "please remove it by running\n"
                "imunify360-agent blacklist country delete US"
            )

        apply_to_imunify_lists(allowed=allowed, denied=denied)
        logger.info(
            "country codes migrated to imunify lists, please verify them below"
        )
        print_country_codes_in_imunify_lists()
    except Exception as e:
        # Top-level boundary of this migration step: report and carry on.
        logger.error(f"Error during country codes migration: {e}")
        capture_exception(e, {"migration_type": "country_codes"})


def warn_cc_allow_fitler_is_set() -> None:
    """Log a warning when the CSF option CC_ALLOW_FILTER has a value.

    The option is not migrated; the warning tells the user it is ignored.
    (The misspelling in the function name is kept so existing callers keep
    working.)
    """
    logger = get_logger()
    value = get_csf_config_value("CC_ALLOW_FILTER")
    if value:
        logger.warning(
            f"CC_ALLOW_FILTER is set to {value}, this will be ignored. We don't support it yet. "
            "If you need it, please contact support."
        )


def apply_to_imunify_lists(*, allowed: list[str], denied: list[str]) -> None:
    """Add ``allowed`` codes to the whitelist and ``denied`` codes to the blacklist.

    Args:
        allowed: Country codes to whitelist.
        denied: Country codes to blacklist.
    """
    logger = get_logger()
    logger.info("applying to imunify lists")
    add_country_code_to_imunify_list(
        list_name="whitelist", country_codes=allowed
    )
    add_country_code_to_imunify_list(
        list_name="blacklist", country_codes=denied
    )


def add_country_code_to_imunify_list(
    *, list_name: Literal["whitelist", "blacklist"], country_codes: list[str]
) -> None:
    """Run ``imunify360-agent <list_name> country add`` for the given codes.

    Nothing is executed when ``country_codes`` is empty. Stderr lines saying
    a country "is already in" the list are treated as harmless and logged at
    debug level; on a non-zero return code any other stderr output is logged
    as an error.

    Args:
        list_name: Target list, ``"whitelist"`` or ``"blacklist"``.
        country_codes: Country codes to add.
    """
    logger = get_logger()

    if not country_codes:
        # An "add" command without any country would only produce an error.
        logger.debug(f"No country codes to add to {list_name}, skipping")
        return

    cmd = [
        "imunify360-agent",
        list_name,
        "country",
        "add",
        "--comment",
        _MIGRATION_COMMENT,
        *country_codes,
    ]
    # shlex.join keeps multi-word arguments unambiguous in the log output.
    logger.debug(f"Executing command: {shlex.join(cmd)}")
    res = subprocess.run(cmd, capture_output=True, text=True)

    # Separate the harmless "already in list" notices from everything else.
    already_in_list_warnings: list[str] = []
    other_messages: list[str] = []
    for raw_line in (res.stderr or "").splitlines():
        line = raw_line.strip()
        if not line:
            continue
        if "is already in" in line and "list" in line:
            already_in_list_warnings.append(line)
        else:
            other_messages.append(line)

    for warning in already_in_list_warnings:
        logger.debug(f"Imunify agent warning: {warning}")

    if res.returncode == 0:
        return

    if already_in_list_warnings:
        logger.info(
            f"Some countries from {country_codes} are already in {list_name}"
        )
    if other_messages or not already_in_list_warnings:
        # Real failures must not be hidden behind "already in list" notices.
        details = "\n".join(other_messages) or res.stderr
        logger.error(
            f"Error adding {country_codes} to {list_name}: {details}"
        )


def print_country_codes_in_imunify_lists() -> None:
    """Log the countries currently in the Imunify360 whitelist and blacklist."""
    print_country_codes_in_imunify_list(list_name="whitelist")
    print_country_codes_in_imunify_list(list_name="blacklist")


def print_country_codes_in_imunify_list(
    *, list_name: Literal["whitelist", "blacklist"]
) -> None:
    """Log the output of ``imunify360-agent <list_name> country list``.

    On a non-zero return code the command's stderr is logged as an error
    and nothing else is printed.

    Args:
        list_name: List to show, ``"whitelist"`` or ``"blacklist"``.
    """
    logger = get_logger()
    logger.info(f"\nlisting {list_name} countries:")
    res = subprocess.run(
        [
            "imunify360-agent",
            list_name,
            "country",
            "list",
        ],
        capture_output=True,
        text=True,
    )
    if res.returncode != 0:
        logger.error(f"Error listing {list_name} countries: {res.stderr}")
        return
    logger.info(res.stdout)