Source code for libsolace.items.SolaceUser

import logging
import libsolace
from libsolace.Decorators import only_on_shutdown, before, only_if_not_exists
from libsolace.plugin import Plugin, PluginResponse
from libsolace.SolaceCommandQueue import SolaceCommandQueue
from libsolace.SolaceXMLBuilder import SolaceXMLBuilder
from libsolace.util import get_key_from_kwargs
from libsolace.Exceptions import *


@libsolace.plugin_registry.register
class SolaceUser(Plugin):
    """Manage a Solace Client User

    This plugin manages Client Users within Solace. Typically you should invoke this plugin via
    L{SolaceAPI.SolaceAPI}.

    Please see L{plugin.Plugin} for how plugins are instantiated and used.
    """

    plugin_name = "SolaceUser"
    # Placeholder until __init__ stores the real SolaceAPI instance; note this is the
    # string "None", not the None singleton.
    api = "None"
    commands = None

    def __init__(self, **kwargs):
        """Manage the SolaceUser ( client-username )

        When only ``api`` is supplied the instance is left in "query mode" and nothing is
        enqueued. Otherwise the full set of provisioning requests is enqueued on
        ``self.commands`` in the order they must be executed.

        :param client_username: the username of the client
        :type client_username: str
        :param password: the password to set ( plaintext )
        :type password: str
        :param vpn_name: the vpn name
        :type vpn_name: str
        :param client_profile: the client profile name to associate with, must exist!
        :type client_profile: str
        :param acl_profile: the acl_profile to associate with, must exist!
        :type acl_profile: str
        :param shutdown_on_apply: is shutdown permitted boolean or char
        :type shutdown_on_apply: bool or char b or char u
        :param testmode: when true, run :meth:`requirements` before enqueueing anything
        :type testmode: bool
        :param options: not implemented yet
        :type options: Options
        :param version: if you want to override the SEMP version for some reason
        :type version: str
        :param api: the instance of the SolaceAPI if not instantiated via SolaceAPI.manage
        :type api: SolaceAPI

        Example:

        >>> connection = SolaceAPI("dev")
        >>> self.users = [connection.manage("SolaceUser",
                                client_username = "%s_testuser",
                                password = "mypassword",
                                vpn_name = "%s_testvpn",
                                client_profile = "glassfish",
                                acl_profile = "%s_testvpn",
                                testmode = True,
                                shutdown_on_apply = False,
                                version = self.version)]
        """
        # Never write the plaintext password to the log: dump a masked copy instead.
        loggable_kwargs = dict(kwargs)
        if "password" in loggable_kwargs:
            loggable_kwargs["password"] = "********"
        logging.info("SolaceUser: kwargs: %s " % (loggable_kwargs,))

        # get the API and pop off the kwargs
        self.api = get_key_from_kwargs('api', kwargs)
        kwargs.pop('api')

        self.commands = SolaceCommandQueue(version=self.api.version)

        # settings section
        self.SOLACE_QUEUE_PLUGIN = get_key_from_kwargs("SOLACE_QUEUE_PLUGIN", self.api.settings.__dict__)

        # With 'api' removed, empty kwargs means the caller only wants the query helpers.
        if not kwargs:
            logging.debug("Query Mode")
            return

        self.options = None
        self.client_username = get_key_from_kwargs("client_username", kwargs)
        self.password = get_key_from_kwargs("password", kwargs)
        self.vpn_name = get_key_from_kwargs("vpn_name", kwargs)
        self.acl_profile = get_key_from_kwargs("acl_profile", kwargs)
        self.client_profile = get_key_from_kwargs("client_profile", kwargs)
        self.testmode = get_key_from_kwargs("testmode", kwargs, default=False)
        self.shutdown_on_apply = get_key_from_kwargs("shutdown_on_apply", kwargs)

        # The password slot is filled with a mask so credentials do not leak into logs.
        logging.info(
            "UserCommands: %s, Environment: %s, Username: %s, Password: %s, vpn_name: %s, "
            "acl_profile: %s, client_profile: %s, testmode: %s, shutdown_on_apply: %s"
            % (self.commands, self.api.environment, self.client_username, "********",
               self.vpn_name, self.acl_profile, self.client_profile, self.testmode,
               self.shutdown_on_apply))

        if self.testmode:
            logging.info('TESTMODE ACTIVE')
            try:
                self.requirements(**kwargs)
            except Exception as e:
                logging.error("Tests Failed %s" % e)
                # NOTE(review): BaseException is kept because existing callers may rely on
                # it; it is not caught by ``except Exception`` handlers.
                raise BaseException("Tests Failed")

        # backwards compatibility for None options passed to still execute "add" code
        if self.options is None:
            logging.warning(
                "No options passed, assuming you meant 'add', please update usage of this class to pass a "
                "OptionParser instance")

            try:
                # Check if user already exists, if not then shutdown immediately after creating the user
                self.get(**kwargs)[0]['rpc-reply']['rpc']['show']['client-username']['client-usernames'][
                    'client-username']
            except (AttributeError, KeyError, MissingClientUser):
                logging.info("User %s doesn't exist, using shutdown_on_apply to True for user" % self.client_username)
                kwargs['shutdown_on_apply'] = True

            # Order matters: create, disable, configure, then re-enable.
            self.create_user(**kwargs)
            self.shutdown(**kwargs)
            self.set_client_profile(**kwargs)
            self.set_acl_profile(**kwargs)
            self.no_guarenteed_endpoint(**kwargs)
            self.no_subscription_manager(**kwargs)
            self.set_password(**kwargs)
            self.no_shutdown(**kwargs)

    def _enqueue_request(self, **kwargs):
        """Enqueue the request currently held in ``self.api.x`` and return a response for it.

        Two separate PluginResponse objects are built on purpose, one for the queue and one
        for the caller, so the returned object is never the same instance as the queued one.

        :rtype: plugin.PluginResponse
        :returns: SEMP request
        """
        self.commands.enqueue(PluginResponse(str(self.api.x), **kwargs))
        return PluginResponse(str(self.api.x), **kwargs)

    def requirements(self, **kwargs):
        """Call the tests before create is attempted, checks for profiles in this case

        NOTE(review): the boolean results of the two checks are discarded here, so a missing
        profile only produces a warning in the log; confirm whether this should raise.

        :rtype: None
        :returns: nothing
        """
        logging.info('Pre-Provision Tests')
        self.check_client_profile_exists(**kwargs)
        self.check_acl_profile_exists(**kwargs)

    def get(self, **kwargs):
        """Get a username from the appliance, return a dict

        Example

        >>> connection = SolaceAPI("dev")
        >>> reply = connection.manage("SolaceUser").get(client_username="default", vpn_name="default")
        >>> reply[0]['rpc-reply']['rpc']['show']['client-username']['client-usernames']['client-username']['client-username']
        u'default'

        :param client_username: the username
        :type client_username: str
        :param vpn_name: the vpn name
        :type vpn_name: str
        :rtype: list
        :returns: the user as a dict from the appliance
        :raises MissingClientUser: if the first reply, or a non-None second reply, reports no such user
        """
        client_username = get_key_from_kwargs("client_username", kwargs)
        vpn_name = get_key_from_kwargs("vpn_name", kwargs)

        logging.info("Getting user: %s vpn: %s" % (client_username, vpn_name))

        self.api.x = SolaceXMLBuilder("Getting user %s" % client_username, version=self.api.version)
        self.api.x.show.client_username.name = client_username
        self.api.x.show.client_username.vpn_name = vpn_name
        self.api.x.show.client_username.detail

        # enqueue to validate, then drop it again so the query is not replayed later
        self.commands.enqueue(PluginResponse(str(self.api.x), **kwargs))
        self.commands.commands.pop()

        # do the request now
        response = self.api.rpc(PluginResponse(str(self.api.x), **kwargs))
        logging.debug("SRH: %s" % response[0])

        # NOTE(review): the comparison is against the *string* 'None', as in the original;
        # confirm that is how the API layer renders an empty element.
        if response[0]['rpc-reply']['rpc']['show']['client-username']['client-usernames'] == 'None':
            raise MissingClientUser("Primary: No such user %s" % client_username)

        # Only inspect a second reply when one was actually returned.
        if len(response) > 1 and response[1] is not None and \
                response[1]['rpc-reply']['rpc']['show']['client-username']['client-usernames'] == 'None':
            raise MissingClientUser("Backup: No such user %s" % client_username)

        return response

    @only_on_shutdown('user')
    @before("shutdown")
    def delete(self, **kwargs):
        """Delete a client user

        Example

        >>> connection = SolaceAPI("dev")
        >>> connection.manage("SolaceUser").delete(client_username="foo", vpn_name="bar", force=True, skip_before=True).xml
        '<rpc semp-version="soltr/7_1_1"><no><client-username><username>foo</username><vpn-name>bar</vpn-name></client-username></no></rpc>'

        :param client_username: the username
        :type client_username: str
        :param vpn_name: the vpn name
        :type vpn_name: str
        :rtype: plugin.PluginResponse
        :returns: SEMP request
        """
        client_username = get_key_from_kwargs("client_username", kwargs)
        vpn_name = get_key_from_kwargs("vpn_name", kwargs)

        self.api.x = SolaceXMLBuilder("Delete User %s" % client_username, version=self.api.version)
        self.api.x.no.client_username.username = client_username
        self.api.x.no.client_username.vpn_name = vpn_name
        return self._enqueue_request(**kwargs)

    def check_client_profile_exists(self, **kwargs):
        """Checks if a client_profile exists on the appliance.

        Example:

        >>> api = SolaceAPI("dev")
        >>> client = api.manage("SolaceUser")
        >>> client.check_client_profile_exists(client_profile="default")
        True

        :param client_profile: the client profile to check
        :type client_profile: str
        :rtype: bool
        :returns: true or false
        """
        client_profile = get_key_from_kwargs("client_profile", kwargs)

        logging.info('Checking if client_profile is present on devices')
        self.api.x = SolaceXMLBuilder("Checking client_profile %s is present on device" % client_profile,
                                      version=self.api.version)
        self.api.x.show.client_profile.name = client_profile
        response = self.api.rpc(str(self.api.x), allowfail=False)

        # Every reply must report the profile; the first one that does not fails the check.
        for reply in response:
            if reply['rpc-reply']['rpc']['show']['client-profile'] is None:
                logging.warning('client_profile: %s missing from appliance' % client_profile)
                return False
        return True

    def check_acl_profile_exists(self, **kwargs):
        """Checks if a acl_profile exists on the appliance.

        Example:

        >>> api = SolaceAPI("dev")
        >>> client = api.manage("SolaceUser")
        >>> client.check_acl_profile_exists(acl_profile="myacl")
        False

        :param acl_profile: the acl profile to check
        :type acl_profile: str
        :rtype: bool
        :returns: true or false
        """
        acl_profile = get_key_from_kwargs('acl_profile', kwargs)

        logging.info('Checking if acl_profile is present on devices')
        self.api.x = SolaceXMLBuilder("Checking acl_profile %s is present on device" % acl_profile,
                                      version=self.api.version)
        self.api.x.show.acl_profile.name = acl_profile
        response = self.api.rpc(str(self.api.x), allowfail=False)

        # Every reply must report the profile; the first one that does not fails the check.
        for reply in response:
            if reply['rpc-reply']['rpc']['show']['acl-profile']['acl-profiles'] is None:
                logging.warning('acl_profile: %s missing from appliance' % acl_profile)
                return False
        return True

    @only_if_not_exists('get', 'rpc-reply.rpc.show.client-username.client-usernames.client-username')
    def create_user(self, **kwargs):
        """Create client-user

        Example

        >>> api = SolaceAPI("dev")
        >>> xml = api.manage("SolaceUser").create_user(client_username="foo", vpn_name="bar", force=True)
        >>> xml.xml
        '<rpc semp-version="soltr/7_1_1"><create><client-username><username>foo</username><vpn-name>bar</vpn-name></client-username></create></rpc>'

        :param client_username: the username
        :type client_username: str
        :param vpn_name: the vpn name
        :type vpn_name: str
        :rtype: plugin.PluginResponse
        :returns: SEMP request
        """
        client_username = get_key_from_kwargs('client_username', kwargs)
        vpn_name = get_key_from_kwargs('vpn_name', kwargs)

        self.api.x = SolaceXMLBuilder("New User %s" % client_username, version=self.api.version)
        self.api.x.create.client_username.username = client_username
        self.api.x.create.client_username.vpn_name = vpn_name
        return self._enqueue_request(**kwargs)

    def shutdown(self, **kwargs):
        """Shutdown the user, this method will be called by anything decorated with the @shutdown decorator.

        The kwarg shutdown_on_apply needs to be either True or 'u' or 'b' for this method to fire.

        Example

        >>> connection.manage("SolaceUser").shutdown(client_username="foo", vpn_name="bar", shutdown_on_apply=True).xml
        '<rpc semp-version="soltr/7_1_1"><client-username><username>foo</username><vpn-name>bar</vpn-name><shutdown/></client-username></rpc>'

        :param client_username: the username
        :type client_username: str
        :param vpn_name: the vpn name
        :type vpn_name: str
        :param shutdown_on_apply: bool / char
        :type shutdown_on_apply: bool / char
        :rtype: plugin.PluginResponse
        :returns: SEMP request, or None when shutdown_on_apply does not permit a shutdown
        """
        client_username = get_key_from_kwargs('client_username', kwargs)
        vpn_name = get_key_from_kwargs('vpn_name', kwargs)
        shutdown_on_apply = get_key_from_kwargs('shutdown_on_apply', kwargs)

        # b = both, u = user, True = forced (membership uses ==, same as the explicit chain)
        if shutdown_on_apply not in ('b', 'u', True):
            logging.warning(
                "Not disabling User, commands could fail since shutdown_on_apply = %s" % shutdown_on_apply)
            return None

        self.api.x = SolaceXMLBuilder("Disabling User %s" % client_username, version=self.api.version)
        self.api.x.client_username.username = client_username
        self.api.x.client_username.vpn_name = vpn_name
        self.api.x.client_username.shutdown
        return self._enqueue_request(**kwargs)

    @only_on_shutdown('user')
    def set_client_profile(self, **kwargs):
        """Set the ClientProfile

        Example

        >>> connection = SolaceAPI("dev")
        >>> requests = []
        >>> requests.append(connection.manage("SolaceUser").shutdown(client_username="default", vpn_name="default", shutdown_on_apply=True))
        >>> requests.append(connection.manage("SolaceUser").set_client_profile(client_username="default", vpn_name="default", client_profile="default"))
        >>> requests.append(connection.manage("SolaceUser").no_shutdown(client_username="default", vpn_name="default", shutdown_on_apply=True))
        >>> # [api.rpc(r) for r in requests]

        :param client_username: the username
        :type client_username: str
        :param vpn_name: the vpn name
        :type vpn_name: str
        :param client_profile: the client profile to set
        :type client_profile: str
        :param shutdown_on_apply: bool / char
        :type shutdown_on_apply: bool / char
        :rtype: plugin.PluginResponse
        :returns: SEMP request
        """
        client_username = get_key_from_kwargs('client_username', kwargs)
        vpn_name = get_key_from_kwargs('vpn_name', kwargs)
        client_profile = get_key_from_kwargs('client_profile', kwargs)

        # Client Profile
        self.api.x = SolaceXMLBuilder("Setting User %s client profile to %s" % (client_username, client_profile),
                                      version=self.api.version)
        self.api.x.client_username.username = client_username
        self.api.x.client_username.vpn_name = vpn_name
        self.api.x.client_username.client_profile.name = client_profile
        return self._enqueue_request(**kwargs)

    @only_on_shutdown('user')
    def set_acl_profile(self, **kwargs):
        """Set the acl profile

        Example

        >>> connection = SolaceAPI("dev")
        >>> requests = []
        >>> requests.append(connection.manage("SolaceUser").shutdown(client_username="default", vpn_name="default", shutdown_on_apply=True))
        >>> requests.append(connection.manage("SolaceUser").set_acl_profile(client_username="default", vpn_name="default", acl_profile="default"))
        >>> requests.append(connection.manage("SolaceUser").no_shutdown(client_username="default", vpn_name="default", shutdown_on_apply=True))
        >>> # [api.rpc(r) for r in requests]

        :param client_username: the username
        :type client_username: str
        :param vpn_name: the vpn name
        :type vpn_name: str
        :param acl_profile: the acl profile to set
        :type acl_profile: str
        :param shutdown_on_apply: bool / char
        :type shutdown_on_apply: bool / char
        :rtype: plugin.PluginResponse
        :returns: SEMP request
        """
        client_username = get_key_from_kwargs('client_username', kwargs)
        vpn_name = get_key_from_kwargs('vpn_name', kwargs)
        acl_profile = get_key_from_kwargs('acl_profile', kwargs)

        # Set client user profile; the description names the ACL profile being applied
        # (it previously interpolated vpn_name by mistake).
        self.api.x = SolaceXMLBuilder("Set User %s ACL Profile to %s" % (client_username, acl_profile),
                                      version=self.api.version)
        self.api.x.client_username.username = client_username
        self.api.x.client_username.vpn_name = vpn_name
        self.api.x.client_username.acl_profile.name = acl_profile
        return self._enqueue_request(**kwargs)

    def no_guarenteed_endpoint(self, **kwargs):
        """No guaranteed endpoint permission override

        The method name keeps its historical spelling for backwards compatibility.

        Example:

        >>> api = SolaceAPI("dev", version="soltr/7_1_1")
        >>> request = api.manage("SolaceUser").no_guarenteed_endpoint(client_username="foo", vpn_name="bar")
        >>> request.xml
        '<rpc semp-version="soltr/7_1_1"><client-username><username>foo</username><vpn-name>bar</vpn-name><no><guaranteed-endpoint-permission-override/></no></client-username></rpc>'

        :param client_username: the username
        :type client_username: str
        :param vpn_name: the vpn name
        :type vpn_name: str
        :rtype: plugin.PluginResponse
        :returns: SEMP request
        """
        client_username = get_key_from_kwargs('client_username', kwargs)
        vpn_name = get_key_from_kwargs('vpn_name', kwargs)

        # No Guaranteed Endpoint
        self.api.x = SolaceXMLBuilder("Default User %s guaranteed endpoint override" % client_username,
                                      version=self.api.version)
        self.api.x.client_username.username = client_username
        self.api.x.client_username.vpn_name = vpn_name
        self.api.x.client_username.no.guaranteed_endpoint_permission_override
        return self._enqueue_request(**kwargs)

    def no_subscription_manager(self, **kwargs):
        """No subscription manager

        Example:

        >>> api = SolaceAPI("dev", version="soltr/7_1_1")
        >>> request = api.manage("SolaceUser").no_subscription_manager(client_username="foo", vpn_name="bar")
        >>> request.xml
        '<rpc semp-version="soltr/7_1_1"><client-username><username>foo</username><vpn-name>bar</vpn-name><no><subscription-manager/></no></client-username></rpc>'

        :param client_username: the username
        :type client_username: str
        :param vpn_name: the vpn name
        :type vpn_name: str
        :rtype: plugin.PluginResponse
        :returns: SEMP request
        """
        client_username = get_key_from_kwargs('client_username', kwargs)
        vpn_name = get_key_from_kwargs('vpn_name', kwargs)

        # No Subscription Manager
        self.api.x = SolaceXMLBuilder("Default User %s subscription manager" % client_username,
                                      version=self.api.version)
        self.api.x.client_username.username = client_username
        self.api.x.client_username.vpn_name = vpn_name
        self.api.x.client_username.no.subscription_manager
        return self._enqueue_request(**kwargs)

    def set_password(self, **kwargs):
        """Sets the client-user's password

        :param client_username: the username
        :type client_username: str
        :param vpn_name: the vpn name
        :type vpn_name: str
        :param password: the password to set
        :type password: str
        :rtype: plugin.PluginResponse
        :returns: SEMP request
        """
        client_username = get_key_from_kwargs('client_username', kwargs)
        vpn_name = get_key_from_kwargs('vpn_name', kwargs)
        password = get_key_from_kwargs('password', kwargs)

        # Set User Password
        self.api.x = SolaceXMLBuilder("Set User %s password" % client_username, version=self.api.version)
        self.api.x.client_username.username = client_username
        self.api.x.client_username.vpn_name = vpn_name
        self.api.x.client_username.password.password = password
        return self._enqueue_request(**kwargs)

    def no_shutdown(self, **kwargs):
        """Enable the client-user

        :param client_username: the username
        :type client_username: str
        :param vpn_name: the vpn name
        :type vpn_name: str
        :rtype: plugin.PluginResponse
        :returns: SEMP request
        """
        # Looked up the same way as in every sibling method, rather than kwargs.get(),
        # which would silently build a request for a user literally named "None".
        client_username = get_key_from_kwargs('client_username', kwargs)
        vpn_name = get_key_from_kwargs('vpn_name', kwargs)

        # Enable User
        self.api.x = SolaceXMLBuilder("Enable User %s" % client_username, version=self.api.version)
        self.api.x.client_username.username = client_username
        self.api.x.client_username.vpn_name = vpn_name
        self.api.x.client_username.no.shutdown
        return self._enqueue_request(**kwargs)