# Location: /usr/libexec/kcare/python/kcarectl/  (file-manager page residue: "Upload File", "HOME")
# Copyright (c) Cloud Linux Software, Inc
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT

import errno
import json
import numbers
import os
import time

from . import config, constants, http_utils, log_utils, serverid, utils
from .py23 import json_loads_nstr

if False:  # pragma: no cover
    from typing import Optional  # noqa: F401

# from CLN sources:
# code: 0 -> valid license (for key based, or ip based)
# code: 1 -> trial license, unexpired
# code: 2 -> no valid license, but there is expired trial license
# code: 3 -> no valid or trial license
CLN_VALID_LICENSE = 0
CLN_TRIAL_ACTIVE_LICENSE = 1
CLN_NO_LICENSE = 3

# on-disk cache of the last "expensive" decision, valid for one day
CACHE_FILE = os.path.join(constants.PATCH_CACHE, 'ipv6_preference.json')
CACHE_TTL_SECONDS = 24 * 60 * 60


class IPProtoSelector(object):
    """Decide whether the IPv6 or the IPv4 endpoints should be used."""

    def is_ipv6_preferred(self):
        # type: () -> bool
        """
        Choose ipv6 if it is more suitable.

        Checks order:
        - check config values (it is faster): FORCE_IPV4, eportal setup, FORCE_IPV6
        - use the on-disk cached decision if it is present and fresh
        - then check each proto availability using HEAD requests
        - then check if we have server_id, it means we don't expect an ip license
        - and finally we need to check if there is an ip license

        Decisions taken after the config checks are written to the on-disk
        cache. Config-based decisions are never cached.

        :return: True when the IPv6 endpoints should be used, False for IPv4.
        """
        # each case is in a separate block for better coverage check
        if config.FORCE_IPV4:
            log_utils.logdebug('decided to use ipv4 because of config values')
            return False
        elif not config.PATCH_SERVER.endswith('kernelcare.com'):
            # eportal setup, we don't need to change urls
            log_utils.logdebug('decided to use ipv4 because of config values')
            return False
        elif config.FORCE_IPV6:
            log_utils.logdebug('decided to use ipv6 because of config values')
            return True

        # further checks are more expensive, use cached value if it is set
        cached = _read_cache()
        if cached is not None:
            log_utils.logdebug('decided to use {0} from on-disk cache'.format('ipv6' if cached else 'ipv4'))
            return cached

        result = None
        if not self._is_url_reachable(config.PATCH_SERVER_IPV6):
            log_utils.logdebug('decided to use ipv4 because ipv6 is not available')
            result = False
        elif not self._is_url_reachable(config.PATCH_SERVER):
            log_utils.logdebug('decided to use ipv6 because ipv4 is not available')
            result = True
        elif serverid.get_serverid():
            log_utils.logdebug('decided to use ipv4 because server id was found')
            result = False

        if result is not None:
            _write_cache(result)
            return result

        ipv4_license = self._get_cln_license(ipv6=False)
        if ipv4_license == CLN_VALID_LICENSE:
            # a valid ipv4 license wins over anything ipv6 could report, so
            # the second CLN request is skipped: the outcome is the same and
            # CLN gets one request less
            log_utils.logdebug('decided to use ipv4 because ipv4 license was found')
            result = False
        else:
            ipv6_license = self._get_cln_license(ipv6=True)
            if ipv6_license == CLN_VALID_LICENSE:
                log_utils.logdebug('decided to use ipv6 because ipv6 license was found')
                result = True
            elif ipv4_license == CLN_TRIAL_ACTIVE_LICENSE:
                log_utils.logdebug('decided to use ipv4 because ipv4 trial license was found')
                result = False
            elif ipv6_license == CLN_TRIAL_ACTIVE_LICENSE:
                log_utils.logdebug('decided to use ipv6 because ipv6 trial license was found')
                result = True
            else:
                # we don't have any license yet
                result = False

        _write_cache(result)
        return result

    @staticmethod
    def _is_url_reachable(url):
        # type: (str) -> bool
        """
        Probe `url` with a HEAD request.

        :param url: address to probe.
        :return: True if the request completed, False if any exception was
                 raised while performing it (the error is logged at debug level).
        """
        request = http_utils.http_request(url, method='HEAD', auth_string=None)  # type: ignore[no-untyped-call]
        try:
            http_utils.urlopen(request, timeout=10, retry_on_500=False, retry_count=2)  # type: ignore[no-untyped-call]
            return True
        except Exception as e:
            # any failure means "not reachable"; this is a probe, not an operation
            log_utils.logdebug('error during HEAD request to {0}: {1}'.format(url, str(e)))
            return False

    @staticmethod
    def _get_cln_license(ipv6):
        # type: (bool) -> int
        """
        Ask CLN (`<registration url>/check.plain`) for the license code.

        :param ipv6: query the IPv6 registration URL instead of the IPv4 one.
        :return: the integer code from the response, or CLN_NO_LICENSE when
                 the response has no usable code.

        Errors raised by the HTTP request itself are not handled here and
        propagate to the caller.
        """
        base_url = config.REGISTRATION_URL_IPV6 if ipv6 else config.REGISTRATION_URL
        # a comment from auth.py:
        # do not retry in case of 500 from CLN!
        # otherwise, CLN will die in pain because of too many requests
        url = base_url + '/check.plain'
        content = utils.nstr(http_utils.urlopen(url, retry_on_500=False).read())  # type: ignore[no-untyped-call]
        info = utils.data_as_dict(content)
        # NOTE(review): a falsy 'code' is treated as missing. This presumably
        # relies on data_as_dict yielding string values, so that code 0
        # arrives as '0' (truthy) - confirm against utils.data_as_dict.
        if not info or not info.get('code'):
            log_utils.kcarelog.error('Unexpected CLN response: {0}'.format(content))
            return CLN_NO_LICENSE

        try:
            return int(info['code'])
        except (TypeError, ValueError):
            # neither a number nor a numeric string
            return CLN_NO_LICENSE


ip_proto_selector = IPProtoSelector()


def _read_cache():
    # type: () -> Optional[bool]
    """
    Load the cached ipv6 preference from CACHE_FILE.

    :return: the cached boolean, or None when the cache is missing, malformed,
             older than CACHE_TTL_SECONDS, or timestamped in the future.
    """
    content = utils.try_to_read(CACHE_FILE)
    if not content:
        return None

    try:
        data = json_loads_nstr(content)
    except (ValueError, TypeError):
        log_utils.logwarn('ipv6 preference cache: malformed json {0!r}'.format(content), print_msg=False)
        return None

    if not isinstance(data, dict):
        log_utils.logwarn('ipv6 preference cache: unexpected payload {0!r}'.format(data), print_msg=False)
        return None

    prefer_ipv6 = data.get('prefer_ipv6')
    if not isinstance(prefer_ipv6, bool):
        log_utils.logwarn('ipv6 preference cache: unexpected prefer_ipv6 {0!r}'.format(prefer_ipv6), print_msg=False)
        return None

    cached_ts = data.get('ts')
    # numbers.Integral covers int and Python 2 long (the latter matters
    # once a 32-bit Python 2 host crosses the 2038 sys.maxint boundary);
    # exclude bool explicitly since it would silently round-trip as 0/1
    if not isinstance(cached_ts, numbers.Integral) or isinstance(cached_ts, bool):
        log_utils.logwarn('ipv6 preference cache: malformed ts {0!r}'.format(cached_ts), print_msg=False)
        return None

    # negative age means the system clock jumped backwards (NTP correction etc.)
    # since we wrote the cache; treat it as a miss rather than as a fresh entry.
    age = time.time() - int(cached_ts)
    if age < 0 or age > CACHE_TTL_SECONDS:
        log_utils.logdebug('ipv6 preference cache: stale entry (age={0:.0f}s, ttl={1}s)'.format(age, CACHE_TTL_SECONDS))
        return None

    return prefer_ipv6


def _write_cache(result):
    # type: (bool) -> None
    """
    Store `result` together with the current timestamp in CACHE_FILE.

    Write failures (OSError/IOError) are logged and not raised.
    """
    data = {
        'prefer_ipv6': result,
        'ts': int(time.time()),
    }
    try:
        utils.atomic_write(CACHE_FILE, json.dumps(data), ensure_dir=True)
    except (OSError, IOError) as e:
        log_utils.logwarn('failed to write ipv6 preference cache: {0}'.format(e), print_msg=False)


def clear_cache():
    # type: () -> None
    """
    Drop the on-disk ipv6 preference cache.

    Never raises: a missing cache file is ignored silently, any other
    OSError is logged the same way cache write failures are.
    """
    try:
        os.unlink(CACHE_FILE)
    except OSError as e:
        # ENOENT just means there was nothing to clear
        if e.errno != errno.ENOENT:
            log_utils.logwarn('failed to remove ipv6 preference cache: {0}'.format(e), print_msg=False)


def get_patch_server():
    # type: () -> str
    """Return the patch server URL for the preferred IP protocol."""
    return config.PATCH_SERVER_IPV6 if ip_proto_selector.is_ipv6_preferred() else config.PATCH_SERVER


def get_registration_url():
    # type: () -> str
    """Return the registration URL for the preferred IP protocol."""
    return config.REGISTRATION_URL_IPV6 if ip_proto_selector.is_ipv6_preferred() else config.REGISTRATION_URL