/
lib
/
python3.9
/
site-packages
/
dnf-plugins
/
Upload File
HOME
# Copyright (c) 2021, Oracle and/or its affiliates. All rights reserved. # Licensed under the Universal Permissive License v 1.0 as shown at https://opensource.org/licenses/UPL. import copy import errno import json import logging import os import sys import dnf from dnf.conf.config import PRIO_PLUGINCONFIG import dnf.exceptions from dnfpluginscore import _, logger import librepo sys.path.insert(0, '/usr/share/oracle-cloud-agent/plugins/osms') from osms import actions from osms import config from osms.i18n import ustr from osms.server import get_cacert, get_proxy, OsmsServer STORED_CHANNELS_NAME = '_osms.json' RHN_DISABLED = _("OSMS based repositories will be disabled.") COMMUNICATION_ERROR = _("There was an error communicating with OSMS server.") NOT_REGISTERED_ERROR = _("This system is not registered with OSMS server.") UPDATES_FROM_OSMS = _("This system is receiving updates from OSMS server.") GPG_KEY_REJECTED = _( "For security reasons packages from OSMS based repositories can be " "verified only with locally installed gpg keys. GPG key '%s' has been " "rejected." ) PROFILE_NOT_SENT = _("Package profile information could not be sent.") MISSING_HEADER = _("Missing required login information for OSMS: %s") MUST_BE_ROOT = _('OSMS plugin has to be run under with the root privileges.') class OsmsPlugin(dnf.Plugin): name = 'osmsplugin' def __init__(self, base, cli): super(OsmsPlugin, self).__init__(base, cli) self.base = base self.cli = cli self.stored_channels_path = os.path.join(self.base.conf.persistdir, STORED_CHANNELS_NAME) self.connected_to_osms = False self.up2date_cfg = {} self.conf = copy.copy(self.base.conf) self.parser = self.read_config(self.conf) if "main" in self.parser.sections(): options = self.parser.items("main") for (key, value) in options: self.conf._set_value(key, value, PRIO_PLUGINCONFIG) if not dnf.util.am_i_root(): logger.warning(MUST_BE_ROOT) self.conf.enabled = False if not self.conf.enabled: return if self.conf.debuglevel == 2: log_level = 'INFO' elif self.conf.debuglevel > 2: log_level = 'DEBUG' elif self.conf.debuglevel < 2: log_level = 'WARNING' init_root_logger(log_level) logger.debug('initialized OSMS plugin') self.activate_channels() def config(self): if not self.conf.enabled: return if self.cli: self.cli.demands.root_user = True def activate_channels(self, networking=True): enabled_channels = {} sslcacert = None force_http = 0 proxy_url = None proxy_username = None proxy_password = None login_info = None cached_channels = self._read_channels_file() if not networking: # no network communication, use list of channels from persistdir enabled_channels = cached_channels else: self.up2date_cfg = config.initUp2dateConfig() proxy_url, proxy_username, proxy_password = get_proxy(self.up2date_cfg, external=True) sslcacert = get_cacert(self.up2date_cfg) force_http = self.up2date_cfg['useNoSSLForPackages'] try: login_info = OsmsServer(timeout=self.conf.timeout).login() except Exception as e: logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e) return system_id = config.getSystemId() if not login_info or not system_id: logger.error("%s\n%s", NOT_REGISTERED_ERROR, RHN_DISABLED) self._write_channels_file({}) return try: channels = OsmsServer(timeout=self.conf.timeout).up2date.listChannels(system_id) except Exception as e: logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, RHN_DISABLED, e) return self.connected_to_osms = True logger.info(UPDATES_FROM_OSMS) for channel in channels: if channel['last_modified']: enabled_channels[channel['label']] = dict(channel.items()) self._write_channels_file(enabled_channels) repos = self.base.repos for channel_id, channel_dict in enabled_channels.items(): cached_channel = cached_channels.get(channel_id) cached_version = None if cached_channel: cached_version = cached_channel.get('last_modified') conf = copy.copy(self.conf) if channel_id in self.parser.sections(): options = self.parser.items(channel_id) for (key, value) in options: conf._set_value(key, value, PRIO_PLUGINCONFIG) opts = { 'conf': self.base.conf, 'proxy': proxy_url, 'proxy_username': proxy_username, 'proxy_password': proxy_password, 'timeout': conf.timeout, 'sslcacert': sslcacert, 'force_http': force_http, 'cached_version': cached_version, 'login_info': login_info, 'gpgcheck': conf.gpgcheck, 'enabled': conf.enabled, } repo = OsmsRepo(channel_dict, opts) repos.add(repo) logger.debug(enabled_channels) def transaction(self): """ Update system's profile after transaction. """ if not self.conf.enabled: return if not self.connected_to_osms: # not connected so nothing to do here return try: actions.package_refresh_list(timeout=self.conf.timeout) except Exception as e: logger.error("%s\n%s\n%s", COMMUNICATION_ERROR, PROFILE_NOT_SENT, e) def _read_channels_file(self): try: with open(self.stored_channels_path, "r") as channels_file: content = channels_file.read() channels = json.loads(content) return channels except (FileNotFoundError, IOError) as e: if e.errno != errno.ENOENT: raise except json.decoder.JSONDecodeError: pass # ignore broken json and recreate it later return {} def _write_channels_file(self, var): try: with open(self.stored_channels_path, "w") as channels_file: json.dump(var, channels_file, indent=4) except (FileNotFoundError, IOError) as e: if e.errno != errno.ENOENT: raise class OsmsRepo(dnf.repo.Repo): needed_headers = [ 'X-RHN-Server-Id', 'X-RHN-Auth-User-Id', 'X-RHN-Auth', 'X-RHN-Auth-Server-Time', 'X-RHN-Auth-Expire-Offset', ] def __init__(self, channel, opts): super(OsmsRepo, self).__init__(ustr(channel['label']), opts.get('conf')) self.name = ustr(channel['name']) self.baseurl = [url + '/GET-REQ/' + self.id for url in config.getServerlURL()] self.sslcacert = opts.get('sslcacert') self.proxy = opts.get('proxy') self.proxy_username = opts.get('proxy_username') self.proxy_password = opts.get('proxy_password') try: self.gpgkey = get_gpg_key_urls(channel['gpg_key_url']) except InvalidGpgKeyLocation as e: logger.warning(GPG_KEY_REJECTED, dnf.i18n.ucd(e)) self.gpgkey = [] if channel['last_modified'] != opts.get('cached_version'): self.metadata_expire = 1 self.login_info = opts.get('login_info') self.keepalive = 0 self.bandwidth = 0 self.retries = 1 self.throttle = 0 self.timeout = opts.get('timeout') self.gpgcheck = opts.get('gpgcheck') self.force_http = opts.get('force_http') if self.id.startswith('ocid'): logger.debug("Set module_hotfixes = True for custom repository %s", self.id) self.module_hotfixes = True if opts.get('enabled'): self.enable() else: self.disable() if hasattr(self, 'set_http_headers'): # dnf > 4.0.9 on RHEL 8, Fedora 29/30 http_headers = self.create_http_headers() if http_headers: self.set_http_headers(http_headers) def create_http_headers(self): http_headers = [] for header in self.needed_headers: if header not in self.login_info: error = MISSING_HEADER % header raise dnf.Error.RepoError(error) if self.login_info[header] in (None, ''): # This doesn't work due to bug in librepo (or even deeper in libcurl) # the workaround bellow can be removed once BZ#1211662 is fixed # http_headers.append("%s;" % header) http_headers.append("%s: \r\nX-libcurl-Empty-Header-Workaround: *" % header) else: http_headers.append("%s: %s" % (header, self.login_info[header])) up2date_cfg = config.initUp2dateConfig() if up2date_cfg['tenantId']: http_headers.append('X-Tenant-Id: %s' % up2date_cfg['tenantId']) if not self.force_http: http_headers.append("X-RHN-Transport-Capability: follow-redirects=3") # libdnf will set Content-Length = '' by default. An empty string is an # invalid value for Content-Length header. Explicitly set Content-Length to 0. http_headers.append('Content-Length: 0') return http_headers def _handle_new_remote(self, destdir, mirror_setup=True): # this function is called only on dnf < 3.6.0 (up to Fedora 29) handle = super(OsmsRepo, self)._handle_new_remote(destdir, mirror_setup) http_headers = self.create_http_headers() if http_headers: handle.setopt(librepo.LRO_HTTPHEADER, http_headers) return handle def get_gpg_key_urls(key_url_string): """ Parse the key urls and validate them. key_url_string is a space seperated list of gpg key urls that must be located in /etc/pkg/rpm-gpg/. Return a list of strings containing the key urls. Raises InvalidGpgKeyLocation if any of the key urls are invalid. """ key_urls = key_url_string.split() for key_url in key_urls: if not is_valid_gpg_key_url(key_url): raise InvalidGpgKeyLocation(key_url) return key_urls class InvalidGpgKeyLocation(Exception): pass def is_valid_gpg_key_url(key_url): proto_split = key_url.split('://') if len(proto_split) != 2: return False proto, path = proto_split if proto.lower() != 'file': return False path = os.path.normpath(path) if not path.startswith('/etc/pki/rpm-gpg/'): return False return True def init_root_logger(log_level): # DNF doesn't setup root logger. Logs to root logger go to the console by # default, which interferes with dnf cli. Disable them before # DNF introduces a solution. root_logger = logging.getLogger() if root_logger.hasHandlers(): return root_logger.propagate = 0 root_logger.addHandler(logging.NullHandler()) root_logger.setLevel(log_level)