Source code for thesdk

"""
======
Thesdk
======

Superclass of TheSyDeKick - universal System Development Kit. Provides
common methods and utility classes for other classes in TheSyDeKick.

Created by Marko Kosunen, marko.kosunen@aalto.fi, 2017.

Documentation instructions

Current docstring documentation style is Numpy
https://numpydoc.readthedocs.io/en/latest/format.html

This text here is to remind you that documentation is important.
However, you may find out that even the documentation of this
entity may be outdated and incomplete. Regardless of that, every day
and in every way we are getting better and better :).

"""
import sys
import os
import glob
import getpass
import time
import tempfile
import re
import abc
from abc import *
from functools import reduce
import multiprocessing

import numpy as np
import traceback
import time
import functools
import contextlib as cl
import pdb
import pickle
from datetime import datetime

#Set 'must have methods' with abstractmethod
#@abstractmethod
#Using this decorator requires that the class’s metaclass is ABCMeta or is 
#derived from it. A class that has a metaclass derived from ABCMeta cannot 
#be instantiated unless all of its abstract methods and properties are overridden.
from thesdk.bundle import Bundle
class thesdk(metaclass=abc.ABCMeta):
    '''Common base class of TheSyDeKick entities.

    The following class attributes are set when this class is imported.

    Attributes
    ----------
    HOME: str
        Directory ../../../ counting from the location of the __init__.py
        file of the thesdk class. Used as a reference point for other
        locations.
    CONFIGFILE: str
        HOME/TheSDK.config.
    MODULEPATHS: list(str)
        List of directories HOME/Entities/<entity> for which
        HOME/Entities/<entity>/<entity>/__init__.py exists. Appended to
        sys.path to locate TheSyDeKick system modules.
    logfile: str
        Default logfile: /tmp/TheSDK_randomstr_uname_YYYYMMDDHHMM.log
        Override with initlog if you want something else.
    global_parameters: list(str)
        List of global parameters to be read to the GLOBALS dictionary from
        CONFIGFILE. Deleted as soon as GLOBALS has been populated.
    GLOBALS: dict
        Dictionary of global parameters, keys defined by global_parameters,
        values defined in CONFIGFILE.

    '''

    # Solve for the THESDKHOME: four directory levels up from this file.
    HOME = os.path.realpath(__file__)
    for i in range(4):
        HOME = os.path.dirname(HOME)
    print("Home of TheSDK is %s" %(HOME))
    CONFIGFILE = HOME+'/TheSDK.config'
    print("Config file of TheSDK is %s" %(CONFIGFILE))

    # This variable becomes redundant after the GLOBALS dictionary is created
    global_parameters = [
        'LSFSUBMISSION',
        'LSFINTERACTIVE',
        'ELDOLIBFILE',
        'SPECTRELIBFILE',
        'VLOGLIBFILE',
        'VHDLLIBFILE',
    ]

    # Append all SDK python modules to path. Strategy:
    # 1. iterate over paths starting from Entities directory
    # 2. if Entities/<path> is not a file, check if
    #    Entities/<path>/<path>/__init__.py exists
    # 3. If it does, it is a SDK module -> add to path
    MODULEPATHS = []
    root = os.path.join(HOME, 'Entities')
    for item in os.listdir(root):
        if not os.path.isfile(os.path.join(root, item)):
            if os.path.isfile(os.path.join(root, item, item, '__init__.py')):
                MODULEPATHS.append(os.path.join(root, item))

    for i in list(set(MODULEPATHS)-set(sys.path)):
        if 'BagModules' not in i:
            print("Adding %s to system path" %(i))
            sys.path.append(i)

    # mkstemp() is used only as a source of a unique random tag. It creates
    # the file and returns an open OS-level descriptor, so release both right
    # away instead of leaking them for the lifetime of the process.
    _fd, _tmpname = tempfile.mkstemp()
    os.close(_fd)
    os.remove(_tmpname)
    logfile = ("/tmp/TheSDK_" + os.path.basename(_tmpname) + "_" + getpass.getuser()
               + "_" + time.strftime("%Y%m%d%H%M") + ".log")
    del _fd, _tmpname
    if os.path.isfile(logfile):
        os.remove(logfile)
    print("Setting default logfile %s" %(logfile))
    # Do not create the logfile here
    #----logfile stuff ends here

    # Parse the global parameters from TheSDK.config to a dict.
    # A helper function is used so that no temporaries (and no module-level
    # globals) are left behind; the file is read only once.
    def _parse_config(configfile, names):
        '''Return {name: value} for the 'name=value' lines of configfile.

        Only lines that start with 'name=' are considered. Double quotes are
        stripped from the value. If a name occurs several times, the last
        occurrence wins.
        '''
        with open(configfile, 'r') as fid:
            lines = fid.readlines()
        values = {}
        for name in names:
            pattern = re.compile('(' + name + '=)(.*)')
            for line in lines:
                found = pattern.match(line)
                if found:
                    # '.' never matches the newline, so group 2 is the value
                    # without the line terminator.
                    values[name] = found.group(2).replace('"', '')
                    print("GLOBALS[%s]='%s'"%(name,values[name]))
        return values

    GLOBALS = _parse_config(CONFIGFILE, global_parameters)
    # Delete the helper and the parameter list as they are not needed any more
    del _parse_config
    del global_parameters
    #----Global parameter stuff ends here
@classmethod
def initlog(cls, *arg):
    '''Initialize logging, optionally overriding the default logfile.

    Parameters
    ----------
    *arg :
        If given, arg[0] is the path of the new logfile. It replaces the
        class-level `logfile` attribute. Without arguments the current
        logfile is kept.

    Notes
    -----
    An existing file at the logfile path is removed before the
    initialization message is printed to stdout and appended to the file.

    '''
    if len(arg) > 0:
        __class__.logfile = arg[0]

    if os.path.isfile(__class__.logfile):
        os.remove(__class__.logfile)
    typestr = "[INFO]"

    # Colors for stdout prints.
    # NOTE(review): print_colors is defined as an instance property, so
    # cls.print_colors evaluates to the property object itself - confirm that
    # always-on colors are intended here.
    cend = '' if not cls.print_colors else '\33[0m'
    cgreen = '' if not cls.print_colors else '\33[32m'

    msg = "Default logfile override. Initialized logging in %s" %(__class__.logfile)
    print("%s %s%s%s %s: %s" %(time.strftime("%H:%M:%S"),cgreen,typestr,cend,
        __class__.__name__ , msg))

    # Context manager guarantees the file is closed even if the write fails
    with open(__class__.logfile, 'a') as fid:
        fid.write("%s %s %s: %s\n" %(time.strftime("%H:%M:%S"),typestr,
            __class__.__name__ , msg))
#Common properties
@property
def DEBUG(self):
    '''True | False (default)

    Set this to True if you want the debug messages printed.

    '''
    if not hasattr(self, '_DEBUG'):
        self._DEBUG = False
    return self._DEBUG

@DEBUG.setter
def DEBUG(self, value):
    self._DEBUG = value

@property
def print_relative_path(self):
    '''True (default) | False

    If True, print all paths relative to the entity path. If False, paths
    are printed as is (typically absolute paths).

    '''
    if not hasattr(self, '_print_relative_path'):
        self._print_relative_path = True
    return self._print_relative_path

@print_relative_path.setter
def print_relative_path(self, value):
    self._print_relative_path = value

@property
def _classfile(self):
    '''Defines the location of the classfile.

    Returns
    -------
    str
        <path>/Entities/<entity>/<entity>, i.e. the directory of the module
        defining the class of this instance joined with the class name.

    '''
    module = sys.modules[self.__class__.__module__]
    path = os.path.dirname(os.path.abspath(module.__file__))
    return os.path.join(path, self.__class__.__name__)

@property
def entitypath(self):
    '''Path to entity. Extracted from the location of __init__.py file.

    Two directory levels above `_classfile`. Computed on first access and
    cached.

    '''
    if not hasattr(self, '_entitypath'):
        self._entitypath = os.path.dirname(os.path.dirname(self._classfile))
    return self._entitypath
#No setter, no deleter.

@property
def model(self):
    '''Simulation model to be used

    'py' | 'sv' | 'vhdl' | 'eldo' | 'spectre' | 'ngspice' | 'hw'
    | 'icarus' | 'ghdl'

    Reading the property before it has been set logs a fatal message.

    '''
    if not hasattr(self, '_model'):
        self.print_log(type='F', msg='You MUST set the simulation model.')
    else:
        return self._model

@model.setter
def model(self, val):
    supported = ['py', 'sv', 'vhdl', 'eldo', 'spectre', 'ngspice', 'hw',
                 'icarus', 'ghdl']
    if val not in supported:
        self.print_log(type='E', msg= 'Simulator model %s not supported.' %(val))
    # The value is stored even when it is not supported; only an error is logged.
    self._model = val
    return self._model

@property
def simpathroot(self):
    """String

    Simulation path root. Default self.entitypath

    """
    if not hasattr(self, '_simpathroot'):
        self._simpathroot = self.entitypath
    return self._simpathroot

@simpathroot.setter
def simpathroot(self, val):
    self._simpathroot = val
    return self._simpathroot

@property
def simpath(self):
    """String

    Simulation path. (./simulations/<model>/<runname>)
    This is not meant to be set manually. Use 'simpathroot' to relocate.

    The directory is created on access if it does not exist. If `runname`
    is an empty string, `load_state` is used as the directory name.

    """
    #This property is dependent, it should not be fixed in creation
    name = self.runname if self.runname != '' else self.load_state
    self._simpath = '%s/simulations/%s/%s' % (self.simpathroot,self.model,name)
    if not os.path.exists(self._simpath):
        try:
            # exist_ok avoids a spurious failure if another process creates
            # the directory between the existence check and this call.
            os.makedirs(self._simpath, exist_ok=True)
        except OSError:
            self.print_log(type='E',msg='Failed to create %s' % self._simpath)
        else:
            self.print_log(type='I',msg='Creating %s' % self._simpath)
    return self._simpath

@simpath.setter
def simpath(self, val):
    # Assigning simpath is rejected with a fatal log message.
    self.print_log(type='F', msg="Setting simpath has no effect. Set 'simpathroot' instead.")

@property
def has_lsf(self):
    """True | False (default)

    True if LSFINTERACTIVE and LSFSUBMISSION global variables are defined
    (and non-empty) in TheSDK.config.

    """
    lsf_keys = ('LSFINTERACTIVE', 'LSFSUBMISSION')
    # A missing key is treated the same as an empty value.
    self._has_lsf = all(thesdk.GLOBALS.get(key, '') != '' for key in lsf_keys)
    return self._has_lsf

@property
def preserve_iofiles(self):
    """True | False (default)

    If True, do not delete IO files after simulations. Useful for
    debugging the file IO.

    """
    if not hasattr(self, '_preserve_iofiles'):
        self._preserve_iofiles = False
    return self._preserve_iofiles

@preserve_iofiles.setter
def preserve_iofiles(self, value):
    self._preserve_iofiles = value

@property
def pickle_excludes(self):
    '''list : Properties of entity to be excluded from pickling when saving
    entity state to disk. Useful for filtering out e.g. lambdas and other
    non-serializable objects.

    Default: ['_par', '_queue', 'generator', 'virtuoso_interface']

    '''
    if not hasattr(self, '_pickle_excludes'):
        self._pickle_excludes = ['_par', '_queue', 'generator', 'virtuoso_interface']
    return self._pickle_excludes

@pickle_excludes.setter
def pickle_excludes(self, val):
    self._pickle_excludes = val

#Common method to propagate system parameters
def copy_propval(self, *arg):
    '''Copy attribute values from a parent entity.

    Example::

        a=some_thesdk_class(self)

    Attributes listed in proplist attribute of 'some_thesdkclass' are
    copied from self to a, if and only if both self and a define the
    property mentioned in the proplist. Properties missing from either
    side are skipped with a debug message.

    Implemented by including following code at the end of __init__ method
    of every entity::

        if len(arg)>=1:
            parent=arg[0]
            self.copy_propval(parent,self.proplist)
            self.parent =parent;

    Parameters
    ----------
    *arg :
        arg[0] is the parent entity, arg[1] the list of property names.
        Both are stored to self.parent and self.proplist. With fewer than
        two arguments nothing is done.

    '''
    if len(arg) < 2:
        return

    self.parent = arg[0]
    self.proplist = arg[1]
    for prop in self.proplist:
        defined_here = hasattr(self, prop)
        if defined_here and hasattr(self.parent, prop):
            # It is nice to see how things propagate
            inherited = getattr(self.parent, prop)
            self.print_log(type= 'I',
                msg="Setting %s: %s to %s" %(self, prop, inherited))
            setattr(self, prop, getattr(self.parent, prop))
            continue

        # Report the entity that lacks the property
        if defined_here:
            obj = self.parent
        else:
            obj = self
        self.print_log(type='D',
            msg="Property %s not defined for entity %s, omitting copy!" % (prop,obj))
#Method for logging #This is a method because it uses the logfile property
def print_log(self, **kwargs):
    '''Print a message to stdout and append it to the logfile.

    Parameters
    ----------
    **kwargs:
        type: str
            'I' = Information (default)
            'D' = Debug. Enabled by setting the DEBUG attribute of an
            instance to True
            'W' = Warning
            'E' = Error
            'F' = Fatal, quits the execution
            'O' = Obsolete, used for obsoletion warnings.
            Any other value is reported as an error.
        msg: str
            The message to be printed

    Notes
    -----
    If the logfile does not exist yet, an initialization message is
    emitted first. For a fatal message an empty dictionary is put to
    self.queue when self.par is set, and the execution is terminated with
    quit().

    '''
    # Local name differs from the kwarg key to avoid shadowing builtin type()
    msgtype = kwargs.get('type', 'I')
    msg = kwargs.get('msg', "Print this to log")

    # Converting absolute file paths to relative file paths
    if self.print_relative_path:
        msg = msg.replace(self.entitypath, '.')
        if hasattr(self, 'parent'):
            msg = msg.replace(self.parent.entitypath, '.')

    entity = self.__class__.__name__
    colored = self.print_colors
    has_logfile = hasattr(self, "logfile")

    # Message type -> (tag, ANSI color for stdout prints)
    levels = {
        'D': ("[DEBUG]", '\33[34m'),
        'I': ("[INFO]", '\33[32m'),
        'W': ("[WARNING]", '\33[33m'),
        'E': ("[ERROR]", '\33[31m'),
        'O': ("[OBSOLETE]", '\33[35m'),
        'F': ("[FATAL]", '\33[31m'),
    }

    def to_stdout(typestr, color, text):
        # Print one tagged, optionally colored, line to stdout
        cstart = color if colored else ''
        cend = '\33[0m' if colored else ''
        print("%s %s%s%s %s: %s" %(time.strftime("%H:%M:%S"),cstart,typestr,cend,
            entity, text))

    def to_logfile(line):
        # Append one line to the logfile; the file is always closed
        with open(thesdk.logfile, 'a') as fid:
            fid.write(line)

    if not os.path.isfile(thesdk.logfile):
        typestr = "[INFO]"
        initmsg = "Initialized logging in %s" %(thesdk.logfile)
        to_stdout(typestr, levels['I'][1], initmsg)
        to_logfile("%s %s thesdk: %s\n" %(time.strftime("%H:%M:%S"), typestr, initmsg))

    # Debug messages are dropped unless debugging is enabled
    if msgtype == 'D' and not self.DEBUG:
        return

    if isinstance(msgtype, str) and msgtype in levels:
        typestr, color = levels[msgtype]
    else:
        typestr, color = levels['E']
        msg = ("Incorrect message type '%s'. Choose one of 'D', 'I', 'W', 'E', 'O' or 'F'."
               % msgtype)

    to_stdout(typestr, color, msg)
    # If logfile set, print also there. This includes the fatal message, so
    # that the reason for quitting can be found in the logfile as well.
    if has_logfile:
        to_logfile("%s %s %s: %s\n" %(time.strftime("%H:%M:%S"), typestr, entity, msg))

    if msgtype == 'F':
        print("Quitting due to fatal error in %s" %(entity))
        if has_logfile:
            to_logfile("%s Quitting due to fatal error in %s.\n"
                %( time.strftime("%H:%M:%S"), entity))
        # In a parallel run, hand an empty result to whoever waits on the queue
        if self.par:
            self.queue.put({})
        quit()
def timer(func):
    """Timer decorator

    Logs the execution time of a member function of a class inheriting
    thesdk. The message is emitted through the print_log method of the
    first positional argument of the decorated function (i.e. self). The
    timer is applied by decorating the function to be timed with
    @thesdk.timer.

    For example, calling a function calculate_something() belonging to an
    example class calculator(thesdk), would print the following::

        class calculator(thesdk):

            @thesdk.timer
            def calculate_something(self):
                # Time-consuming calculations here
                print(result)
                return result

        >> calc = calculator()
        >> result = calc.calculate_something()
        42
        10:25:17 INFO at calculator: Finished 'calculate_something' in 0.758 s.
        >> print(result)
        42

    """
    @functools.wraps(func)
    def wrapper_timer(*args, **kwargs):
        t_begin = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - t_begin
        # args[0] is the instance the decorated method was called on
        caller = args[0]
        caller.print_log(type='I',
            msg='Finished \'%s\' in %.03f s.' % (func.__name__,elapsed))
        return result
    return wrapper_timer
@cl.contextmanager
def silence(self, show_error=True, debug=False):
    '''Context manager to redirect stdout (and optionally stderr) to
    os.devnull. Useful for cleaning up verbose function outputs.

    Silences only Python outputs (external commands such as spectre can
    still write to stdout).

    To silence (prevent printing to stdout) of a section of code::

        print('This is printed normally')
        with self.silence():
            print('This will not be printed')
        print('This is again printed normally')

    Parameters
    ----------
    show_error : bool
        Errors are let through by default. Set to False to redirect stderr
        as well.
    debug : bool
        Set to True to bypass the silencing altogether.

    '''
    # Debugging: leave all the streams untouched
    if debug:
        yield
        return

    with open(os.devnull, 'w') as sink:
        if show_error:
            with cl.redirect_stdout(sink) as out:
                yield out
        else:
            with cl.redirect_stderr(sink) as err, cl.redirect_stdout(sink) as out:
                yield (err, out)
@property
def par(self):
    """True | False (default)

    Property defines whether parallel run is intended or not.

    """
    if not hasattr(self, '_par'):
        self._par = False
    return self._par

@par.setter
def par(self, value):
    self._par = value

@property
def queue(self):
    """Property holding the queue for parallel run result.

    Defaults to an empty list until a queue is assigned.

    """
    if not hasattr(self, '_queue'):
        self._queue = []
    return self._queue

@queue.setter
def queue(self, value):
    self._queue = value
def run_parallel(self, **kwargs):
    """Run instances in parallel and collect results

    Usage: Takes in a set of instances, runs a given method for them, and
    saves result data to the original instances.

    Each instance is expected to put a dictionary to its `queue` at the end
    of the called method. The dictionary can include IOS, which are saved to
    IOS of the original instance. Keys matching an existing attribute of the
    instance are set as attributes. Other key-value pairs are saved as
    members of self.extracts.Members for the original instance. An empty
    dictionary is interpreted as a failed run.

    This is an example of returning both IOS and other data (place at the
    end of your simulation method, e.g. run())::

        if self.par:
            ret_dict = {'NF' : 25}
            ret_dict.update(self.IOS.Members) #Adds IOS to return dictionary
            self.queue.put(ret_dict)

    Some simulator modules can populate the extracts-bundle with generic
    extracted parameters. To pass this dictionary to the original instance,
    following example can be used::

        if self.par:
            # Combine IOS and extracts into one dictionary
            ret_dict = {**self.IOS.Members,**self.extracts.Members}
            self.queue.put(ret_dict)

    Parameters
    ----------
    **kwargs:
        duts: list
            List of instances you want to simulate
        method: str
            Method called for each instance (default: run)
        max_jobs: int
            Maximum number of concurrent jobs. Unlimited by default. Values
            smaller than one are treated as unlimited.

    Returns
    -------
    None
        Results are stored to the instances given in `duts`.

    """
    duts = kwargs.get('duts')
    method = kwargs.get('method', 'run')
    max_jobs = kwargs.get('max_jobs', None)

    # Nothing to run: avoid dividing the work into zero-sized batches
    if duts is None or len(duts) == 0:
        self.print_log(type='W', msg='No instances given for parallel run, nothing to do.')
        return

    total = len(duts)
    if max_jobs is None:
        max_jobs = total
    elif max_jobs < 1:
        self.print_log(type='W',
            msg='Invalid max_jobs %s, running all instances concurrently.' % max_jobs)
        max_jobs = total

    # Process the instances in batches of at most max_jobs concurrent jobs
    for first in range(0, total, max_jobs):
        dutrange = range(first, min(first + max_jobs, total))
        que = []
        proc = []
        for i in dutrange:
            self.print_log(type='I', msg='Starting parallel run %d/%d' % (i+1,len(duts)))
            que.append(multiprocessing.Queue())
            proc.append(multiprocessing.Process(target=getattr(duts[i],method)))
            duts[i].par = True
            duts[i].queue = que[-1]
            proc[-1].start()
        for n, i in enumerate(dutrange):
            ret_dict = que[n].get() # returned dictionary
            if ret_dict:
                self.print_log(type='I', msg='Saving results from parallel run of %s' %(duts[i]))
                for key,value in ret_dict.items():
                    if key in duts[i].IOS.Members:
                        duts[i].IOS.Members[key] = value
                    elif hasattr(duts[i],key):
                        setattr(duts[i],key,value)
                    else:
                        duts[i].extracts.Members[key] = value
            else:
                if duts[i].load_state == '':
                    name = duts[i].runname
                else:
                    name = duts[i].load_state
                self.print_log(type='W',msg='Parallel run %d/%d failed (with name: %s). Returned dict was empty!' % (i+1, len(duts), name))
            proc[n].join()
@property def IOS(self): """Type: Bundle of IO's Property holding the IOS Example: self.IOS.Members['input_A']=IO() """ if hasattr(self,'_IOS'): return self._IOS else: self._IOS = Bundle() return self._IOS @IOS.setter def IOS(self,value): self._IOS = value @property def extracts(self): """Bundle Bundle for holding the returned results from simulations that are not attributes or IOs. Example:: self.extracts.Members['sndr']=60 """ if hasattr(self,'_extracts'): return self._extracts else: self._extracts = Bundle() return self._extracts @extracts.setter def extracts(self,value): self._extracts = value @property def netlist_params(self): """List[string] List of strings containing the parameters of a netlist. List is populated by calling ecd_methods.get_params(). Empty if no parameters were found in the netlist. """ if not hasattr(self,'_netlist_params'): self._netlist_params = [] return self._netlist_params @netlist_params.setter def netlist_params(self,value): if isinstance(val, list): self._netlist_params = value else: self.print_log(type='W', msg='Cannot set property netlist_params as type %s' % type(val)) @property def print_colors(self): """True (default) | False Enable color tags in log print messages. """ if not hasattr(self,'_print_colors'): self._print_colors = True return self._print_colors @print_colors.setter def print_colors(self,value): self._print_colors = value @property def runname(self): """String Automatically generated name for the simulation. Formatted as timestamp_randomtag, i.e. '20201002103638_tmpdbw11nr4'. Can be overridden by assigning self.runname = 'myname'. Example:: self.runname = 'test' would generate the simulation files in `simulations/<model>/test/`. 
""" if not hasattr(self,'_runname'): self._runname='%s_%s' % \ (datetime.now().strftime('%Y%m%d%H%M%S'),os.path.basename(tempfile.mkstemp()[1])) return self._runname @runname.setter def runname(self,value): self._runname=value @property def statepath(self): """String Path where the entity state is stored and where existing states are loaded from. """ if not hasattr(self,'_statepath'): #self._statepath = '%s/states/%s/new_sweep' % (self.entitypath,self.model) self._statepath = '%s/states/%s' % (self.entitypath,self.model) return self._statepath @statepath.setter def statepath(self,value): self._statepath = value @property def statedir(self): """String Path to the most recently stored state. """ if not hasattr(self,'_statedir'): if self.runname != '': self._statedir = '%s/%s' % (self.statepath,self.runname) else: self._statedir = '%s/%s' % (self.statepath,self.load_state) return self._statedir @statedir.setter def statedir(self,value): self._statedir = value @property def save_state(self): """True | False (default) Save the entity state after simulation (including output data). Any stored state can be loaded using the matching state name passed to the `load_state` property. The state is saved to `savestatepath` by default. """ if not hasattr(self,'_save_state'): self._save_state = False return self._save_state @save_state.setter def save_state(self,value): self._save_state = value @property def load_state(self): """String (default '') Feature for loading results of previous simulation. When calling run() with this property set, the simulation is not re-executed, but the entity state and output data will be read from the saved state. The string value should be the `runname` of the desired simulation. 
Loading the most recent result automatically:: self.load_state = 'last' # or self.load_state = 'latest' Loading a specific past result using the `runname`:: self.load_state = '20201002103638_tmpdbw11nr4' List available results by providing any non-existent `runname`:: self.load_state = 'this_does_not_exist' """ if not hasattr(self,'_load_state'): self._load_state='' return self._load_state @load_state.setter def load_state(self,value): self._load_state=value @property def load_state_full(self): """True (default) | False Whether to load the full entity state or not. If False, only IOs are loaded in order to not override the entity state otherwise. In that case, bundles `IOS` and `extracts` are updated. """ if not hasattr(self,'_load_state_full'): self._load_state_full = True return self._load_state_full @load_state_full.setter def load_state_full(self,value): self._load_state_full = value
def _write_state(self):
    """Write the entity state to a binary file.

    The instance is pickled to <statedir>/state.pickle. The directory is
    created if it does not exist. Failures are logged as errors and are not
    raised to the caller.

    This should be called after the simulation has finished.

    """
    try:
        # exist_ok avoids a spurious failure if the directory appears
        # between the existence check and the creation.
        if not os.path.exists(self.statedir):
            os.makedirs(self.statedir, exist_ok=True)
    except OSError:
        self.print_log(type='E',msg='Failed to create %s' % self.statedir)

    try:
        with open('%s/state.pickle' % self.statedir,'wb') as f:
            pickle.dump(self,f)
    except Exception:
        # Pickling can fail in many ways (unpicklable members, I/O errors);
        # log the traceback instead of aborting the caller.
        self.print_log(type='E',msg=traceback.format_exc())
        self.print_log(type='E',msg='Failed saving state to %s' % self.statedir)
    else:
        self.print_log(type='I',msg='Saving state to %s' % self.statedir)
def _read_state(self):
    """Read the entity state from a binary file.

    The state is read from <statepath>/<load_state>/state.pickle. If
    `load_state` is 'latest' or 'last', the most recently changed entry in
    `statepath` is used. `runname` is set to the name of the loaded state.

    For the `_IOS` bundle only the Data fields of the members are assigned
    in order to preserve the existing IO objects. Other attributes are
    loaded if `load_state_full` is True; `_extracts` is always loaded.

    A failure to load the state is logged as fatal.

    """
    self.runname = self.load_state
    if self.runname == 'latest' or self.runname == 'last':
        results = glob.glob(self.statepath+'/*')
        if not results:
            # max() of an empty sequence would raise a bare ValueError
            self.print_log(type='F',msg='No stored states found in %s' % self.statepath)
            return
        latest = max(results, key=os.path.getctime)
        self.runname = os.path.basename(latest)
    pathname = '%s/%s' % (self.statepath,self.runname)
    if not os.path.exists(pathname):
        self.print_log(type='E',msg='Existing results not found in %s' % pathname)
        # List what is available to help picking an existing state
        existing = os.listdir(self.statepath) if os.path.isdir(self.statepath) else []
        self.print_log(type='I',msg='Found results:')
        for f in existing:
            self.print_log(type='I',msg='%s' % f)
    try:
        self.print_log(type='I',msg='Loading state from %s' % pathname)
        # NOTE(review): pickle.load executes arbitrary code from the file.
        # Load states only from trusted locations.
        with open('%s/state.pickle' % pathname,'rb') as f:
            obj = pickle.load(f)
        for name,val in obj.__dict__.items():
            # For a bundle, assign the Data fields to preserve pointers
            if name == '_IOS' and type(val).__name__ == 'Bundle':
                for ioname,ioval in val.Members.items():
                    self.print_log(type='D',msg='Assigning data to %s at %s' % \
                            (ioname,hex(id(self.__dict__[name].Members[ioname]))))
                    self.__dict__[name].Members[ioname].Data = ioval.Data
            elif self.load_state_full or name == '_extracts':
                self.print_log(type='D',msg='Loading %s' % name)
                self.__dict__[name] = val
    except Exception:
        self.print_log(type='W',msg=traceback.format_exc())
        self.print_log(type='F',msg='Failed loading state from %s' % pathname)
def __getstate__(self):
    # Pickle a copy of the instance dictionary without the attributes listed
    # in pickle_excludes. The copy is taken before pickle_excludes is read.
    snapshot = dict(self.__dict__)
    for excluded in self.pickle_excludes:
        snapshot.pop(excluded, None)
    return snapshot

def __setstate__(self, state):
    # Drop the excluded attributes from the incoming state (in place) and
    # merge the rest into the instance dictionary.
    for excluded in self.pickle_excludes:
        state.pop(excluded, None)
    self.__dict__.update(state)

@property
def iofile_bundle(self):
    """Bundle

    A thesdk.Bundle containing `iofile` objects. The `iofile` objects are
    automatically added to this Bundle, nothing should be manually added.

    """
    if hasattr(self, '_iofile_bundle'):
        return self._iofile_bundle
    self._iofile_bundle = Bundle()
    return self._iofile_bundle

@iofile_bundle.setter
def iofile_bundle(self, value):
    self._iofile_bundle = value
def delete_iofile_bundle(self):
    """Method to delete all files in iofile bundle

    Calls the 'remove' method of every member of the iofile bundle, unless
    the files are preserved either by the `preserve_iofiles` property of
    this entity or by the `preserve` attribute of the member itself. In
    case modifications are needed, define class for desired iofile type
    with remove method.

    """
    for name, val in self.iofile_bundle.Members.items():
        # Entity-level preservation overrides everything
        if self.preserve_iofiles:
            self.print_log(type="I", msg="Preserving iofiles for %s" %(name))
            continue

        if not val.preserve:
            val.remove()
            continue

        # In case preserve flag is set by other means
        self.print_log(type="I", msg="Preserve_value is %s" %(val.preserve))
        self.print_log(type="I", msg="Preserving file %s" %(val.file))
class IO(thesdk):
    '''TheSyDeKick IO class. Child of thesdk to utilize logging method.

    The IOs of an entity must be defined as::

        self.IOS=Bundle()
        self.IOS.Members['a']=IO()

    and referred to as::

        self.IOS.Members['a'].Data

    '''

    @property
    def _classfile(self):
        # Directory of this file joined with the module name
        return os.path.dirname(os.path.realpath(__file__)) + "/"+__name__

    def __init__(self, **kwargs):
        '''
        Parameters
        ----------
        **kwargs:
            Data: numpy_array, None
                Sets the Data attribute during the initialization

        '''
        self._Data = kwargs.get('Data', None)

    @property
    def Data(self):
        '''Data value of this IO. None if nothing has been assigned.

        '''
        if not hasattr(self, '_Data'):
            self._Data = None
        return self._Data

    @Data.setter
    def Data(self, value):
        self._Data = value

    @property
    def data(self):
        '''Obsolete alias of `Data`.

        The obsoletion message is logged only when `_Data` does not exist
        at the time of the access.

        '''
        if hasattr(self, '_Data'):
            return self._Data
        self._Data = None
        self.print_log(type='O',
            msg='IO attribute \'data\' is obsoleted by attribute \'Data\' Will be removed in release 1.4')
        return self._Data

    @data.setter
    def data(self, value):
        self._Data = value

    def __getstate__(self):
        # An IO is pickled in full: a plain copy of the instance dictionary
        return dict(self.__dict__)

    def __setstate__(self, state):
        self.__dict__.update(state)