From 0650eefa9e6e1f9e3d582bbd409622067a01135d Mon Sep 17 00:00:00 2001 From: Justin Boss Date: Tue, 12 Nov 2024 16:35:58 -0500 Subject: [PATCH 1/9] Initial development --- .gitignore | 1 + LICENSE | 15 + PKG-INFO | 38 - README.md | 46 ++ README.txt | 27 - pyproject.toml | 21 + setup.py | 15 - space/commands.py | 75 -- space/gems.py | 116 --- space/gemsdir.py | 40 -- space/links.py | 132 ---- space/parameters.py | 399 ----------- space/queries.py | 42 -- space/shell.py | 94 --- space/system.py | 153 ---- {space => src/space}/__init__.py | 47 +- src/space/assets.py | 122 ++++ src/space/commands.py | 44 ++ src/space/console_queries.py | 28 + src/space/constants.py | 66 ++ {space => src/space}/errors.py | 8 +- src/space/gems.py | 15 + src/space/links.py | 15 + src/space/native_procedures.py | 30 + src/space/parameters.py | 750 ++++++++++++++++++++ src/space/procedure_engines.py | 40 ++ src/space/procedures.py | 21 + src/space/shell.py | 251 +++++++ src/space/silent_queries.py | 26 + src/space/space_pythons.py | 96 +++ src/space/space_queries.py | 87 +++ src/space/system.py | 28 + {space => src/space}/times.py | 87 ++- {scripts => test}/ConfigureFEP.py | 33 +- {scripts => test}/PassSetup.py | 22 +- test/README.md | 20 + {scripts => test}/SetMomentumWheelSpeed.py | 39 +- {data => test/data}/SpacePythonDataset.yaml | 6 +- test/demo/DemoAsset.py | 114 +++ test/demo/DemoCommand.py | 54 ++ test/demo/DemoParameter.py | 72 ++ test/demo/DemoProcedure.py | 20 + test/demo/DemoProcedureEngine.py | 25 + test/demo/DemoSpacePython.py | 42 ++ test/demo/__init__.py | 0 {space => test/demo}/loader.py | 85 +-- 46 files changed, 2232 insertions(+), 1275 deletions(-) create mode 100644 .gitignore create mode 100644 LICENSE delete mode 100644 PKG-INFO create mode 100644 README.md delete mode 100644 README.txt create mode 100644 pyproject.toml delete mode 100644 setup.py delete mode 100644 space/commands.py delete mode 100644 space/gems.py delete mode 100644 space/gemsdir.py delete mode 100644 space/links.py delete mode 100644 space/parameters.py delete mode 100644 space/queries.py delete mode 100644 space/shell.py delete mode 100644 space/system.py rename {space => src/space}/__init__.py (52%) create mode 100644 src/space/assets.py create mode 100644 src/space/commands.py create mode 100644 src/space/console_queries.py create mode 100644 src/space/constants.py rename {space => src/space}/errors.py (89%) create mode 100644 src/space/gems.py create mode 100644 src/space/links.py create mode 100644 src/space/native_procedures.py create mode 100644 src/space/parameters.py create mode 100644 src/space/procedure_engines.py create mode 100644 src/space/procedures.py create mode 100644 src/space/shell.py create mode 100644 src/space/silent_queries.py create mode 100644 src/space/space_pythons.py create mode 100644 src/space/space_queries.py create mode 100644 src/space/system.py rename {space => src/space}/times.py (68%) rename {scripts => test}/ConfigureFEP.py (58%) rename {scripts => test}/PassSetup.py (64%) create mode 100644 test/README.md rename {scripts => test}/SetMomentumWheelSpeed.py (67%) rename {data => test/data}/SpacePythonDataset.yaml (87%) create mode 100644 test/demo/DemoAsset.py create mode 100644 test/demo/DemoCommand.py create mode 100644 test/demo/DemoParameter.py create mode 100644 test/demo/DemoProcedure.py create mode 100644 test/demo/DemoProcedureEngine.py create mode 100644 test/demo/DemoSpacePython.py create mode 100644 test/demo/__init__.py rename {space => test/demo}/loader.py (64%) diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9b1c8b1 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/dist diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..510937b --- /dev/null +++ b/LICENSE @@ -0,0 +1,15 @@ +Object Management Group RF-Limited License +https://www.omg.org/cgi-bin/doc.cgi?ipr + +Each Obligated Party in an RF on Limited Terms IPR Mode Adoption or Revision +Process covenants that it will grant to an unrestricted number of applicants +a royalty and fee free, nonexclusive, worldwide, non-sublicensable, perpetual +patent license to its Essential Claims on fair, reasonable, and +non-discriminatory terms to make, have made, use, import, offer to sell, sell, +and otherwise directly or indirectly distribute Covered Implementations of +such OMG Formal Specification, provided that it may not impose any further +conditions or restrictions beyond those specifically mentioned in Appendix B +on the use of any technology or intellectual property rights or the behavior +of the Licensee, but may include reasonable, customary terms relating to +operation or maintenance of the license relationship, including choice of law +and dispute resolution. diff --git a/PKG-INFO b/PKG-INFO deleted file mode 100644 index e6e7e3e..0000000 --- a/PKG-INFO +++ /dev/null @@ -1,38 +0,0 @@ -Metadata-Version: 1.1 -Name: SpacePython -Version: 1.1.0 -Summary: Skeleton for SpacePython implementation -Home-page: UNKNOWN -Author: Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort -Author-email: space@omg.org, bkizzort@harris.com -License: LICENSE.txt -Description: =========== - SpacePython - =========== - - The space package defines SpacePython, a high level interface to a Spacecraft - Operations Center for spacecraft monitoring and control. The scripts included - in the package exercise the normative interfaces for SpacePython and should be - runnable by any SpacePython-compliant implementation. - - Each function and class definition in a space module that is required for a - SpacePython implementation is imported by the space package __init__.py file, - and is also marked in the module with the comment "#Normative". If a class is - normative, then all of its methods are normative, unless they are explicitly - marked non-normative. There are helper classes and module variables that are - part of the skeleton implementation, but are not marked #Normative and are - not required in a compliant implementation. - - The included dataset, SpacePythonDataset.yaml, - provides command, directive, and parameter lists to - allow running the procedures in the skeleton, but should be replaced with - the database definition formats used by the Spacecraft Operations Center - software. The yaml format is not a required input format, but is provided - only to allow running the example scripts. SpacePythonDataset.yaml should - be copied to the user's home directory for the example package loader to find - it. - - ============================= - -Platform: UNKNOWN -Requires: yaml diff --git a/README.md b/README.md new file mode 100644 index 0000000..561fe0e --- /dev/null +++ b/README.md @@ -0,0 +1,46 @@ +# SpacePython + +Managed by the Object Management Group + +Project Website: https://www.omg.org/solm/ + +## Package Description + +The space package defines SpacePython, a high level interface to a Spacecraft +Operations Center for spacecraft monitoring and control. The scripts included +in the test directory package exercise the normative interfaces for SpacePython +and should be runnable by any SpacePython-compliant implementation with a +similar operating configuration. + +Each function and class definition in a space module that is required for a +SpacePython implementation is marked as an abstract class or function. + +Within the test directory, the included dataset, SpacePythonDataset.yaml, +provides command, directive, and parameter lists to allow running the +example procedures, but should be replaced with +the database definition formats used by the Spacecraft Operations Center +software. The yaml format (and the pyyaml module) is not a required input +format, but is required for running the example scripts. + +## Package Build Instructions + +First, make sure that you have latest pip installed + +``` +python3 -m pip install --upgrade pip +``` + +Second, install the build tooling + +``` +python3 -m pip install --upgrade build +``` + +Lastly, perform a build + +``` +python3 -m build +``` + +## Reporting Issues +If you have issues, please share them with the Object Management Group via our issue tracker at https://issues.omg.org/ \ No newline at end of file diff --git a/README.txt b/README.txt deleted file mode 100644 index 8ad90f0..0000000 --- a/README.txt +++ /dev/null @@ -1,27 +0,0 @@ -=========== -SpacePython -=========== - -The space package defines SpacePython, a high level interface to a Spacecraft -Operations Center for spacecraft monitoring and control. The scripts included -in the package exercise the normative interfaces for SpacePython and should be -runnable by any SpacePython-compliant implementation. - -Each function and class definition in a space module that is required for a -SpacePython implementation is imported by the space package __init__.py file, -and is also marked in the module with the comment "#Normative". If a class is -normative, then all of its methods are normative, unless they are explicitly -marked non-normative. There are helper classes and module variables that are -part of the skeleton implementation, but are not marked #Normative and are -not required in a compliant implementation. - -The included dataset, SpacePythonDataset.yaml, -provides command, directive, and parameter lists to -allow running the procedures in the skeleton, but should be replaced with -the database definition formats used by the Spacecraft Operations Center -software. The yaml format is not a required input format, but is provided -only to allow running the example scripts. SpacePythonDataset.yaml should -be copied to the user's home directory for the example package loader to find -it. - -============================= diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..170b481 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,21 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "space" +version = "1.2.0" +authors = [ + { name="Space Domain Task Force", email="space@omg.org" } +] +description = "SpacePython core interface" +readme = "README.md" +requires-python = ">=3.6" +classifiers = [ + "Programming Language :: Python :: 3", + "Operating System :: OS Independent" +] + +[project.urls] +Homepage = "https://www.omg.org/solm/" +Issues = "https://issues.omg.org" diff --git a/setup.py b/setup.py deleted file mode 100644 index 9ba208e..0000000 --- a/setup.py +++ /dev/null @@ -1,15 +0,0 @@ -from distutils.core import setup -setup(name='SpacePython', - version='1.1.0', - description='Skeleton for SpacePython implementation', - requires=['yaml'], - author='Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort', - author_email='space@omg.org, bkizzort@harris.com', - packages=['space'], - data_files=[('data', ['data/SpacePythonDataset.yaml'])], - scripts=['scripts/SetMomentumWheelSpeed.py', - 'scripts/PassSetup.py', - 'scripts/ConfigureFEP.py'], - license='LICENSE.txt', - long_description=open('README.txt').read(), - ) diff --git a/space/commands.py b/space/commands.py deleted file mode 100644 index c0ac9b1..0000000 --- a/space/commands.py +++ /dev/null @@ -1,75 +0,0 @@ -''' -Commands and CommandRequests may be sent through a Link -''' -__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' -__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' -from .errors import UnknownParameterError -class Command(object): #Normative - ''' - The Command class incorporates the command name a list of typed - range-limited arguments. - ''' - def __init__(self, name, link, args=dict()): #Non-normative - ''' - The Command constructor is intended for the internal creation - of the command catalog. Use Link.lookupCommand() to find a - command catalog entry. - ''' - self.name = name - self.link = link - self.arg = args - def setValues(self, **args): - '''Set the argument values from the Keyword=Value pairs passed - ''' - params = list(args.keys()) - for param in params: - if param in self.arg: - self.arg[param].setValue(args[param]) - else: - raise UnknownParameterError('Specified command argument {0} not defined for {1}' - .format(param, self.name)) - def __repr__(self): - return "Command('{0}')".format(self.name) - def send(self, _flags=dict()): - '''Send the command to the link with the defined argument values - ''' - args = dict() - for argName in list(self.arg.keys()): - args[argName] = self.arg[argName].value() - self.link.send(self.name, _flags=_flags, **args) -class CommandRequest(object): #Normative - ''' - The CommandRequest class incorporates a command and several settable - request flags - ''' - def __init__(self, command): #Flag attribute names are normative, __init__ is not - ''' - A CommandRequest is constructed from a Command that is returned from - the Link.lookupCommand() or Link.commands() methods. - ''' - self.command = command - self.releaseAt = None - self.preAuthorized = False - self.noEncryption = False - self.ignoreConstraints = False - self.ignoreReceipt = False - self.ignoreVerification = False - def __repr__(self): - return "CommandRequest('{0}')".format(self.command.name) - def send(self): - '''Send this CommandRequest to the Link associated with the Command - ''' - flags = dict() - if self.releaseAt is not None: - flags['releaseAt'] = self.releaseAt - if self.preAuthorized: - flags['preAuthorized'] = True - if self.noEncryption: - flags['noEncryption'] = True - if self.ignoreConstraints: - flags['ignoreConstraints'] = True - if self.ignoreReceipt: - flags['ignoreReceipt'] = True - if self.ignoreVerification: - flags['ignoreVerification'] = True - self.command.link.send(self.command.name, _flags=flags) diff --git a/space/gems.py b/space/gems.py deleted file mode 100644 index 0b21af1..0000000 --- a/space/gems.py +++ /dev/null @@ -1,116 +0,0 @@ -''' -The GemsDevice class is used to associate with a specific -ground equipment. -''' -__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' -__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' -import re -from space import log -from .errors import GemsError -# Initialize the list of known devices -devices_ = dict() - -class GemsDevice: #Normative - def __init__(self, device): - ''' - Establish communication with a ground device. - ''' - # The implementation should validate that the device exists - # and load any necessary data structures to support future - # get/set/directive requests. - if hasattr(device, 'name'): - self.device = device - devices_[device.name] = device - else: - if device in devices_: - self.device = devices_[device] - else: - raise GemsError('GemsDevice {0} is not defined'.format(device)) - - def __repr__(self): - return "space.GemsDevice('{name}')".format(name=self.device.name) - - def get(self, parameters=[]): - '''get(parameters) refreshes the parameter values by reading from the device - ''' - # This method is expected to cause a poll of the device - # to get current values. If parameters are constantly - # polled by the ground system, then it is an opportunity - # to refresh current values for a running script - if len(parameters) > 0: - out = 'Getting {0} parameters:'.format(self.device.name) - for param in parameters: - out += ' {name}'.format(name=param) - log.info(out) - else: - raise GemsError('No Gems Parameters specified on get') - - def set(self, **arguments): - '''Set the value for the parameter=value pairs specified - ''' - params = list(arguments.keys()) - if len(params) > 0: - out = 'Setting {0} parameters:'.format(self.device.name) - for param in params: - out += ' {name}={value}'.format(name=param, value=arguments[param]) - log.info(out) - else: - raise GemsError('No Gems Parameters specified on set') - - def lookupDirective(self, name): - '''Return a directive with the specified name, if the - device has a defined directive. - ''' - if name in self.device.dSet: - return self.device.dSet[name] - else: - return None - - def directives(self, regexp=''): - '''Return a list of directives associated with this - device and subsystem whose names pass the provided - filter. Default values return all known parameters. - ''' - keys = list(self.device.dSet.keys()) - if regexp != '': - does_it = re.compile(regexp) - keys = list(filter(does_it.match, keys)) - return keys - - def lookupParameter(self, name): - '''Lookup a parameter associated with this device. - ''' - if name in self.device.pSet: - return self.device.pSet[name] - else: - return None - - def parameters(self, regexp=''): - '''Return a list of parameters associated with this - device and subsystem whose names pass the provided - filters. Default values return all known parameters. - ''' - keys = list(self.device.pSet.keys()) - if regexp != '': - does_it = re.compile(regexp) - keys = list(filter(does_it.match, keys)) - return keys - - def send(self, directive, **arguments): - '''Send a directive with the specified parameters to - the device - ''' - # If this is a Directive from the catalog, use the specific send method - # to pick up the arguments - if hasattr(directive, 'send'): - directive.send() - # Should validate the directive name, parameter names and values - # if possible - else: - log.info('Sending {dir} to GemsDevice {dev}'.format(dir=directive, dev=self.device.name)) - params = list(arguments.keys()) - if len(params) > 0: - out = ' Directive parameters:' - for param in params: - out += ' {name}={value}'.format(name=param, value=arguments[param]) - log.info(out) diff --git a/space/gemsdir.py b/space/gemsdir.py deleted file mode 100644 index b0d41d2..0000000 --- a/space/gemsdir.py +++ /dev/null @@ -1,40 +0,0 @@ -''' -GEMS Directives -''' -__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' -__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' -from .errors import UnknownParameterError -class GemsDirective(object): #Normative - ''' - GEMS directives are similar to spacecraft commands. There is a directive name - and a set of parameter=value pairs that can be sent to the GEMS device to change - device configuration. - ''' - def __init__(self, name, device, args=dict()): #Non-normative - ''' - The Gems Directive constructor is intended for the internal creation - of a directive catalog. Use GemsDevice.lookupDirective() to find a - catalog entry. - ''' - self.name = name - self.device = device - self.arg = args - def setValues(self, **args): - '''Set the Parameter values from the Keyword=Value pairs passed - ''' - params = list(args.keys()) - for param in params: - if param in self.arg: - self.arg[param].setValue(args[param]) - else: - raise UnknownParameterError('GEMS Parameter {0} not defined for {1}' - .format(param, self.name)) - def __repr__(self): - return "Directive('{0}')".format(self.name) - def send(self): - '''Send the directive to the device with the defined parameter values - ''' - args = dict() - for argName in list(self.arg.keys()): - args[argName] = self.arg[argName].value() - self.device.send(self.name, **args) diff --git a/space/links.py b/space/links.py deleted file mode 100644 index 1d4aa34..0000000 --- a/space/links.py +++ /dev/null @@ -1,132 +0,0 @@ -''' -The Link and Downlink classes are used to associate with a spacecraft -uplink/downlink and downlink only, respectively. -''' -__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' -__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' -import re -from space import log -from .errors import IllegalLinkError -from .commands import CommandRequest -# Create a dictionary for the known links -links_ = dict() - -class Link(object): #Normative - ''' - The Link class represents an uplink/downlink path for a spaceSystem - ''' - # Define the Link state list. Only the first state means that the - # link is valid (allows commanding and provides telemetry) - states=['UP', 'DOWN', 'ESTABLISHING'] - - def __init__(self, spaceSystem): - ''' - Associate with a Link based on the specified spaceSystem name. - ''' - # The implementation should validate that the spaceSystem exists - # and load any necessary data structures to support future - # command and telemetry requests. Link setup/teardown are not - # necessarily associated with the creation of a Link object, - # but are usually performed by native procedures. - if hasattr(spaceSystem, 'name'): - self.system = spaceSystem - links_[spaceSystem.name] = spaceSystem - self.system.storeLink(self) - else: - if spaceSystem in links_: - self.system = links_[spaceSystem] - else: - raise IllegalLinkError('SpaceSystem {0} is not linked' - .format(spaceSystem)) - self.uplink = True - - def __repr__(self): - return "space.Link('{name}')".format(name=self.system.name) - - def send(self, command, _flags=dict(), **arguments): - '''Send a command to the linked spacecraft - send is called with a command name and optional keyword=value arguments - ''' - # If this is a Command or CommandRequest, need to use the specific send method - # to pick up the arguments and flags - if hasattr(command, 'send'): - command.send() - # Should validate the command name, parameter names and values - # if possible - else: - log.info('Sending {cmd} to spaceSystem {sys}'.format(cmd=command, sys=self.system.name)) - params = list(arguments.keys()) - if len(params) > 0: - out = ' Command arguments:' - for param in params: - out += ' {name}={value}'.format(name=param, value=arguments[param]) - log.info(out) - if len(_flags) > 0: - out = ' Command flags:' - for flag in list(_flags.keys()): - out += ' {name}={value}'.format(name=flag, value=_flags[flag]) - log.info(out) - def lookupParameter(self, name): - '''Lookup a parameter associated with this Link/spaceSystem - ''' - if name in self.system.pSet: - return self.system.pSet[name] - else: - return None - - def parameters(self, regexp=''): - '''Return a list of parameters with names passing the regexp filter. - The default value results in a list of all parameters, which is not - recommended due to the potential list size - ''' - keys = list(self.system.pSet.keys()) - if regexp != '': - does_it = re.compile(regexp) - keys = list(filter(does_it.match, keys)) - return keys - - def lookupCommand(self, name): - '''Lookup a command associated with this Link/spaceSystem - ''' - if name in self.system.cSet: - return self.system.cSet[name] - else: - return None - - def commands(self, regexp=''): - '''Return a list of commands with names passing the regexp filter. - The default value results in a list of all commands, which is not - recommended due to the potential list size. - ''' - keys = list(self.system.cSet.keys()) - if regexp != '': - does_it = re.compile(regexp) - keys = list(filter(does_it.match, keys)) - return keys - - def createCommandRequest(self, command): - return CommandRequest(command) - - def state(self): - '''Return the current state of the Link - Possible states are defined in states class attribute - ''' - return 'UP' - -class Downlink(Link): #Normative - ''' - Associate with a Downlink (telemetry only). - ''' - def __init__(self, spaceSystem): - Link.__init__(self, spaceSystem) - self.uplink = False - - # Override the methods that are invalid for a Downlink - def send(self, command, args): - raise IllegalLinkError('{0} does not support commanding' - .format(self.system.name)) - - def lookupCommand(self, name): - raise IllegalLinkError('{0} does not support commanding' - .format(self.system.name)) - diff --git a/space/parameters.py b/space/parameters.py deleted file mode 100644 index ae17a78..0000000 --- a/space/parameters.py +++ /dev/null @@ -1,399 +0,0 @@ -''' -Parameters are telemetry items for spacecraft and ground equipment or -are items defined by the ground system itself. -''' -__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' -__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' -import re -from .times import SpecificTime, TimeInterval -from .errors import UndefinedTypeError, IllegalValueError -# define an integer conversion for unsigned types -def unsigned(value): - i = int(value) # Let python do all the checking - if i < 0: - i = -i - return i - -# -# The typeNames dictionary defines the list of allowed Parameter type names -# and provides a conversion function for each type. -typeNames = {'boolean': bool, 'byte': int, 'ubyte': unsigned, \ - 'short': int, 'ushort': unsigned, 'int': int, \ - 'uint': unsigned, 'long': int, 'ulong': unsigned,\ - 'float': float, 'double': float, 'string': str,\ - 'posixTime': SpecificTime.fromStr,'hexBitField': int,\ - 'uTime': SpecificTime.fromStr, 'interval': TimeInterval.fromStr} - -class Parameter: #Normative - '''Base Class for all ExternalParameter classes in SpacePython - ''' - def __init__(self, name, dType='double', **kwds): - '''Create a new Parameter instance - ''' - self.name = name - if dType in typeNames: - self.dType = dType - else: - raise UndefinedTypeError('Could not create parameter of data type <{0}>'.format(dType)) - self.description = None - self.multiplicity = None - self.restriction = None - self.units = None - self.value_ = None - self.raw_ = None - self.time_ = None - options = list(kwds.keys()) - for option in options: - if hasattr(self, option): - setattr(self, option, kwds[option]) - def value(self): - '''Return the current value of the Parameter or None if no value has been - reported - ''' - # Currently returns a static attribute, but should interface with - # the control system to provide the last reported value - return self.value_ - def raw(self): - '''Return the current raw value of the Parameter or None if no value has - been reported - ''' - # Currently returns a static attribute, but should interface with - # the control system to provide the last reported value - return self.raw_ - def setValue(self, value): - '''Set the value of the Parameter, validating against any restrictions. - This method will raise an exception if the new value does not meet the - restrictions on the Parameter value. - ''' - # If the value supplied is not of the specified type, try to convert it using the type converter - value = typeNames[self.dType](value) - for restriction in self.restriction: - if not restriction.validate(value): - raise IllegalValueError('Violates restriction {0}'.format(restriction)) - self.value_ = value - self.time_ = SpecificTime.now() - def report(self): - '''Return a tuple of (value, timestamp) or (None, None) if no reported - value - ''' - return (self.value_, self.time_) - def __str__(self): - if self.value_ is not None: - return str(self.value_) - else: - return self.__repr__() - def __repr__(self): - r = 'Parameter({0}, dType={1}'.format(self.name, self.dType) - if self.description is not '': - r = r + ', description="{0}"'.format(self.description) - if self.multiplicity is not None: - r = r + ', multiplicity={0}'.format(self.multiplicity) - if self.value_ is not None: - r = r + ', value_={0}'.format(self.value_) - r = r + ')' - return r -class GemsParameter(Parameter): #Normative - '''Creates a GEMS device parameter - ''' - def __init__(self, name, device=None, dType='double', **kwds): - self.device = device #Active device connection for sets/gets - if 'writable' in kwds: - self.writable = kwds['writable'] - else: - self.writable = True - Parameter.__init__(self, name, dType, **kwds) - -class XtceParameter(Parameter): #Normative - def __init__(self, name, link=None, dType='double', **kwds): - self.link = link #Active link for data refresh - if 'writable' in kwds: - self.writable = kwds['writable'] - else: - self.writable = True - Parameter.__init__(self, name, dType, **kwds) - -class GroundParameter(Parameter): #Normative - '''Creates a Ground system parameter - ''' - def __init__(self, name, dType='double', **kwds): - if 'writable' in kwds: - self.writable = kwds['writable'] - else: - self.writable = True - Parameter.__init__(self, name, dType, **kwds) - -# Instances of the Restriction can be added to a Parameter so that value changes can be -# validated -import calendar #Needed for time conversions -class Restriction(object): - '''Base class for all value restrictions. - ''' - def __init__(self): - pass - -class EnumerationR(Restriction): - '''Limits a string Parameter to a list of values - ''' - def __init__(self, names=list()): - self.names = names - def validate(self, value): - try: - self.names.index(value) - return True - except: - return False - def __repr__(self): - return 'EnumerationR({0})'.format(self.names) - -class FractionDigitsR(Restriction): - '''Restricts the number of digits after the decimal for a float Parameter. - Not really a limit on the value but could be used to control conversions to - and from a string - ''' - def __init__(self, length): - self.length = length - def validate(self, value): - return True - def __repr__(self): - return 'FractionDigitsR({0})'.format(self.length) - -class LengthR(Restriction): - '''Requires a string Parameter to have a specific length - ''' - def __init__(self, length): - self.length = length - def validate(self, value): - if len(value) == self.length: - return True - else: - return False - def __repr__(self): - return 'LengthR({0})'.format(self.length) - -class MaxExclusiveR(Restriction): - '''Requires that an integer or floating parameter be less than a value - ''' - def __init__(self, maxVal): - self.maxVal = maxVal - def validate(self, value): - if value < self.maxVal: - return True - else: - return False - def __repr__(self): - return 'MaxExclusiveR({0})'.format(self.maxVal) - -class MaxInclusiveR(Restriction): - '''Requires that an integer or floating parameter not exceed a value - ''' - def __init__(self, maxVal): - self.maxVal = maxVal - def validate(self, value): - if value <= self.maxVal: - return True - else: - return False - def __repr__(self): - return 'MaxInclusiveR({0})'.format(self.maxVal) - -class MaxLengthR(Restriction): - '''Requires that a string Parameter not exceed a specified length - ''' - def __init__(self, length): - self.length = length - def validate(self, value): - if len(value) <= self.length: - return True - else: - return False - def __repr__(self): - return 'MaxLengthR({0})'.format(self.length) - -class MaxSecondsExclusiveR(Restriction): - '''Requires that the seconds portion of a time Parameter not be less than a - value - ''' - def __init__(self, maxVal): - self.maxVal = maxVal - def validate(self, value): - if getattr(value, 'total_seconds', None): - # TimeInterval and datetime.timedelta will return a total_seconds - seconds = value.total_seconds() - else: - # SpecificTime and datetime.datetime need to be converted to seconds - tt = value.utctimetuple() - seconds = calendar.timegm(tt) - if seconds < self.maxVal: - return True - else: - return False - def __repr__(self): - return 'MaxSecondsExclusiveR({0})'.format(self.maxVal) - -class MaxSecondsInclusiveR(Restriction): - '''Requires that the nanoseconds portion of a time Parameter not exceed a - value - ''' - def __init__(self, maxVal): - self.maxVal = maxVal - def validate(self, value): - if getattr(value, 'total_seconds', None): - # TimeInterval and datetime.timedelta will return a total_seconds - seconds = value.total_seconds() - else: - # SpecificTime and datetime.datetime need to be converted to seconds - tt = value.utctimetuple() - seconds = calendar.timegm(tt) - if seconds <= self.maxVal: - return True - else: - return False - def __repr__(self): - return 'MaxSecondsInclusiveR({0})'.format(self.maxVal) - -class MaxNanosR(Restriction): - '''Requires that the nanoseconds portion of a time Parameter not exceed a - value - ''' - def __init__(self, maxVal): - self.maxVal = maxVal - def validate(self, value): - if getattr(value, 'nanos', None): - #SpecificTime and TimeInterval have a nanos() method - nanos = value.nanos() - else: - #datetime.timedelta and datetime.datetime only have microseconds - nanos = value.microseconds*1000 - if nanos <= self.maxVal: - return True - else: - return False - def __repr__(self): - return 'MaxNanosR({0})'.format(self.maxVal) - -class MinExclusiveR(Restriction): - '''Requires that an integer or floating parameter be less than a value - ''' - def __init__(self, minVal): - self.minVal = minVal - def validate(self, value): - if value > self.minVal: - return True - else: - return False - def __repr__(self): - return 'MinExclusiveR({0})'.format(self.minVal) - -class MinInclusiveR(Restriction): - '''Requires that an integer or floating parameter not exceed a value - ''' - def __init__(self, minVal): - self.minVal = minVal - def validate(self, value): - if value >= self.minVal: - return True - else: - return False - def __repr__(self): - return 'MinInclusiveR({0})'.format(self.minVal) - -class MinLengthR(Restriction): - '''Requires that a string Parameter not exceed a specified length - ''' - def __init__(self, length): - self.length = length - def validate(self, value): - if len(value) >= self.length: - return True - else: - return False - def __repr__(self): - return 'MinLengthR({0})'.format(self.length) - -class MinSecondsExclusiveR(Restriction): - '''Requires that the seconds portion of a time Parameter not be less than a - value - ''' - def __init__(self, minVal): - self.minVal = minVal - def validate(self, value): - if getattr(value, 'total_seconds', None): - # TimeInterval and datetime.timedelta will return a total_seconds - seconds = value.total_seconds() - else: - # SpecificTime and datetime.datetime need to be converted to seconds - tt = value.utctimetuple() - seconds = calendar.timegm(tt) - if seconds > self.minVal: - return True - else: - return False - def __repr__(self): - return 'MinSecondsExclusiveR({0})'.format(self.minVal) - -class MinSecondsInclusiveR(Restriction): - '''Requires that the nanoseconds portion of a time Parameter not exceed a - value - ''' - def __init__(self, minVal): - self.minVal = minVal - def validate(self, value): - if getattr(value, 'total_seconds', None): - # TimeInterval and datetime.timedelta will return a total_seconds - seconds = value.total_seconds() - else: - # SpecificTime and datetime.datetime need to be converted to seconds - tt = value.utctimetuple() - seconds = calendar.timegm(tt) - if seconds >= self.minVal: - return True - else: - return False - def __repr__(self): - return 'MinSecondsInclusiveR({0})'.format(self.minVal) - -class MinNanosR(Restriction): - '''Requires that the nanoseconds portion of a time Parameter not exceed a - value - ''' - def __init__(self, minVal): - self.minVal = minVal - def validate(self, value): - if getattr(value, 'nanos', None): - #SpecificTime and TimeInterval have a nanos() method - nanos = value.nanos() - else: - #datetime.timedelta and datetime.datetime only have microseconds - nanos = value.microseconds*1000 - if nanos >= self.minVal: - return True - else: - return False - def __repr__(self): - return 'MinNanosR({0})'.format(self.minVal) - -class PatternR(Restriction): - '''Requires that a string Parameter match a specified pattern - ''' - def __init__(self, pattern): - self.pattern = pattern - self.re = re.compile(pattern) - def validate(self, value): - if self.re.match(value): - return True - else: - return False - def __repr__(self): - return 'PatternR({0})'.format(self.pattern) - -class TotalDigitsR(Restriction): - '''Restricts the total number of digits for a float or integer Parameter. - Not really a limit on the value but could be used to control conversions to - and from a string - ''' - def __init__(self, maxVal): - self.maxVal = maxVal - def validate(self, value): - return True - def __repr__(self): - return 'TotalDigitsR({0})'.format(self.maxVal) diff --git a/space/queries.py b/space/queries.py deleted file mode 100644 index 8f5c94f..0000000 --- a/space/queries.py +++ /dev/null @@ -1,42 +0,0 @@ -''' -operatorQuery prompts the operator for inputs. -''' -__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' -__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' -# The simplicity of this query implementation does not support -# a or input from the operator, but it is -# anticipated that a GUI-based query could, so exceptions are -# defined for the interface. -from .errors import QueryAbortedError, QueryCanceledError -from .shell import Namespace -# -# This provides a simple console input implementation of the operator -# query. It is expected that an integrated procedure environment -# would provide GUI dialog instead. -# -def operatorQuery(prompt='', **parameters): #Normative - '''Accepts an optional prompt string and keyword=value pairs. - If there are no keyword=value pairs, no values will be requested - from the operator. A default value may be supplied for the keyword - otherwise a value of '' should be used. - If no keyword=value pairs are specified and the prompt string is empty, - the operator will be asked to continue before returning. - Returns an space.shell.Namespace object with values provided by the operator - ''' - namespace = Namespace() - if len(parameters) > 0: - if prompt != '': - print(prompt, '\n') - for name in list(parameters.keys()): - pr_str = 'New value for %s, or for default (%s) ' % (name,parameters[name]) - response = input(pr_str) - if response != '': - namespace.__setattr__(name, response) - else: - namespace.__setattr__(name, parameters[name]) - else: # No values to get - if prompt != '': - response = input(prompt) - else: - response = input(' to continue') - return namespace diff --git a/space/shell.py b/space/shell.py deleted file mode 100644 index 245cebb..0000000 --- a/space/shell.py +++ /dev/null @@ -1,94 +0,0 @@ -'''Shell command line parsing of SOLM Parameters -''' -__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' -__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' -# -import sys -class Namespace(object): - '''A Namespace object stores attributes for parsed input. - ''' - pass -# -def parseArgs(progname, description='', parameters=[], args=None): - '''Parse the input arguments according to the parameter list - uses sys.argv[1:] if no arguments are supplied. Returns a - Namespace object with attributes named after the supplied list - of parameters. - ''' - if args is None: - args = sys.argv[1:] - parser = Parser(progname, description, parameters) - return parser.parse(args) -# -class Parser(object): - '''Internal class for command line parse - ''' - def __init__(self, progname, description, parameters): - self.progname = progname - self.description= description - self.result = Namespace() - self.parameters = parameters - self.parms = dict() - for parm in parameters: - self.result.__setattr__(parm.name, parm.value_) - self.parms[parm.name] = parm - def parse(self, args): - positional = True # Assume the parameters are positional - index = 0 - for arg in args: - if positional and index >= len(self.parms): - self.error('extra argument %s' % arg) - if arg.startswith('--'): - positional = False - equals = arg.find('=') - if equals < 0: - if arg == '--help': - self.print_usage() - sys.exit(0) - else: - argname = arg - value = '' - else: - argname = arg[0:equals] - value = arg[equals+1:] - name = argname[2:] - if name in self.parms: - self.parseValue(self.parms[name], value) - else: - self.error('unrecognized argument %s' % argname) - elif positional: - self.parseValue(self.parameters[index], arg) - index += 1 - else: - self.error('cannot use positional after keyword argument') - self.checkComplete() - return self.result - def parseValue(self, parm, value): - #save old value so we don't change callers parameters - oldval = parm.value_ - try: - parm.setValue(value) - except Exception as e: - self.error('{0} for {1}'.format(e, parm.name)) - setattr(self.result, parm.name, parm.value_) - parm.value_ = oldval - def checkComplete(self): - for parm in self.parameters: - if getattr(self.result, parm.name) is None: - self.error('missing parameter %s' % parm.name) - def usage(self): - guide = self.progname - for parm in self.parameters: - guide += ' --%s=<%s>' % (parm.name, parm.dType) - guide += '\n' - return guide - def error(self, message): - sys.stderr.write(message) - sys.stderr.write('\n') - self.print_usage() - sys.exit(2) - def print_usage(self): - sys.stderr.write(self.usage()) - sys.stderr.write(self.description) - - diff --git a/space/system.py b/space/system.py deleted file mode 100644 index 8b68d88..0000000 --- a/space/system.py +++ /dev/null @@ -1,153 +0,0 @@ -''' -The System module implements the ProcedureEnvironment methods of SOLM -''' -__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' -__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' -import re -import inspect -# -from space import log -from .times import SpecificTime -from .errors import VerifyError -from . import loader -from . import links -from . import gems -# SpacePython script return values -FAILED = -1 # Script failure return -SUCCESSFUL = 0 # Script success return - -class NativeProcedure(object): - '''Class to emulate a loaded SpacePython module for native procedures - ''' - def __init__(self, name='noname', version='1.0.0' ): - self.__doc__ = 'Native Procedure' - self.__scriptname__ = name - self.__version__ = version - self.__duration__ = None - self.__modified__ = None - self.__parser__ = None - - def invoke(self, args): - '''Internal function to log native procedure name and calling arguments - ''' - log.info('Invoking native procedure {0}'.format(self.__scriptname__)) - if args is not None: - logstr = ' with arguments (' - for name, value in args.__dict__.items(): - logstr += '{0}={1}'.format(name, value) - logstr += ')' - log.info(logstr) - -def verify(boolean): #Normative - '''Verify(boolean) - Returns True if boolean value is True, - A False raises an exception which must be caught by procedure - if the procedure is to continue - ''' - # The reasons for calling verify rather than a simple - # if not boolean: - # raise Exception - # is that it allows the TT&C system to log the verification step and - # provides a short-hand notation - frame = inspect.stack()[1] # Get the stack frame of the caller - line = frame[0].f_lineno # Get the local variables of the caller - log.info('Verify at line %d is %s' % (line, str(boolean))) - del frame - if not boolean: - raise VerifyError('Verify at line %d is False' % line) - return True - -def now(): #Normative - '''Return the current time as a SpecificTime - ''' - return SpecificTime.now() - -def today(): #Normative - '''Return the first valid time of the current day as - a SpecificTime. This can be used with TimeIntervals to - calculate a relative time. - ''' - return SpecificTime.today() - -def lookupParameter(name): #Normative - '''Lookup a parameter associated with the control system - ''' - return None - -def parameters(regexp='', subsystem=''): #Normative - '''Return a list of control system parameter names with names - passing the regexp filter and in the specified subsystem. The - default values result in a list of all parameters, which is not - recommended due to the potential list size - ''' - return [] - -def links(regexp=''): #Normative - '''Return a list of defined SpaceSystems with names passing the regexp filter. - The default value results in list of all SpaceSystems with telemetry, - command, and/or procedure definitions. - ''' - keys = list(links.links_.keys()) - if regexp != '': - does_it = re.compile(regexp) - keys = list(filter(does_it.match, keys)) - return keys - -def activeLinks(): #Normative - '''Return a dict() of SpaceSystem Links and/or Downlinks that are - currently active, with SpaceSystem names as keys - ''' - return links.links_ - -def equipment(regexp=''): #Normative - '''Return a list of GEMS devices with names passing the regexp filter. - The default value results in a list of all devices. - ''' - keys = list(gems.devices_.keys()) - if regexp != '': - does_it = re.compile(regexp) - keys = list(filter(does_it.match, keys)) - return keys - -def activeEquipment(): #Normative - '''Return a dict() of GemsDevice objects that are - currently active, with the equipment names as keys - ''' - return gems.devices_ - -def lookupDirective(name): #Normative - '''Return the system specific directive with the specified name or None - ''' - return None - -def directives(regexp=''): #Normative - '''Return a list of control system specific directives passing the - regexp filter. The default value results in a list of all - specific directives. - ''' - return [] - -def procedures(regexp='', spaceSystem=''): #Normative - '''Return a list of procedures passing the regexp filter associated - with the specified SpaceSystem. The default values return a list - of all procedures that are general, i.e. not specific to a - SpaceSystem - ''' - return [] - -def loadProcedure(name, spaceSystem=''): #Normative - '''Loads a named procedure from the procedure catalog. If spaceSystem - is provided, spaceSystem-specific procedures will be searched first - ''' -# -# Current implementation only emulates a native procedure execution -# - return NativeProcedure(name) - -# -# The following code initializes a set of mappings -# for testing the framework with simple scripts. This interface is non-normative. -# It is expected that the framework code will be modified to access the command, telemetry, -# and equipment lists directly from the ground system software - -loader.loadFromYaml() diff --git a/space/__init__.py b/src/space/__init__.py similarity index 52% rename from space/__init__.py rename to src/space/__init__.py index 8151273..c21334f 100644 --- a/space/__init__.py +++ b/src/space/__init__.py @@ -1,4 +1,5 @@ -'''The space package defines SpacePython, a high level interface to a +''' +The space package defines SpacePython, a high level interface to a Spacecraft Operations Center for spacecraft monitoring and control. The scripts included in the package exercise the normative interfaces for SpacePython and should be runnable by any SpacePython-compliant @@ -8,9 +9,12 @@ databases. import logging log = logging.getLogger(__name__) # +from .assets import Asset +# +from .constants import SUCCESSFUL, FAILED, MixedFlagValue, MixedArgumentValue, MixedParameterValue, NullableMixedParameterValue, isSupportedParameterType, getParameterFunction +# from .errors import SpacePythonException -from .errors import GemsError -from .errors import IllegalLinkError +from .errors import IllegalAssetError from .errors import IllegalValueError from .errors import QueryCanceledError from .errors import QueryAbortedError @@ -21,39 +25,18 @@ from .errors import UnknownParameterError from .errors import VerificationError from .errors import VerifyError # -from .commands import CommandRequest from .commands import Command # from .gems import GemsDevice -# -from .gemsdir import GemsDirective -# from .links import Link -from .links import Downlink -# -from .parameters import Parameter -from .parameters import GemsParameter -from .parameters import XtceParameter -from .parameters import GroundParameter -# -from .queries import operatorQuery -# -from .shell import Namespace, parseArgs -# -from .system import activeEquipment -from .system import activeLinks -from .system import directives -from .system import equipment -from .system import FAILED -from .system import links -from .system import loadProcedure -from .system import lookupDirective -from .system import lookupParameter -from .system import now -from .system import parameters -from .system import procedures -from .system import SUCCESSFUL -from .system import today +from .parameters import Parameter, Restriction +from .space_pythons import SpacePython, spacePython +from .space_queries import SpaceQuery, spaceQuery, operatorQuery +from .procedures import Procedure +from .procedure_engines import ProcedureEngine +# +from .shell import ParserParameter, parseArgs +# from .system import verify # from .times import SpecificTime diff --git a/src/space/assets.py b/src/space/assets.py new file mode 100644 index 0000000..08a34b9 --- /dev/null +++ b/src/space/assets.py @@ -0,0 +1,122 @@ +''' +Asset represents space and ground assets within the control system. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from abc import ABC, abstractmethod +from typing import Any +from .parameters import Parameter +from .commands import Command +from .constants import MixedFlagValue + +class Asset(ABC): #Normative + ''' + Asset represents space and ground assets within the control system. + ''' + + @abstractmethod + def lookupParameter(self, parameterName:str) -> Parameter | None: + '''Lookup a parameter associated with this Asset + + :param self: Self reference + :type self: + :param parameterName: Parameter name + :type parameterName: str + ''' + pass + + @abstractmethod + def findParameters(self, regexp:str='') -> list[Parameter]: + '''Return a list of parameters with names passing the regexp filter. + The default value results in a list of all parameters, which is not + recommended due to the potential list size + + :param self: Self reference + :type self: + :param regexp: Parameter name regular expression + :type regexp: str + ''' + pass + + @abstractmethod + def updateParameters(self, parameterList:list[str | Parameter]=[]) -> None: + '''Refreshes the parameter values. If implementing system always provides + latest value, this function may be a no-op. Refreshing values does not ensure + that the value or any aspects of the sample has changed. + + :param self: Self reference + :type self: + :param parameterList: List of parameters + :type parameterList: list[str | Parameter] + ''' + pass + + @abstractmethod + def setParameters(self, **valueMap:Any) -> None: #dict[str, MixedParameterValue] + '''Sets the provided parameters, provided via a dictionary, with key of + parameter name and value representing the new finished value. + + :param self: Self reference + :type self: + :param valueMap: Keyword arguments of parameters + :type valueMap: Any + ''' + pass + + @abstractmethod + def lookupCommand(self, commandName:str) -> Command | None: + '''Lookup a command associated with this Asset + + :param self: Self reference + :type self: + :param commandName: Command name + :type commandName: str + ''' + pass + + @abstractmethod + def findCommands(self, regexp:str='') -> list[Command]: + '''Return a list of commands with names passing the regexp filter. + The default value results in a list of all commands, which is not + recommended due to the potential list size. + + :param self: Self reference + :type self: + :param regexp: Command name regular expression + :type regexp: str + ''' + pass + + @abstractmethod + def send(self, command:Command | str, _flags:dict[str, MixedFlagValue]=dict(), **args:Any) -> None: + '''Send a command to the Asset. + send is called with a command name, flags, and optional keyword=value arguments + + :param self: Self reference + :type self: + :param command: Command name or object + :type command: + :param _flags: Flags for the command + :type _flags: + :param args: Keyword of command arguments + :type args: Any + ''' + pass + + @abstractmethod + def name(self) -> str: + '''Returns the name of the Asset. + + :param self: Self reference + :type self: + ''' + pass + + @abstractmethod + def state(self) -> str: + '''Return the current state of the Asset. + + :param self: Self reference + :type self: + ''' + pass diff --git a/src/space/commands.py b/src/space/commands.py new file mode 100644 index 0000000..9db0a82 --- /dev/null +++ b/src/space/commands.py @@ -0,0 +1,44 @@ +''' +Commands are sent via an Asset +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from abc import ABC, abstractmethod +from typing import Any +from .constants import MixedFlagValue + +class Command(ABC): #Normative + ''' + The Command class incorporates the command name a list of typed + range-limited arguments. + ''' + @abstractmethod + def setValues(self, **args:Any) -> None: + '''Set the argument values from the Keyword=Value pairs passed + + :param self: Self reference + :type self: + :param args: Keywords of command arguments (name=value) + :type args: Any + ''' + pass + + @abstractmethod + def name(self) -> str: + '''Returns the name of the command + + :param self: Self reference + :type self: + ''' + pass + + @abstractmethod + def send(self, _flags:dict[str, MixedFlagValue]=dict()) -> None: + '''Send the command to the asset with the defined argument values + + :param self: Self reference + :type self: + :param _flags: Flags for the command (optional) + :type _flags: + ''' + pass diff --git a/src/space/console_queries.py b/src/space/console_queries.py new file mode 100644 index 0000000..ff2c005 --- /dev/null +++ b/src/space/console_queries.py @@ -0,0 +1,28 @@ +''' +Console Query is an implementation of spaceQuery for command-line input. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm)' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from typing import Any +from .space_queries import SpaceQuery +from .constants import MixedParameterValue + +class ConsoleQuery(SpaceQuery): + ''' + Console Query is an implementation of spaceQuery for command-line input. + ''' + + def operatorQuery(self, prompt:str='', **parameters:Any) -> dict[str, MixedParameterValue]: + '''Prompts the user for input utilizing a command line prompt. + + :param self: Self reference + :type self: + :param prompt: Prompt for the interaction + :type prompt: str + :param parameters: Keywords representing each input as part of this prompt interaction + :type parameters: Any + ''' + res:dict[str, MixedParameterValue] = dict() + res["result"] = input(prompt + ": ") + + return res diff --git a/src/space/constants.py b/src/space/constants.py new file mode 100644 index 0000000..02c9a2d --- /dev/null +++ b/src/space/constants.py @@ -0,0 +1,66 @@ +''' +This file contains constants and support functions for Space Python. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm)' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from datetime import datetime +from typing import Callable, Any#, Union +from .times import SpecificTime, TimeInterval + +FAILED = -1 # Failure +SUCCESSFUL = 0 # Success + +# for parameters (and procedure arguments) +type MixedParameterValue = int | float | str | datetime +type NullableMixedParameterValue = MixedParameterValue | None + +# for command arguments +type MixedArgumentValue = int | float | str + +# for flags +type MixedFlagValue = int | float | str + +# unsigned function +def unsigned(value:MixedParameterValue) -> int: + ''' + Handles conversion of number to an unsigned representation + + :param value: Input field + :type value: MixedParameterValue + ''' + # handle specific time being passed into unsigned function + if isinstance(value, datetime): + value = 0 + + i = int(value) + if i < 0: + i = -i + return i + +VALID_PARAMETER_TYPES:dict[str, Any] = { 'boolean': bool, 'byte': int, 'ubyte': unsigned, \ + 'short': int, 'ushort': unsigned, 'int': int, \ + 'uint': unsigned, 'long': int, 'ulong': unsigned, \ + 'float': float, 'double': float, 'string': str, \ + 'posixTime': SpecificTime.fromStr, 'hexBitField': int, \ + 'uTime': SpecificTime.fromStr, 'interval': TimeInterval.fromStr } + +def isSupportedParameterType(type:str) -> bool: + '''Returns if the provided data type is supported by Space Python. + + :param type: Data type + :type type: str + ''' + if VALID_PARAMETER_TYPES.get(type, None) != None: + return True + return False + +def getParameterFunction(type:str) -> Callable[[Any], Any]: + '''Returns a conversion function for the provided parameter type. + Expected to be invoked with a single argument that takes the input data and + returns the converted value. + + :param type: Data Type + :type type: str + ''' + + return VALID_PARAMETER_TYPES.get(type, None) \ No newline at end of file diff --git a/space/errors.py b/src/space/errors.py similarity index 89% rename from space/errors.py rename to src/space/errors.py index 3b83c89..ffbd2da 100644 --- a/space/errors.py +++ b/src/space/errors.py @@ -21,12 +21,8 @@ class VerificationError(SpacePythonException): #Normative '''Command action failed verification telemetry check ''' pass -class GemsError(SpacePythonException): #Normative - '''An error occurred accessing a GEMS device - ''' - pass -class IllegalLinkError(SpacePythonException): #Normative - '''The referenced Link does not exist or does not accept commands +class IllegalAssetError(SpacePythonException): #Normative + '''The referenced Asset does not exist ''' pass class UndefinedTypeError(SpacePythonException): #Normative diff --git a/src/space/gems.py b/src/space/gems.py new file mode 100644 index 0000000..5234821 --- /dev/null +++ b/src/space/gems.py @@ -0,0 +1,15 @@ +''' +The GemsDevice class is a legacy support class that maps to Asset objects. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm)' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from .assets import Asset +from .space_pythons import spacePython + +def GemsDevice(device_name:str) -> Asset: + '''The GemsDevice function is a legacy support method that creates Asset objects. + + :param device_name: Device name + :type device_name: str + ''' + return spacePython().lookupAsset(device_name) diff --git a/src/space/links.py b/src/space/links.py new file mode 100644 index 0000000..87c2da6 --- /dev/null +++ b/src/space/links.py @@ -0,0 +1,15 @@ +''' +The Links class is a legacy support class that maps to Asset objects. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm)' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from .assets import Asset +from .space_pythons import spacePython + +def Link(link_name:str) -> Asset: + '''The Link function is a legacy support method that creates Asset objects. + + :param link_name: Link name + :type link_name: str + ''' + return spacePython().lookupAsset(link_name) diff --git a/src/space/native_procedures.py b/src/space/native_procedures.py new file mode 100644 index 0000000..819ea55 --- /dev/null +++ b/src/space/native_procedures.py @@ -0,0 +1,30 @@ +''' +The NativeProcedure class provides support for launching other procedures. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm)' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from .procedures import Procedure +from .constants import MixedParameterValue + +class NativeProcedure(Procedure): #Normative + '''The NativeProcedure class provides support for launching other procedures. + ''' + def __init__(self, name:str=''): + ''' + Constructor for Native Procedures + + :param self: Self reference + :type self: + :param name: Name of the procedure + :type name: str ''' + self._name = name + + def invoke(self, args:dict[str, MixedParameterValue]) -> None: + '''Method that executes the procedure with provided arguments + + :param self: Self reference + :type self: + :param args: Keywords of procedure arguments + :type args: dict[str, MixedParameterValue] + ''' + print("Running procedure " + self._name) \ No newline at end of file diff --git a/src/space/parameters.py b/src/space/parameters.py new file mode 100644 index 0000000..a15fcdc --- /dev/null +++ b/src/space/parameters.py @@ -0,0 +1,750 @@ +''' +Parameters are values sampled/known to an Asset or defined within the procedure +language implementation. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from abc import ABC, abstractmethod +from typing import Any +import re +from .times import SpecificTime, TimeInterval +from datetime import datetime, timedelta +import calendar #Needed for time conversions +from datetime import datetime +from .constants import MixedParameterValue, NullableMixedParameterValue + +class Parameter(ABC): #Normative + ''' Represents all parameters within SpacePython + ''' + @abstractmethod + def value(self) -> NullableMixedParameterValue: + '''Return the current value of the Parameter or None if no value has been + reported + + :param self: Self reference + :type self: + ''' + pass + + @abstractmethod + def raw(self) -> NullableMixedParameterValue: + '''Return the current raw value of the Parameter or None if no value has + been reported + + :param self: Self reference + :type self: + ''' + pass + + @abstractmethod + def name(self) -> str: + '''Returns the name of the Parameter. + + :param self: Self reference + :type self: + ''' + pass + + @abstractmethod + def setValue(self, value: MixedParameterValue) -> None: + '''Set the value of the Parameter, validating against any restrictions. + This method will raise an exception if the new value does not meet the + restrictions on the Parameter value. + + :param self: Self reference + :type self: + :param value: Value + :type value: MixedParameterValue + ''' + pass + + @abstractmethod + def sample(self) -> dict[str, MixedParameterValue]: + '''Return a dictionary of information about current sample, including + time and values. + + :param self: Self reference + :type self: + ''' + pass + + @abstractmethod + def type(self) -> str: + '''Returns the data type of the Parameter. + + :param self: Self reference + :type self: + ''' + pass + +# Instances of the Restriction can be added to a Parameter so that value changes can be +# validated +class Restriction(object): + '''Base class for all value restrictions. + ''' + def __init__(self): + ''' + Restriction constructor + + :param self: Self reference + :type self: + ''' + pass + + @abstractmethod + def validate(self, value:Any) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: Any + ''' + pass + +class EnumerationR(Restriction): + '''Limits a string Parameter to a list of values + ''' + def __init__(self, names:list[str]=list()): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param names: List of values + :type names: list[str] + ''' + self.names = names + def validate(self, value:str) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: str + ''' + try: + self.names.index(value) + return True + except: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'EnumerationR({0})'.format(self.names) + +class FractionDigitsR(Restriction): + '''Restricts the number of digits after the decimal for a float Parameter. + Not really a limit on the value but could be used to control conversions to + and from a string + ''' + def __init__(self, length:int): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param length: Length limit + :type length: int + ''' + self.length = length + def validate(self, value: int | float) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: + ''' + return True + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'FractionDigitsR({0})'.format(self.length) + +class LengthR(Restriction): + '''Requires a string Parameter to have a specific length + ''' + def __init__(self, length:int): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param length: Numeric length + :type length: int + ''' + self.length = length + def validate(self, value: str) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: str + ''' + if len(value) == self.length: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'LengthR({0})'.format(self.length) + +class MaxExclusiveR(Restriction): + '''Requires that an integer or floating parameter be less than a value + ''' + def __init__(self, maxVal: int | float): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param maxVal: Maximum numeric value + :type maxVal: int | float + ''' + self.maxVal = maxVal + def validate(self, value: int | float) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: + ''' + if value < self.maxVal: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MaxExclusiveR({0})'.format(self.maxVal) + +class MaxInclusiveR(Restriction): + '''Requires that an integer or floating parameter not exceed a value + ''' + def __init__(self, maxVal:int | float): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param maxVal: Maximum numeric value + :type maxVal: + ''' + self.maxVal = maxVal + def validate(self, value:int | float) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: + ''' + if value <= self.maxVal: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MaxInclusiveR({0})'.format(self.maxVal) + +class MaxLengthR(Restriction): + '''Requires that a string Parameter not exceed a specified length + ''' + def __init__(self, length:int): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param length: Maximum length + :type length: int + ''' + self.length = length + def validate(self, value:str) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: str + ''' + if len(value) <= self.length: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MaxLengthR({0})'.format(self.length) + +class MaxSecondsExclusiveR(Restriction): + '''Requires that the seconds portion of a time Parameter not be less than a + value + ''' + def __init__(self, maxVal:int | float): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param maxVal: Maximum number of seconds + :type maxVal: + ''' + self.maxVal = maxVal + def validate(self, value:TimeInterval | timedelta | SpecificTime | datetime) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: TimeInterval | timedelta | SpecificTime | datetime + ''' + if isinstance(value, TimeInterval) or isinstance(value, timedelta): + # TimeInterval and datetime.timedelta will return a total_seconds + seconds = value.total_seconds() + else: + # SpecificTime and datetime.datetime need to be converted to seconds + tt = value.utctimetuple() + seconds = calendar.timegm(tt) + if seconds < self.maxVal: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MaxSecondsExclusiveR({0})'.format(self.maxVal) + +class MaxSecondsInclusiveR(Restriction): + '''Requires that the nanoseconds portion of a time Parameter not exceed a + value + ''' + def __init__(self, maxVal:int): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param maxVal: Maxiumum nanoseconds value + :type maxVal: int + ''' + self.maxVal = maxVal + def validate(self, value:TimeInterval | timedelta | SpecificTime | datetime) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: TimeInterval | timedelta | SpecificTime | datetime + ''' + if isinstance(value, TimeInterval) or isinstance(value, timedelta): + # TimeInterval and datetime.timedelta will return a total_seconds + seconds = value.total_seconds() + else: + # SpecificTime and datetime.datetime need to be converted to seconds + tt = value.utctimetuple() + seconds = calendar.timegm(tt) + if seconds <= self.maxVal: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MaxSecondsInclusiveR({0})'.format(self.maxVal) + +class MaxNanosR(Restriction): + '''Requires that the nanoseconds portion of a time Parameter not exceed a + value + ''' + def __init__(self, maxVal:int): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param maxVal: Maximum nanoseconds value + :type maxVal: int + ''' + self.maxVal = maxVal + def validate(self, value:TimeInterval | timedelta | SpecificTime | datetime) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: TimeInterval | timedelta | SpecificTime | datetime + ''' + if isinstance(value, SpecificTime) or isinstance(value, TimeInterval): + #SpecificTime and TimeInterval have a nanos() method + nanos = value.nanos() + elif isinstance(value, timedelta): + #datetime.timedelta has microseconds + nanos = value.microseconds*1000 + else: + #datetime.datetime has microsecond + nanos = value.microsecond*1000 + if nanos <= self.maxVal: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MaxNanosR({0})'.format(self.maxVal) + +class MinExclusiveR(Restriction): + '''Requires that an integer or floating parameter be less than a value + ''' + def __init__(self, minVal:int | float): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param minVal: Minimum value + :type minVal: int | float + ''' + self.minVal = minVal + def validate(self, value:int | float) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: int | float + ''' + if value > self.minVal: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MinExclusiveR({0})'.format(self.minVal) + +class MinInclusiveR(Restriction): + '''Requires that an integer or floating parameter not exceed a value + ''' + def __init__(self, minVal:int | float): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param minVal: Minimum value + :type minVal: int | float + ''' + self.minVal = minVal + def validate(self, value:int | float) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: int | float + ''' + if value >= self.minVal: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MinInclusiveR({0})'.format(self.minVal) + +class MinLengthR(Restriction): + '''Requires that a string Parameter not exceed a specified length + ''' + def __init__(self, length:int): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param length: Minimum string length + :type length: int + ''' + self.length = length + def validate(self, value:str) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: str + ''' + if len(value) >= self.length: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MinLengthR({0})'.format(self.length) + +class MinSecondsExclusiveR(Restriction): + '''Requires that the seconds portion of a time Parameter not be less than a + value + ''' + def __init__(self, minVal:int): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param minVal: Minimum seconds + :type minVal: int ''' + self.minVal = minVal + def validate(self, value:TimeInterval | timedelta | SpecificTime | datetime) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: TimeInterval | timedelta | SpecificTime | datetime + ''' + if isinstance(value, TimeInterval) or isinstance(value, timedelta): + # TimeInterval and datetime.timedelta will return a total_seconds + seconds = value.total_seconds() + else: + # SpecificTime and datetime.datetime need to be converted to seconds + tt = value.utctimetuple() + seconds = calendar.timegm(tt) + if seconds > self.minVal: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MinSecondsExclusiveR({0})'.format(self.minVal) + +class MinSecondsInclusiveR(Restriction): + '''Requires that the nanoseconds portion of a time Parameter not exceed a + value + ''' + def __init__(self, minVal:int): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param minVal: Minimums seconds + :type minVal: int + ''' + self.minVal = minVal + def validate(self, value:TimeInterval | timedelta | SpecificTime | datetime) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: TimeInterval | timedelta | SpecificTime | datetime + ''' + if isinstance(value, TimeInterval) or isinstance(value, timedelta): + # TimeInterval and datetime.timedelta will return a total_seconds + seconds = value.total_seconds() + else: + # SpecificTime and datetime.datetime need to be converted to seconds + tt = value.utctimetuple() + seconds = calendar.timegm(tt) + if seconds >= self.minVal: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MinSecondsInclusiveR({0})'.format(self.minVal) + +class MinNanosR(Restriction): + '''Requires that the nanoseconds portion of a time Parameter not exceed a + value + ''' + def __init__(self, minVal:int): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param minVal: Minimum nanoseconds value + :type minVal: int + ''' + self.minVal = minVal + def validate(self, value:TimeInterval | timedelta | SpecificTime | datetime) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: TimeInterval | timedelta | SpecificTime | datetime + ''' + if isinstance(value, SpecificTime) or isinstance(value, TimeInterval): + #SpecificTime and TimeInterval have a nanos() method + nanos = value.nanos() + elif isinstance(value, timedelta): + #datetime.timedelta has microseconds + nanos = value.microseconds*1000 + else: + #datetime.datetime has microseconds + nanos = value.microsecond*1000 + if nanos >= self.minVal: + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'MinNanosR({0})'.format(self.minVal) + +class PatternR(Restriction): + '''Requires that a string Parameter match a specified pattern + ''' + def __init__(self, pattern:str): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param pattern: Regular expression pattern + :type pattern: str + ''' + self.pattern = pattern + self.re = re.compile(pattern) + def validate(self, value:str) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: str + ''' + if self.re.match(value): + return True + else: + return False + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'PatternR({0})'.format(self.pattern) + +class TotalDigitsR(Restriction): + '''Restricts the total number of digits for a float or integer Parameter. + Not really a limit on the value but could be used to control conversions to + and from a string + ''' + def __init__(self, maxVal:int): + ''' + Restriction constructor + + :param self: Self reference + :type self: + :param maxVal: Maximum number of digits + :type maxVal: int + ''' + self.maxVal = maxVal + def validate(self, value:int | float) -> bool: + ''' + Validation function + + :param self: Self reference + :type self: + :param value: Input value to check + :type value: int | float + ''' + return True + def __repr__(self) -> str: + ''' + Returns class representation + + :param self: Self reference + :type self: + ''' + return 'TotalDigitsR({0})'.format(self.maxVal) diff --git a/src/space/procedure_engines.py b/src/space/procedure_engines.py new file mode 100644 index 0000000..cf7b5b1 --- /dev/null +++ b/src/space/procedure_engines.py @@ -0,0 +1,40 @@ +''' +Procedure Engines allow for access to procedures and sub-procedures. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from abc import ABC, abstractmethod +from .procedures import Procedure + +class ProcedureEngine(ABC): #Normative + '''Procedure Engines allow for access to procedures and sub-procedures. + ''' + @abstractmethod + def loadProcedure(self, name:str, spaceSystem:str='') -> Procedure: + '''Loads a named procedure from the procedure catalog. If spaceSystem + is provided, spaceSystem-specific procedures will be searched first. + + :param self: Self reference + :type self: + :param name: Name of the procedure + :type name: str + :param spaceSystem: Procedure related system + :type spaceSystem: str + ''' + pass + + @abstractmethod + def findProcedures(self, regexp:str='', spaceSystem:str='') -> list[Procedure]: + '''Return a list of procedures passing the regexp filter associated + with the specified SpaceSystem. The default values return a list + of all procedures that are general, i.e., not specific to a + SpaceSystem + + :param self: Self reference + :type self: + :param regexp: Regular expression of procedure names + :type regexp: str + :param spaceSystem: Procedure related system + :type spaceSystem: str + ''' + pass diff --git a/src/space/procedures.py b/src/space/procedures.py new file mode 100644 index 0000000..d7ba353 --- /dev/null +++ b/src/space/procedures.py @@ -0,0 +1,21 @@ +''' +Procedure represents a procedure within the procedure engine. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm)' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from abc import ABC, abstractmethod +from .constants import MixedParameterValue + +class Procedure(ABC): #Normative + '''Procedure represents a procedure within the procedure engine. + ''' + @abstractmethod + def invoke(self, args:dict[str, MixedParameterValue]) -> None: + '''Invoke the procedure to execute with provided arguments. + + :param self: Self reference + :type self: + :param args: Keyword arguments to pass into procedure + :type args: dict[str, MixedParameterValue] + ''' + pass \ No newline at end of file diff --git a/src/space/shell.py b/src/space/shell.py new file mode 100644 index 0000000..72c4dd4 --- /dev/null +++ b/src/space/shell.py @@ -0,0 +1,251 @@ +''' +Shell command line parsing of SOLM Parameters +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +# +import sys +from space import Parameter, MixedParameterValue, NullableMixedParameterValue +from typing import Any +from datetime import datetime +from .parameters import Parameter + +class ParserParameter(Parameter): + '''ParserParameter implements Parameters as used for the procedure execution invoke method. + ''' + def __init__(self, name:str, dataType:str, **flags:Any): + ''' + ParserParameter constructor + + :param self: Self reference + :type self: + :param name: Parameter name + :type name: str + :param dataType: Data type + :type dataType: str + :param flags: Keywords of flags + :type flags: Any + ''' + self._name = name + self._type = dataType + self._value = None + + def value(self) -> NullableMixedParameterValue: + ''' + Return the current value of the Parameter or None if no value has been + reported + + :param self: Self reference + :type self: + ''' + return self._value + + def raw(self) -> NullableMixedParameterValue: + '''Return the current raw value of the Parameter or None if no value has + been reported + + :param self: Self reference + :type self: + ''' + return self._value + + def name(self) -> str: + '''Returns the name of the Parameter. + + :param self: Self reference + :type self: + ''' + return self._name + + def setValue(self, value:MixedParameterValue): + '''Set the value of the Parameter, validating against any restrictions. + This method will raise an exception if the new value does not meet the + restrictions on the Parameter value. + + :param self: Self reference + :type self: + :param value: Value + :type value: MixedParameterValue + ''' + self._value = value + + def sample(self) -> dict[str, MixedParameterValue]: + '''Return a dictionary of information about current sample, including + time and values. + + :param self: Self reference + :type self: + ''' + return dict() + + def type(self) -> str: + '''Returns the data type of the Parameter. + + :param self: Self reference + :type self: + ''' + return self._type + +def parseArgs(progname:str, description:str | None='', parameters:list[ParserParameter]=[], args:list[str] | None = None) -> dict[str, MixedParameterValue]: + '''Parse the input arguments according to the parameter list + uses sys.argv[1:] if no arguments are supplied. Returns a + dictionary object with attributes named after the supplied list + of parameters. + + :param progname: Program name + :type progname: str + :param description: Program description + :type description: str | None + :param parameters: List of application definitions + :type parameters: list[ParserParameter] + :param args: List of program arguments + :type args: list[str] | None + ''' + if args is None: + args = sys.argv[1:] + parser = Parser(progname, description, parameters) + return parser.parse(args) + +# +class Parser(object): + '''Internal class for command line parse + ''' + def _int_null(self, arg:NullableMixedParameterValue) -> int: + ''' + Returns current value of zero if non-numeric + + :param self: Self reference + :type self: + :param arg: Input field to cast + :type arg: NullableMixedParameterValue + ''' + # handle special case for null + if arg == None: + return 0 + if isinstance(arg, datetime): + return 0 + return int(arg) + def __init__(self, progname:str, description:str|None, parameters:list[ParserParameter]): + ''' + Parser constructor + + :param self: Self reference + :type self: + :param progname: Program name + :type progname: str + :param description: Program description + :type description: + :param parameters: List of parameters + :type parameters: list[ParserParameter] + ''' + self.progname = progname + self.description= description + self.result:dict[str, MixedParameterValue] = dict() + self.parameters = parameters + self.parms:dict[str, Parameter] = dict() + for parm in parameters: + if parm.value() != None: + self.result[parm.name()] = self._int_null(parm.value()) + self.parms[parm.name()] = parm + def parse(self, args:list[str]) -> dict[str, MixedParameterValue]: + ''' + Performs parsing using provided arguments. + + :param self: Self reference + :type self: + :param args: Arguments for parser + :type args: list[str] + ''' + positional = True # Assume the parameters are positional + index = 0 + for arg in args: + if positional and index >= len(self.parms): + self.error('extra argument %s' % arg) + if arg.startswith('--'): + positional = False + equals = arg.find('=') + if equals < 0: + if arg == '--help': + self.print_usage() + sys.exit(0) + else: + argname = arg + value = '' + else: + argname = arg[0:equals] + value = arg[equals+1:] + name = argname[2:] + + if name in self.parms.keys(): + self.parseValue(self.parms[name], value) + else: + self.error('unrecognized argument %s' % argname) + elif positional: + self.parseValue(self.parameters[index], arg) + index += 1 + else: + self.error('cannot use positional after keyword argument') + self.checkComplete() + return self.result + def parseValue(self, parm:Parameter, value:MixedParameterValue): + ''' + Attempt to apply provided value to provided parameter + + :param self: Self reference + :type self: + :param parm: Parameter + :type parm: Parameter + :param value: Value to assign to parameter + :type value: MixedParameterValue + ''' + try: + parm.setValue(value) + except Exception as e: + self.error('{0} for {1}'.format(e, parm.name())) + if parm.value() != None: + self.result[parm.name()] = self._int_null(parm.value()) + def checkComplete(self): + ''' + Finalize parsing after parse is completed. Will throw error if required arguments were not provided. + + :param self: Self reference + :type self: + ''' + for parm in self.parameters: + if not parm.name() in self.result: + self.error('missing parameter %s' % parm.name()) + def usage(self) -> str: + ''' + Returns application usage information. + + :param self: Self reference + :type self: + ''' + guide = self.progname + for parm in self.parameters: + guide += ' --%s=<%s>' % (parm.name(), parm.type()) + guide += '\n' + return guide + def error(self, message:str) -> None: + ''' + Outputs error message to standard error. + + :param self: Self reference + :type self: + :param message: Error message + :type message: str + ''' + sys.stderr.write(message) + sys.stderr.write('\n') + self.print_usage() + sys.exit(2) + def print_usage(self) -> None: + ''' + Outputs application usage and application description to standard error. + + :param self: Self reference + :type self: + ''' + sys.stderr.write(self.usage()) + if self.description != None: + sys.stderr.write(self.description) diff --git a/src/space/silent_queries.py b/src/space/silent_queries.py new file mode 100644 index 0000000..cf0ca00 --- /dev/null +++ b/src/space/silent_queries.py @@ -0,0 +1,26 @@ +''' +Silent Query suppresses all operator input as necessary in a background application +where no mechanism to prompt user is available. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm)' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from typing import Any +from .space_queries import SpaceQuery +from .constants import MixedParameterValue + +class SilentQuery(SpaceQuery): + '''SilentQuery does not prompt and always returns an empty set + ''' + + def operatorQuery(self, prompt:str='', **parameters:Any) -> dict[str,MixedParameterValue]: + '''Prompts the user for input utilizing a command line prompt. + + :param self: Self reference + :type self: + :param prompt: Prompt for the interaction + :type prompt: str + :param parameters: Keywords representing each input as part of this prompt interaction + :type parameters: Any + ''' + res:dict[str, MixedParameterValue] = dict() + return res diff --git a/src/space/space_pythons.py b/src/space/space_pythons.py new file mode 100644 index 0000000..5199aec --- /dev/null +++ b/src/space/space_pythons.py @@ -0,0 +1,96 @@ +''' +Space Python provides functionality to author common procedures that can be executed across multiple control +system software applications. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from abc import ABC, abstractmethod +from importlib import import_module +import os +from .assets import Asset +from .procedure_engines import ProcedureEngine +from .constants import MixedFlagValue + +class SpacePython(ABC): #Normative + ''' + Space Python provides functionality to author common procedures that can be executed across multiple control + system software applications. Each control system implements this class and its supporting classes to provide + integration with their software platform. + ''' + + @abstractmethod + def lookupAsset(self, name:str) -> Asset: + '''Look up a parameter via the provided name. + + :param self: Self reference + :type self: + :param name: Asset name + :type name: str + ''' + pass + + @abstractmethod + def findAssets(self, regexp:str='', _flags:dict[str, MixedFlagValue]=dict()) -> list[Asset]: + '''Return a list of defined SpaceSystems with names passing the regexp filter. + The default value results in list of all Assets. + + :param self: Self reference + :type self: + :param regexp: Regular expression of asset names + :type regexp: str + :param _flags: Flags for the query (optional) + :type _flags: dict[str, MixedFlagValue] + ''' + pass + + @abstractmethod + def procedureEngine(self) -> ProcedureEngine: + '''Returns implementation of Procedure Engine sub-interface + + :param self: Self reference + :type self: + ''' + pass + + @classmethod + def instance(cls, module_name:str='') -> 'SpacePython': + '''Returns an instance of the implementing class of provided module name + or determined based on environment. + + :param cls: Class reference + :type cls: + :param module_name: Module name (optional) + :type module_name: str + ''' + + # if variable is not provided, default to environment variable + if module_name == '': + module_name = os.getenv("SPACEPYTHON_DEFAULT_MODULE", '') + + if module_name == '': + raise Exception('No default SpacePython module defined via $SPACEPYTHON_DEFAULT_MODULE') + + import_module(module_name) + + for c in SpacePython.__subclasses__(): + if(module_name == c.__module__): + return c() + + raise Exception('Factory could not find a suitable SpacePython implementation') + + @classmethod + def availableImplementations(cls) -> list[type['SpacePython']]: + '''Returns a list of all known implementations. + + :param cls: Class reference + :type cls: + ''' + return SpacePython.__subclasses__() + +def spacePython(module_name:str='') -> SpacePython: + '''Factory method to provide an implementation of Space Python. + + :param module_name: Module name (optional) + :type module_name: str + ''' + return SpacePython.instance(module_name) diff --git a/src/space/space_queries.py b/src/space/space_queries.py new file mode 100644 index 0000000..4ddd070 --- /dev/null +++ b/src/space/space_queries.py @@ -0,0 +1,87 @@ +''' +Space Query prompts the operator for inputs. +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from abc import ABC, abstractmethod +from importlib import import_module +import os +from typing import Any +from .constants import MixedParameterValue + +class SpaceQuery(ABC): #Normative + ''' + SpaceQuery provides a common class for displaying prompts for inputs from the operator. + Implementations may exist for different ways that procedures may run, such as via a console + interface, desktop interface, or web interface. + ''' + + @classmethod + def instance(cls, module_name:str='') -> 'SpaceQuery': + '''Returns an instance of the implementing class of provided module name + or determined based on environment. + + :param cls: Class reference + :type cls: + :param module_name: Module name (optional) + :type module_name: str + ''' + + # if variable is not provided, default to environment variable + if module_name == '': + module_name = os.getenv("SPACEQUERY_DEFAULT_MODULE", 'space.console_queries') + + import_module(module_name) + + for c in SpaceQuery.__subclasses__(): + if(module_name == c.__module__): + return c() + + raise Exception('Factory could not find a suitable SpaceQuery implementation') + + @classmethod + def availableImplementations(cls) -> list[type['SpaceQuery']]: + '''Returns a list of all known implementations. + + :param cls: Description + :type cls: + ''' + return SpaceQuery.__subclasses__() + + @abstractmethod + def operatorQuery(self, prompt:str='', **parameters:dict[str, MixedParameterValue]) -> dict[str, MixedParameterValue]: + '''Accepts an optional prompt string and keyword=value pairs. + If there are no keyword=value pairs, no values will be requested + from the operator. A default value may be supplied for the keyword + otherwise a value of '' should be used. + If no keyword=value pairs are specified and the prompt string is empty, + the operator will be asked to continue before returning. + Returns an space.shell.Namespace object with values provided by the operator + + :param self: Self reference + :type self: + :param prompt: Prompt for the interaction + :type prompt: str + :param parameters: Keywords representing each input as part of this prompt interaction + :type parameters: Any + ''' + pass + +def spaceQuery(module_name:str='') -> SpaceQuery: + '''Factory method to provide an implementation of the operatorQuery capability. + + :param module_name: Module name (optional) + :type module_name: str + ''' + return SpaceQuery.instance(module_name) + +def operatorQuery(prompt:str='', **parameters:Any) -> dict[str, MixedParameterValue]: + '''Accessor function that provides default implementation of operatorQuery capability. + + :param prompt: Prompt for the interaction + :type prompt: str + :param parameters: Keywords representing each input as part of this prompt interaction + :type parameters: Any + ''' + instance = spaceQuery().instance() + return instance.operatorQuery(prompt, **parameters) diff --git a/src/space/system.py b/src/space/system.py new file mode 100644 index 0000000..a59445f --- /dev/null +++ b/src/space/system.py @@ -0,0 +1,28 @@ +''' +Implements Space Python language functions +''' +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +import inspect +from .errors import VerifyError + +def verify(boolean:bool) -> bool: #Normative + '''Verify(boolean) + Returns True if boolean value is True, + A False raises an exception which must be caught by procedure + if the procedure is to continue + + :param boolean: Verification function + :type boolean: bool + ''' + # The reasons for calling verify rather than a simple + # if not boolean: + # raise Exception + # is that it allows the TT&C system to log the verification step and + # provides a short-hand notation + frame = inspect.stack()[1] # Get the stack frame of the caller + line = frame[0].f_lineno # Get the local variables of the caller + del frame + if not boolean: + raise VerifyError('Verify at line %d is False' % line) + return True \ No newline at end of file diff --git a/space/times.py b/src/space/times.py similarity index 68% rename from space/times.py rename to src/space/times.py index 06ad8c9..f7e2845 100644 --- a/space/times.py +++ b/src/space/times.py @@ -8,34 +8,56 @@ __copyright__ = 'Object Management Group under RF-Limited license (https://www.o import inspect import datetime, time from .errors import TimeoutError +from typing import Callable class SpecificTime(datetime.datetime): #Normative '''SpecificTime(year, month, day[, hour[, minute[, second[, microsecond]]]) Represents a specific time for timetags and time expressions ''' - def dayOfYear(self): + def dayOfYear(self) -> int: '''Return the day of the year with January 1 as 1. + + :param self: Self reference + :type self: ''' - begin = datetime.datetime(self.year, 1, 1) - diff = self - begin - return (diff.days + 1) - def nanos(self): + return self.toordinal() + def nanos(self) -> int: '''Return the nanoseconds + + :param self: Self reference + :type self: ''' - return self.microseconds*1000 + return self.microsecond*1000 @classmethod - def today(cls): + def today(cls) -> 'SpecificTime': + ''' + Returns an instance for today. + + :param cls: Class member + :type cls: ''' t = datetime.date.today() return cls(t.year, t.month, t.day) @classmethod - def now(cls): + def now(cls, tz:datetime.tzinfo | None = None) -> 'SpecificTime': + ''' + Returns an instance representing the current time. + + :param cls: Class member + :type cls: + :param tz: Time zone + :type tz: ''' t = datetime.datetime.now() return cls(t.year, t.month, t.day, t.hour, t.minute,\ t.second, t.microsecond) @classmethod - def fromStr(cls, strval): + def fromStr(cls, strval:str) -> 'SpecificTime': '''Convert from a string representation to a SpecificTime Expected format:YYYY-MM-DDTHH:MM:SS.NNNNNN + + :param cls: Class member + :type cls: + :param strval: Input time string + :type strval: str ''' strval = strval.strip() if len(strval) <= 10: @@ -46,7 +68,7 @@ class SpecificTime(datetime.datetime): #Normative t = datetime.datetime.strptime(strval, '%Y-%m-%dT%H:%M:%S.%f') return cls(t.year, t.month, t.day, t.hour, t.minute,\ t.second, t.microsecond) - def __str__(self): + def __str__(self) -> str: '''Converts a SpecificTime to the default string format ''' return self.strftime('%Y-%m-%dT%H:%M:%S.%f') @@ -55,18 +77,29 @@ class TimeInterval(datetime.timedelta): #Normative '''TimeInterval([days[, seconds[, microseconds[, milliseconds[, minutes[, hours[, weeks]]]]]]]) Represents a positive (future) or negative (elapsed) relative time interval for time expressions ''' - def asSeconds(self): - ''' Return entire interval as seconds + def asSeconds(self) -> float: + '''Return entire interval as seconds + + :param self: Self reference + :type self: ''' return self.total_seconds() - def nanos(self): - ''' Return nanoseconds in the second + def nanos(self) -> int: + '''Return nanoseconds in the second + + :param self: Self reference + :type self: ''' return self.microseconds*1000 @classmethod - def fromStr(cls, strval): + def fromStr(cls, strval:str) -> 'TimeInterval': '''Convert from a string representation to a TimeInterval Expected format:[s]DTHH:MM:SS.NNNNNNNNN + + :param cls: Class reference + :type cls: + :param strval: Input time string + :type strval: str ''' isNegative = False days = hours = mins = secs = nsecs = 0 @@ -105,8 +138,11 @@ class TimeInterval(datetime.timedelta): #Normative else: dt = cls(days, seconds=secs, microseconds=nsecs/1000) return dt - def __str__(self): + def __str__(self) -> str: '''Converts a TimeInterval to the default string format + + :param self: Self reference + :type self: ''' seconds = self.seconds hours = seconds / 3600 @@ -116,10 +152,17 @@ class TimeInterval(datetime.timedelta): #Normative .format(self.days, hours, minutes, seconds, self.microseconds) -def waitFor(boolean, timeout=5, pollPeriod=0.1): #Normative +def waitFor(boolean:Callable[[],bool], timeout:float=5, pollPeriod:float=0.1) -> bool: #Normative '''Wait for the provided Boolean function to become true Default timeout of 5 seconds and default polling interval of 100 milliseconds is used unless overridden in the call. + + :param boolean: Verifier function + :type boolean: Callable[[],bool] + :param timeout: Timeout for the verifier in seconds + :type timeout: float + :param pollPeriod: Frequency of polling in seconds + :type pollPeriod: float ''' frame = inspect.stack()[1] line = frame[0].f_lineno @@ -132,13 +175,19 @@ is used unless overridden in the call. timeout -= pollPeriod return True -def wait(seconds): #Normative +def wait(seconds:float) -> None: #Normative '''Wait for the specified number of seconds + + :param seconds: Number of seconds to wait + :type seconds: float ''' time.sleep(seconds) -def waitUntil(specificTime): #Normative +def waitUntil(specificTime:SpecificTime) -> None: #Normative '''Wait for a SpecificTime - returns immediately if time is in the past + + :param specificTime: Time reference to wait until + :type specificTime: SpecificTime ''' now = SpecificTime.now() delta = (specificTime - now).total_seconds() diff --git a/scripts/ConfigureFEP.py b/test/ConfigureFEP.py similarity index 58% rename from scripts/ConfigureFEP.py rename to test/ConfigureFEP.py index a5125af..71f1993 100644 --- a/scripts/ConfigureFEP.py +++ b/test/ConfigureFEP.py @@ -1,41 +1,50 @@ #!/usr/bin/python3 ''' This SpacePython module provides an example of a spacecraft - operations procedure that configures a GEMS device and then + operations procedure that configures a device and then verifies the configuration. ''' -from space import TimeInterval, SpecificTime, GemsDevice +from space import TimeInterval, SpecificTime, spacePython from space import wait, verify, VerifyError, FAILED, SUCCESSFUL -__version__ = '1.1.0' +from space import MixedParameterValue, ParserParameter +__version__ = '1.2.0' __author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' __copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' __scriptname__ = 'ConfigureFEP' __duration__ = TimeInterval.fromStr(':3.0') -__modified__ = SpecificTime.fromStr('2024-04-25T12:10') +__modified__ = SpecificTime.fromStr('2024-11-12T12:00') # -def invoke(args): +def invoke(args:dict[str, MixedParameterValue]) -> int: '''Configure the Front-End Equipment for a pass. ''' # -# Connect to the GEMS Device - equipment = GemsDevice('FE1') +# Connect to the device + equipment = spacePython().lookupAsset('FE1') syncword = equipment.lookupParameter('Syncword') + if syncword == None: + print('Syncword was not found') + return FAILED + # Set the synchronization pattern newPattern = 0xC744 - equipment.send('ChangeSync', Syncword=newPattern) + equipment.send('ChangeSync', dict(), Syncword=newPattern) wait(2) # Wait for the change to take effect - equipment.get(['Syncword']) + equipment.updateParameters(['Syncword']) try: verify(syncword.value()==newPattern) print('Sync pattern is {0:#X}'.format(syncword.value())) except VerifyError: - print('Sync pattern is {0:#X} instead of {1:#X}'\ - .format(syncword.value(), newPattern)) + if syncword.value() != None: + print('Sync pattern is {0:#X} instead of {1:#X}'\ + .format(syncword.value(), newPattern)) + else: + print('Sync pattern is (No value) instead of {0:#X}'\ + .format(newPattern)) return FAILED return SUCCESSFUL # # Boilerplate to allow running as a shell script -__parameters__ = [] +__parameters__:list[ParserParameter] = [] # If invoked from the command line, configure logger, parse arguments, and invoke if __name__ == '__main__': import logging diff --git a/scripts/PassSetup.py b/test/PassSetup.py similarity index 64% rename from scripts/PassSetup.py rename to test/PassSetup.py index 7d18584..2f4d823 100644 --- a/scripts/PassSetup.py +++ b/test/PassSetup.py @@ -4,27 +4,27 @@ operations procedure that queries the operator, invokes a native procedure and waits for an expression to become true. ''' -from space import TimeInterval, SpecificTime, Link, operatorQuery -from space import loadProcedure, TimeoutError, waitFor +from space import TimeInterval, SpecificTime, spacePython, operatorQuery +from space import MixedParameterValue, ParserParameter, TimeoutError, waitFor from space import SUCCESSFUL, FAILED -__version__ = '1.1.0' +__version__ = '1.2.0' __author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' __copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' __scriptname__ = 'PassSetup' __duration__ = TimeInterval.fromStr(':30.0') -__modified__ = SpecificTime.fromStr('2024-04-25T12:10') +__modified__ = SpecificTime.fromStr('2024-11-12T12:00') # -def invoke(args): - '''Setup for a pass and wait for Link to come up. +def invoke(arg:dict[str, MixedParameterValue]) -> int: + '''Setup for a pass and wait for connection to be available. ''' # # Invoke the EstablishContact native procedure with the parameter "string" # String selection will be default '1' or supplied by operator - result = operatorQuery('Select RF string for SAT1', string='1') - establishContact = loadProcedure('EstablishContact') + result:dict[str, MixedParameterValue] = operatorQuery('Select RF string for SAT1', string=1) + establishContact = spacePython().procedureEngine().loadProcedure('EstablishContact') establishContact.invoke(result) -# Wait for the link to be established (or timeout) - sat1 = Link('SAT1') +# Wait for the connection to be established (or timeout) + sat1 = spacePython().lookupAsset('SAT1') try: waitFor(lambda:sat1.state()=='UP') except TimeoutError: @@ -33,7 +33,7 @@ def invoke(args): return SUCCESSFUL # # Boilerplate to allow running as a shell script, -__parameters__ = [] +__parameters__:list[ParserParameter] = [] # If invoked from the command line, configure logger, parse arguments, and invoke if __name__ == '__main__': import logging diff --git a/test/README.md b/test/README.md new file mode 100644 index 0000000..61fdd27 --- /dev/null +++ b/test/README.md @@ -0,0 +1,20 @@ +## SpacePython Examples + +Example SpacePython example procedure scripts and an example implementation backing are included in the test directory. + +Test procedures include: + - ConfigureFEP.py + - PassSetup.py + - SetMomentumWheelSpeed.py + +A configuration yaml file is included in the data sub-directory. + +An example implementation backing is provided in the data sub-directory. + +Once the space module has been installed, run the samples with the following example usage: + +``` +SPACEPYTHON_DEFAULT_MODULE=demo.DemoSpacePython python3 ConfigureFEP.py +``` + +This example demonstrates specifying a default module of demo.DemoSpacePython, which is the example implementation backing included. \ No newline at end of file diff --git a/scripts/SetMomentumWheelSpeed.py b/test/SetMomentumWheelSpeed.py similarity index 67% rename from scripts/SetMomentumWheelSpeed.py rename to test/SetMomentumWheelSpeed.py index bc95dbd..92c765d 100644 --- a/scripts/SetMomentumWheelSpeed.py +++ b/test/SetMomentumWheelSpeed.py @@ -7,13 +7,14 @@ ''' # The preceding documentation corresponds to the HeaderComment within # the metamodel. -from space import TimeInterval, SpecificTime, Link -from space import SUCCESSFUL +from space import TimeInterval, SpecificTime, spacePython +from space import SUCCESSFUL, MixedParameterValue, NullableMixedParameterValue, ParserParameter, FAILED +from datetime import datetime # The following are commonly accepted metadata items for python scripts # __version__ is required for SpacePython. # __version__ corresponds to Procedure.version in SOLM # The other metadata items are optional -__version__ = '1.1.0' +__version__ = '1.2.0' __author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' __copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' # The following module metadata are SpacePython required elements @@ -21,7 +22,15 @@ __copyright__ = 'Object Management Group under RF-Limited license (https://www.o # in SOLM __scriptname__ = 'SetMomentumWheelSpeed' __duration__ = TimeInterval(0,5) # 5 seconds -__modified__ = SpecificTime.fromStr('2024-04-25T12:10') +__modified__ = SpecificTime.fromStr('2024-11-12T12:00') + +def int_null(arg:NullableMixedParameterValue): + # handle special case for null + if isinstance(arg, datetime): + return 0 + if arg == None: + return 0 + return int(arg) # # The invoke function is the required signature for a SpacePython @@ -31,7 +40,7 @@ __modified__ = SpecificTime.fromStr('2024-04-25T12:10') # variable argument list, SpacePython invoke() uses the Namespace convention # of a simple object with attributes named with the argument name, for cleaner # reference syntax -def invoke(args): +def invoke(args:dict[str, MixedParameterValue]) -> int: '''Change the current momentum wheel speed by the positive or negative rpm specified by the keyword parameter, SpeedIncrement. @@ -41,10 +50,19 @@ def invoke(args): # procedure, but is required by SpacePython to gain access to Parameters, # Commands, and Directives for the spacecraft, control system, # and equipment managed by the procedure. - sat1 = Link('SAT1') + sat1 = spacePython().lookupAsset('SAT1') MomentumWheelState = sat1.lookupParameter('MomentumWheelState') + if MomentumWheelState == None: + print('MomentumWheelState was not found') + return FAILED MomentumWheelSpeed = sat1.lookupParameter('MomentumWheelSpeed') + if MomentumWheelSpeed == None: + print('MomentumWheelSpeed was not found') + return FAILED setWheelSpeed = sat1.lookupCommand('SetWheelSpeed') + if setWheelSpeed == None: + print('SetWheelSpeed was not found') + return FAILED # # The "core" of the procedure example # @@ -54,16 +72,17 @@ def invoke(args): # if MomentumWheelState.value() == 'Off': sat1.send('MomentumWheelOn') #Simple invocation of named command -# Set an argument value for Command from the Link catalog and send it. - setWheelSpeed.setValues(WheelSpeed=(MomentumWheelSpeed.value() + args.SpeedIncrement)) +# Set an argument value for Command and send it. + if MomentumWheelSpeed.value() != None and not isinstance(args['SpeedIncrement'],datetime): + setWheelSpeed.setValues(WheelSpeed=(int_null(MomentumWheelSpeed.value()) + int(args['SpeedIncrement']))) sat1.send(setWheelSpeed) return SUCCESSFUL # # End of core procedure # # Boilerplate to allow running as a shell script -from space.parameters import Parameter, MinInclusiveR, MaxInclusiveR -__parameters__ = [Parameter('SpeedIncrement', 'int', restriction=[MinInclusiveR(-10000), MaxInclusiveR(10000)])] +from space.parameters import MinInclusiveR, MaxInclusiveR +__parameters__:list[ParserParameter] = [ParserParameter('SpeedIncrement', 'int', restriction=[MinInclusiveR(-10000), MaxInclusiveR(10000)])] # If invoked from the command line, configure logger, parse arguments, and invoke if __name__ == '__main__': import logging diff --git a/data/SpacePythonDataset.yaml b/test/data/SpacePythonDataset.yaml similarity index 87% rename from data/SpacePythonDataset.yaml rename to test/data/SpacePythonDataset.yaml index 0413660..27686ac 100644 --- a/data/SpacePythonDataset.yaml +++ b/test/data/SpacePythonDataset.yaml @@ -4,8 +4,8 @@ ParameterSet: - BATVOLT: [double, {description: "Battery Voltage"} ] - BATTEMP: [double, {description: "Battery temperature"} ] - - MomentumWheelState: [double, {description: "Power state of momentum wheel", value_: On, restriction: !Restriction [Enumeration, [Off, On]]}] - - MomentumWheelSpeed: [int, {description: "Spin rate", units: RPM, value_: 2000}] + - MomentumWheelState: [double, {description: "Power state of momentum wheel", _value: On, restriction: !Restriction [Enumeration, [Off, On]]}] + - MomentumWheelSpeed: [int, {description: "Spin rate", units: RPM, _value: 2000}] CommandSet: - MomentumWheelOn: [] - SetWheelSpeed: [WheelSpeed: [int, {description: "Spin rate", units: RPM, restriction: !Restriction [MaxInclusive, 12000, MinInclusive, -12000]}]] @@ -14,7 +14,7 @@ ParameterSet: - Synced: [boolean,{description: "Link state", restriction: !Restriction [Enumeration, [Locked, Searching, "No Carrier" ]]}] - TlmRate: [double, {description: "Telemetry data rate", units: bps} ] - - Syncword: [int, {value_: 0xA5A5}] + - Syncword: [int, {_value: 0xA5A5}] DirectiveSet: - RadioOn: [] - ChangeSync: [Syncword: [int, {description: "Sync Pattern"}]] diff --git a/test/demo/DemoAsset.py b/test/demo/DemoAsset.py new file mode 100644 index 0000000..05d8ad4 --- /dev/null +++ b/test/demo/DemoAsset.py @@ -0,0 +1,114 @@ +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from space import Asset, Parameter, Command, MixedFlagValue, SpacePythonException, log +from typing import Any +import re +from .DemoParameter import DemoParameter +from .DemoCommand import DemoCommand + +class DemoAsset(Asset): + + def __init__(self, name: str, parameters:dict[str, DemoParameter]= dict(), commands:dict[str, DemoCommand] = dict()): + self._name = name + self._parameters:dict[str, DemoParameter] = parameters + + for c in list(commands.values()): + c.setAsset(self) + + self._commands:dict[str, DemoCommand] = commands + + assets_[name] = self + + def lookupParameter(self, parameterName:str) -> Parameter | None: + if parameterName in self._parameters: + return self._parameters[parameterName] + else: + return None + + def findParameters(self, regexp:str='') -> list[Parameter]: + keys = list(self._parameters.keys()) + out_list:list[Parameter] = [] + if regexp != '': + does_it = re.compile(regexp) + keys = list(filter(does_it.match, keys)) + + # build output + for k in keys: + out_list.append(self._parameters[k]) + else: + out_list = list(self._parameters.values()) + return out_list + + def updateParameters(self, parameterList:list[str | Parameter]=[]) -> None: + # This method is expected to cause a poll of the device + # to get current values. If parameters are constantly + # polled by the ground system, then it is an opportunity + # to refresh current values for a running script + if len(parameterList) > 0: + out = 'Getting {0} parameters:'.format(self._name) + for param in parameterList: + out += ' {name}'.format(name=param) + log.info(out) + else: + raise SpacePythonException('No Parameters specified on updateParameters') + + def setParameters(self, **valueMap:Any) -> None: #dict[str, MixedParameterValue] + params = list(valueMap.keys()) + if len(params) > 0: + out = 'Setting {0} parameters:'.format(self._name) + for param in params: + out += ' {name}={value}'.format(name=param, value=valueMap[param]) + log.info(out) + + # set value in local table + for param in params: + p = self.lookupParameter(param) + if p != None: + p.setValue(valueMap[param]) + else: + raise SpacePythonException('No Parameters specified on set') + + def lookupCommand(self, commandName:str) -> Command | None: + if commandName in self._commands: + return self._commands[commandName] + else: + return None + + def findCommands(self, regexp:str='') -> list[Command]: + keys = list(self._commands.keys()) + out_list:list[Command] = [] + if regexp != '': + does_it = re.compile(regexp) + keys = list(filter(does_it.match, keys)) + + # build output + for k in keys: + out_list.append(self._commands[k]) + else: + out_list = list(self._commands.values()) + return out_list + + def __repr__(self): + return "Command('{0}')".format(self.name) + + def send(self, command:Command | str, _flags:dict[str, MixedFlagValue]=dict(), **args:Any) -> None: + cmd = command + if isinstance(cmd, str): + cmd_name = cmd + cmd = self.lookupCommand(cmd) + if cmd == None: + raise SpacePythonException('Provided command {0} does not exist'.format(cmd_name)) + + if args: + cmd.setValues(**args) + + cmd.send() + + def name(self) -> str: + return self._name + + def state(self) -> str: + return "UP" + +# storage of known assets +assets_:dict[str,DemoAsset] = dict() diff --git a/test/demo/DemoCommand.py b/test/demo/DemoCommand.py new file mode 100644 index 0000000..bbfdf6b --- /dev/null +++ b/test/demo/DemoCommand.py @@ -0,0 +1,54 @@ +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from space import Command, MixedFlagValue, Parameter, UnknownParameterError, log, MixedParameterValue, Asset +from typing import Any + +class DemoCommand(Command): + + def __init__(self, name:str, asset_name:str, args:dict[str, Parameter]): + self._name = name + self._asset_name = asset_name + self._args = args + + def setAsset(self, asset:Asset) -> None: + self._asset = asset + + def setValues(self, **args:Any) -> None: + print("DemoCommand values set for " + self._name) + + params = list(args.keys()) + for param in params: + if param in self._args: + self._args[param].setValue(args[param]) + else: + raise UnknownParameterError('Specified command argument {0} not defined for {1}' + .format(param, self.name)) + + # also update asset values + newVal:dict[str, MixedParameterValue] = dict() + for param in params: + val = self._args[param].value() + if val != None: + newVal[param] = val + self._asset.setParameters(**newVal) + + def __repr__(self): + return "DemoCommand('{0}')".format(self._name) + + def name(self) -> str: + return self._name + + def send(self, _flags:dict[str, MixedFlagValue]=dict()) -> None: + log.info('Sending {cmd} to asset {sys}'.format(cmd=self._name, sys=self._asset_name)) + params = list(self._args.keys()) + if len(params) > 0: + out = ' Command arguments:' + for param in params: + out += ' {name}={value}'.format(name=param, value=self._args[param]) + log.info(out) + + if len(_flags) > 0: + out = ' Command flags:' + for flag in list(_flags.keys()): + out += ' {name}={value}'.format(name=flag, value=_flags[flag]) + log.info(out) diff --git a/test/demo/DemoParameter.py b/test/demo/DemoParameter.py new file mode 100644 index 0000000..e43fe3c --- /dev/null +++ b/test/demo/DemoParameter.py @@ -0,0 +1,72 @@ +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from space import Parameter, MixedParameterValue, NullableMixedParameterValue, isSupportedParameterType, UndefinedTypeError, getParameterFunction, SpecificTime, IllegalValueError, Restriction +from typing import Any + +class DemoParameter(Parameter): + def __init__(self, name:str, type:str='str', **kwds:Any): + self._name = name + if isSupportedParameterType(type): + self._type = type + else: + raise UndefinedTypeError('Could not create parameter of data type <{0}>'.format(type)) + + self._description = '' + self._multiplicity:NullableMixedParameterValue = None + self._restriction:list[Restriction] = [] + self._units = None + self._value:NullableMixedParameterValue = None + self._raw = None + self._time = None + + options = list(kwds.keys()) + for option in options: + if hasattr(self, option): + setattr(self, option, kwds[option]) + + def value(self) -> NullableMixedParameterValue: + return self._value + + def raw(self) -> NullableMixedParameterValue: + return self._raw + + def name(self) -> str: + return self._name + + def setValue(self, value: MixedParameterValue) -> None: + # If the value supplied is not of the specified type, try to convert it using the type converter + value = getParameterFunction(self._type)(value) + for restriction in self._restriction: + if not restriction.validate(value): + raise IllegalValueError('Violates restriction {0}'.format(restriction)) + self._value = value + self._time = SpecificTime.now() + + def sample(self) -> dict[str, MixedParameterValue]: + out:dict[str, MixedParameterValue] = dict() + if self._value != None: + out["value"] = self._value + if self._time != None: + out["time"] = self._time + + return out + + def type(self) -> str: + return self._type + + def __str__(self): + if self._value is not None: + return str(self._value) + else: + return self.__repr__() + + def __repr__(self): + r = 'Parameter({0}, dType={1}'.format(self._name, self._type) + if self._description != '': + r = r + ', description="{0}"'.format(self._description) + if self._multiplicity is not None: + r = r + ', multiplicity={0}'.format(self._multiplicity) + if self._value is not None: + r = r + ', _value={0}'.format(self._value) + r = r + ')' + return r \ No newline at end of file diff --git a/test/demo/DemoProcedure.py b/test/demo/DemoProcedure.py new file mode 100644 index 0000000..a6fcce7 --- /dev/null +++ b/test/demo/DemoProcedure.py @@ -0,0 +1,20 @@ +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +from space import Procedure, MixedParameterValue, log + +class DemoProcedure(Procedure): + '''Class to emulate a loaded SpacePython module for native procedures + ''' + def __init__(self, name:str): + self._name = name + + def invoke(self, args:dict[str, MixedParameterValue]) -> None: + '''Internal function to log native procedure name and calling arguments + ''' + log.info('Invoking native procedure {0}'.format(self._name)) + + logstr = ' with arguments (' + for name, value in args.items(): + logstr += '{0}={1}'.format(name, value) + logstr += ')' + log.info(logstr) \ No newline at end of file diff --git a/test/demo/DemoProcedureEngine.py b/test/demo/DemoProcedureEngine.py new file mode 100644 index 0000000..fe51164 --- /dev/null +++ b/test/demo/DemoProcedureEngine.py @@ -0,0 +1,25 @@ +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +import re +from space import ProcedureEngine, Procedure +from .DemoProcedure import DemoProcedure + +class DemoProcedureEngine(ProcedureEngine): + procedures_:dict[str,DemoProcedure] = dict() + + def loadProcedure(self, name:str, spaceSystem:str='') -> Procedure: + return DemoProcedure(name) + + def findProcedures(self, regexp:str='', spaceSystem:str='') -> list[Procedure]: + keys = list(self.procedures_.keys()) + out_list:list[Procedure] = [] + if regexp != '': + does_it = re.compile(regexp) + keys = list(filter(does_it.match, keys)) + + # build output + for k in keys: + out_list.append(self.procedures_[k]) + else: + out_list = list(self.procedures_.values()) + return out_list diff --git a/test/demo/DemoSpacePython.py b/test/demo/DemoSpacePython.py new file mode 100644 index 0000000..90c6991 --- /dev/null +++ b/test/demo/DemoSpacePython.py @@ -0,0 +1,42 @@ +__author__ = 'Space Domain Task Force (https://www.omg.org/solm/index.htm), Brad Kizzort' +__copyright__ = 'Object Management Group under RF-Limited license (https://www.omg.org/cgi-bin/doc.cgi?ipr)' +import re +from space import SpacePython, Asset, ProcedureEngine, MixedFlagValue, IllegalAssetError +from .DemoAsset import assets_ +from .DemoProcedureEngine import DemoProcedureEngine + +class DemoSpacePython(SpacePython): + + def lookupAsset(self, name:str) -> Asset: + if name in assets_: + return assets_[name] + else: + raise IllegalAssetError('Asset {0} does not exist'.format(name)) + + def findAssets(self, regexp:str='', _flags:dict[str, MixedFlagValue]=dict()) -> list[Asset]: + keys = list(assets_.keys()) + out_list:list[Asset] = [] + if regexp != '': + does_it = re.compile(regexp) + keys = list(filter(does_it.match, keys)) + + # build output + for k in keys: + out_list.append(assets_[k]) + else: + out_list = list(assets_.values()) + return out_list + + def procedureEngine(self) -> ProcedureEngine: + return DemoProcedureEngine() + +#def getAssetList() -> dict[str,DemoAsset]: +# return assets_ + +# +# The following code initializes a set of mappings +# for testing the framework with simple scripts. This interface is non-normative. +# It is expected that the framework code will be modified to access the command, telemetry, +# and equipment lists directly from the ground system software +from . import loader +loader.loadFromYaml() \ No newline at end of file diff --git a/test/demo/__init__.py b/test/demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/space/loader.py b/test/demo/loader.py similarity index 64% rename from space/loader.py rename to test/demo/loader.py index b2a7f21..8aa4e8c 100644 --- a/space/loader.py +++ b/test/demo/loader.py @@ -8,21 +8,19 @@ import os.path import sys import yaml -from space import log -from .parameters import Parameter, XtceParameter, GemsParameter -from .commands import Command -from .gemsdir import GemsDirective -from .links import Link -from .gems import GemsDevice +from space import log, Restriction +from .DemoParameter import DemoParameter +from .DemoCommand import DemoCommand +from .DemoAsset import DemoAsset -def loadFromYaml(specFile=None): +def loadFromYaml(specFile:str | None = None) -> None: '''Load the datasets from the optionally specified file name. If no file name is specified or the specified file does not exist, it will try to load from SpacePythonDataset.yaml in the current working directory or the users home directory. ''' # Build the list of paths that could be used to initialize - datasetList = list() + datasetList:list[str] = list() if specFile is not None: datasetList.append(specFile) datasetList.extend(['SpacePythonDataset.yaml', @@ -45,53 +43,40 @@ def loadFromYaml(specFile=None): loader.add_constructor('!Restriction', restriction_constructor) # Load the datasets for item in loader.get_data(): - if isinstance(item, SpaceSystem): - Link(item) - elif isinstance(item, Device): - GemsDevice(item) + DemoAsset(item.name, item.pSet, item.cSet) class SpaceSystem(object): - def __init__(self, name, pSet, cSet): + def __init__(self, name:str, pSet:dict[str, DemoParameter], cSet:dict[str, DemoCommand]): self.name = name self.pSet = pSet self.cSet = cSet - def storeLink(self, link): - for param in self.pSet.values(): - param.link = link - for cmd in self.cSet.values(): - cmd.link = link class Device(object): - def __init__(self, name, pSet, dSet): + def __init__(self, name:str, pSet:dict[str, DemoParameter], cSet:dict[str, DemoCommand]): self.name = name self.pSet = pSet - self.dSet = dSet - def storeDevice(self, device): - for param in self.pSet.values(): - param.device = device - for cmd in self.dSet.values(): - cmd.device = device + self.cSet = cSet def ss_constructor(loader, node): ssMapping = loader.construct_mapping(node, True) name = ssMapping['name'] if name is None: - log.warn('No mapping to SpaceSystem name') + log.warning('No mapping to SpaceSystem name') ps = ssMapping['ParameterSet'] if ps is None or len(ps) <=0: - log.warn('No ParameterSet mapping for {0}'.format(name)) + log.warning('No ParameterSet mapping for {0}'.format(name)) cs = ssMapping['CommandSet'] if cs is None or len(cs) <=0: - log.warn('No CommandSet mapping for {0}'.format(name)) - pSet = dict() + log.warning('No CommandSet mapping for {0}'.format(name)) + pSet:dict[str, DemoParameter] = dict() for pMap in ps: pName = list(pMap.keys())[0] defSeq = pMap[pName] - xp = XtceParameter(pName, None, defSeq[0], **defSeq[1]) + xp = DemoParameter(pName, defSeq[0], **defSeq[1]) if pName in pSet: - log.warn('Duplicate parameter name {0} in {1}'.format(pName, name)) + log.warning('Duplicate parameter name {0} in {1}'.format(pName, name)) pSet[pName] = xp - cSet = dict() + cSet:dict[str, DemoCommand] = dict() for command in cs: cName = list(command.keys())[0] paramSeq = command[cName] @@ -99,11 +84,11 @@ def ss_constructor(loader, node): for pMap in paramSeq: pName = list(pMap.keys())[0] defSeq = pMap[pName] - p = Parameter(pName, defSeq[0], **defSeq[1]) + p = DemoParameter(pName, defSeq[0], **defSeq[1]) if pName in args: - log.warn('Duplicate parameter name {0} in command {1}'.format(pName, cName)) + log.warning('Duplicate parameter name {0} in command {1}'.format(pName, cName)) args[pName] = p - cmd = Command(cName, None, args) + cmd = DemoCommand(cName, name, args) if cName in cSet: log.warn('Duplicate command name {0} in {1}'.format(cName, name)) cSet[cName] = cmd @@ -113,42 +98,42 @@ def dev_constructor(loader, node): dev = loader.construct_mapping(node, True) name = dev['name'] if name is None: - log.warn('No mapping to GemsDevice name') + log.warning('No mapping to GemsDevice name') ps = dev['ParameterSet'] if ps is None or len(ps) <=0: - log.warn('No ParameterSet mapping for {0}'.format(name)) + log.warning('No ParameterSet mapping for {0}'.format(name)) cs = dev['DirectiveSet'] if cs is None or len(cs) <=0: log.debug('No DirectiveSet mapping for {0}'.format(name)) - pSet = dict() + pSet:dict[str, DemoParameter] = dict() for pMap in ps: pName = list(pMap.keys())[0] defSeq = pMap[pName] - xp = GemsParameter(pName, None, defSeq[0], **defSeq[1]) + xp = DemoParameter(pName, defSeq[0], **defSeq[1]) if pName in pSet: - log.warn('Duplicate parameter name {0} in {1}'.format(pName, name)) + log.warning('Duplicate parameter name {0} in {1}'.format(pName, name)) pSet[pName] = xp - cSet = dict() + cSet:dict[str, DemoCommand] = dict() for directive in cs: cName = list(directive.keys())[0] paramSeq = directive[cName] - args = dict() + args:dict[str, DemoParameter] = dict() for pMap in paramSeq: pName = list(pMap.keys())[0] defSeq = pMap[pName] - p = Parameter(pName, defSeq[0], **defSeq[1]) + p = DemoParameter(pName, defSeq[0], **defSeq[1]) if pName in args: - log.warn('Duplicate parameter name {0} in directive {1}'.format(pName, cName)) + log.warning('Duplicate parameter name {0} in directive {1}'.format(pName, cName)) args[pName] = p - drct = GemsDirective(cName, None, args) + drct = DemoCommand(cName, name, args) if cName in cSet: - log.warn('Duplicate directive name {0} in {1}'.format(cName, name)) + log.warning('Duplicate directive name {0} in {1}'.format(cName, name)) cSet[cName] = drct - return Device(name, pSet, cSet) + return Device(name, pSet, cSet) def restriction_constructor(loader, node): tokens = loader.construct_sequence(node, True) - restrictions = list() + restrictions:list[Restriction] = list() ii = 0 while ii < len(tokens): cls = tokens[ii] @@ -156,9 +141,7 @@ def restriction_constructor(loader, node): res = tokens[ii] ii += 1 cls = cls + 'R' #Append R to get class name - constructor = getattr(Parameter, cls, None) + constructor = getattr(DemoParameter, cls, None) if constructor: restrictions.append(constructor(res)) return restrictions - - -- 2.43.5 From 359d6eb15928f284a5ee54ffbfa423823c76db35 Mon Sep 17 00:00:00 2001 From: Justin Boss Date: Tue, 12 Nov 2024 16:41:50 -0500 Subject: [PATCH 2/9] Change organization --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 170b481..7c2a232 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "hatchling.build" name = "space" version = "1.2.0" authors = [ - { name="Space Domain Task Force", email="space@omg.org" } + { name="Object Management Group", email="space@omg.org" } ] description = "SpacePython core interface" readme = "README.md" -- 2.43.5 From c8a30c6516003a610971dad93a202b0ddad94b5d Mon Sep 17 00:00:00 2001 From: Justin Boss Date: Tue, 12 Nov 2024 17:42:30 -0500 Subject: [PATCH 3/9] Add license reference --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 7c2a232..9d43f89 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ version = "1.2.0" authors = [ { name="Object Management Group", email="space@omg.org" } ] +license = {file = "LICENSE"} description = "SpacePython core interface" readme = "README.md" requires-python = ">=3.6" -- 2.43.5 From cba57529fd756e228b9ec68771c4c857d311f338 Mon Sep 17 00:00:00 2001 From: Justin Boss Date: Mon, 20 Jan 2025 13:17:45 -0500 Subject: [PATCH 4/9] Add pyyaml module dependency details --- test/README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/README.md b/test/README.md index 61fdd27..1222644 100644 --- a/test/README.md +++ b/test/README.md @@ -11,6 +11,12 @@ A configuration yaml file is included in the data sub-directory. An example implementation backing is provided in the data sub-directory. +The demo requires the pyyaml module, which could be installed via the following command: + +``` +python3 -m pip install pyyaml +``` + Once the space module has been installed, run the samples with the following example usage: ``` -- 2.43.5 From 5aefd009efd15ce114ab1f59382eb060d4b8d7cd Mon Sep 17 00:00:00 2001 From: Justin Boss Date: Mon, 20 Jan 2025 13:18:32 -0500 Subject: [PATCH 5/9] Increase minimum python version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 9d43f89..bf94cbf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ authors = [ license = {file = "LICENSE"} description = "SpacePython core interface" readme = "README.md" -requires-python = ">=3.6" +requires-python = ">=3.12" classifiers = [ "Programming Language :: Python :: 3", "Operating System :: OS Independent" -- 2.43.5 From 4bbd4a9e15e85f7a79f57cb29ac56ba55a0ded8a Mon Sep 17 00:00:00 2001 From: Justin Boss Date: Mon, 20 Jan 2025 13:20:11 -0500 Subject: [PATCH 6/9] Add file close() call --- test/demo/loader.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/demo/loader.py b/test/demo/loader.py index 8aa4e8c..3b2b37f 100644 --- a/test/demo/loader.py +++ b/test/demo/loader.py @@ -38,6 +38,8 @@ def loadFromYaml(specFile:str | None = None) -> None: raise Exception('Could not find dataset SpacePythonDataset.yaml') # Add the local type tag constructors loader = yaml.SafeLoader(f) + file.close() + loader.add_constructor('!SpaceSystem', ss_constructor) loader.add_constructor('!GemsDevice', dev_constructor) loader.add_constructor('!Restriction', restriction_constructor) -- 2.43.5 From a11b6ab6aca45716c65b8517e260de032b585a82 Mon Sep 17 00:00:00 2001 From: Justin Boss Date: Mon, 20 Jan 2025 13:21:14 -0500 Subject: [PATCH 7/9] Correct default field value --- test/demo/DemoParameter.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/demo/DemoParameter.py b/test/demo/DemoParameter.py index e43fe3c..c2aeb36 100644 --- a/test/demo/DemoParameter.py +++ b/test/demo/DemoParameter.py @@ -4,7 +4,7 @@ from space import Parameter, MixedParameterValue, NullableMixedParameterValue, i from typing import Any class DemoParameter(Parameter): - def __init__(self, name:str, type:str='str', **kwds:Any): + def __init__(self, name:str, type:str='string', **kwds:Any): self._name = name if isSupportedParameterType(type): self._type = type -- 2.43.5 From 1e56b179921a523bb2ce2f63160ed115ccd4f996 Mon Sep 17 00:00:00 2001 From: Justin Boss Date: Mon, 20 Jan 2025 14:40:01 -0500 Subject: [PATCH 8/9] Safer exception handling --- test/SetMomentumWheelSpeed.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/SetMomentumWheelSpeed.py b/test/SetMomentumWheelSpeed.py index 92c765d..86748f1 100644 --- a/test/SetMomentumWheelSpeed.py +++ b/test/SetMomentumWheelSpeed.py @@ -30,7 +30,10 @@ def int_null(arg:NullableMixedParameterValue): return 0 if arg == None: return 0 - return int(arg) + try: + return int(arg) + except ValueError as e: + return 0 # # The invoke function is the required signature for a SpacePython -- 2.43.5 From 05b4c921aae2faa6649be973354965cd140a447e Mon Sep 17 00:00:00 2001 From: Justin Boss Date: Mon, 20 Jan 2025 14:41:16 -0500 Subject: [PATCH 9/9] Improve language --- test/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/README.md b/test/README.md index 1222644..e808a6f 100644 --- a/test/README.md +++ b/test/README.md @@ -11,7 +11,7 @@ A configuration yaml file is included in the data sub-directory. An example implementation backing is provided in the data sub-directory. -The demo requires the pyyaml module, which could be installed via the following command: +The demo requires the pyyaml module, which can be installed via the following command: ``` python3 -m pip install pyyaml -- 2.43.5