Source code for spice.spectre.spectre

"""
=======
Spectre
=======
Spectre simulation interface package for Spectre for TheSyDeKick.

Initially written by Okko Järvinen, 2019
"""
import os
import sys
import subprocess
import pandas as pd
from collections import defaultdict
from abc import *
from thesdk import *
from spice.spice_common import *
import numpy as np
import psf_utils as psfu

[docs] class spectre(spice_common): """This class is used as instance in spice_simulatormodule property of spice class. Contains simulator dependent definitions. Parameters ---------- parent: object, None (mandatory to define). TheSyDeKick parent entity object for this simulator class. **kwargs : None """ def __init__(self, parent=None,**kwargs): if parent==None: self.print_log(type='F', msg="Parent of simulator module not given") else: self.parent=parent @property def syntaxdict(self): """ dict : Internally used dictionary for syntax conversions """ self.print_log(type='O', msg='Syntaxdict is obsoleted. Access properties directly') self._syntaxdict = { "cmdfile_ext" : self.cmdfile_ext, "resultfile_ext" : self.resultfile_ext, "commentchar" : self.commentchar, "commentline" : self.commentline, "nprocflag" : self.nprocflag, "simulatorcmd" : self.simulatorcmd, "dcsource_declaration" : self.dcsource_declaration, "parameter" : self.parameter, "option" : self.option, "include" : self.include, "dspfinclude" : self.dspfinclude, "subckt" : self.subckt, "lastline" : self.lastline, "eventoutdelim" : self.eventoutdelim, # Two spaces "csvskip" : self.csvskip } return self._syntaxdict @syntaxdict.setter def syntaxdict(self,value): self._syntaxdict=value @property def cmdfile_ext(self): """str : Extension of the command file """ return '.scs' @property def resultfile_ext(self): """str : Extension of the result file """ return '.raw' @property def commentchar(self): """str : Comment character of the simulator """ return '//' @property def commentline(self): """str : Comment line for the simulator """ return '///////////////////////\n' @property def nprocflag(self): """str : String for defining multithread execution """ return '+mt=' @property def simulatorcmd(self): """str : Simulator execution command (Default: 'ngspice') """ return 'spectre -64 +lqtimeout=0 ++aps=%s' %(self.errpreset) @property def dcsource_declaration(self): """str : DC source declaration """ return 'vsource type=dc dc=' @property def parameter(self): """str : Netlist parameter definition string """ return 'parameters' @property def option(self): """str : Netlist option definition string """ return 'options' @property def include(self): """str : Netlist include string """ return 'include' @property def dspfinclude(self): """str : Netlist dspf-file include string """ return 'dspf_include' @property def subckt(self): """str : Subcircuit include string """ return 'subckt' @property def lastline(self): """str : Last line of the simulator command file """ return '///' @property def eventoutdelim(self): """str : Delimiter for the events """ return ',' @property def csvskip(self): """Needs documentation. Lines skipped in result file : int """ return 0 @property def plflag_simcmd_prefix(self): """ Simulator specific prefix for enabling postlayout optimization Postfix comes from self.plflag (user defined) """ if not hasattr(self, '_plflag_simcmd_prefix'): self._plflag_simcmd_prefix="+postlayout" return self._plflag_simcmd_prefix @property def plflag(self): ''' Postlayout simulation accuracy/RC reduction flag. See: https://community.cadence.com/cadence_blogs_8/b/cic/posts/spectre-optimizing-spectre-aps-performance ''' if not hasattr(self, '_plflag'): self._plflag=f"=upa" return self._plflag @plflag.setter def plflag(self, val): if val in ["upa", "hpa"]: self._plflag=f'={val}' elif val=='': self._plflag='' else: self.print_log(type='W', msg='Unsupported postlayout flag: %s' % val) @property def errpreset(self): """ String Global accuracy parameter for Spectre simulations. Options include 'liberal', 'moderate' and 'conservative', in order of rising accuracy. You can set this by accesssing spice langmodule Example ------- self.spice_langmodule.errpreset='conservative' """ if not hasattr(self,'_errpreset'): self._errpreset='moderate' return self._errpreset @errpreset.setter def errpreset(self,value): self._errpreset=value @property def plotprogram(self): """ str : Sets the program to be used for visualizing waveform databases. Options are ezwave (default) or viva. """ if not hasattr(self, '_plotprogram'): if hasattr(self.parent,'plotprogram'): self._plotprogram=self.parent.plotprogram else: self._plotprogram='ezwave' return self._plotprogram @plotprogram.setter def plotprogram(self, value): if value not in [ 'ezwave', 'viva' ]: self.print_log(type='F', msg='%s not supported for plotprogram, only ezvave and viva are supported') else: self._plotprogram = value @property def plotprogcmd(self): """ str : Command to be run for interactive simulations. """ if self.plotprogram == 'ezwave': self._plotprogcmd='%s -MAXWND -LOGfile %s/ezwave.log %s &' % \ (self.plotprogram,self.parent.spicesimpath,self.parent.spicedbpath) elif self.plotprogram == 'viva': self._plotprogcmd='%s -datadir %s -nocdsinit &' % \ (self.plotprogram,self.parent.spicedbpath) else: self.print_log(type='F',msg='Unsupported plot program \'%s\'.' % self.plotprogram) return self._plotprogcmd @plotprogcmd.setter def plotprogcmd(self, value): self._plotprogcmd=value @property def spicecmd(self): """str : Simulation command string to be executed on the command line. Automatically generated. """ if not hasattr(self,'_spicecmd'): if self.parent.nproc: nprocflag = "%s%d" % (self.nprocflag,self.parent.nproc) self.print_log(type='I',msg='Enabling multithreading \'%s\'.' % nprocflag) else: nprocflag = "" if self.parent.postlayout: plflag=f"{self.plflag_simcmd_prefix}{self.plflag}" self.print_log(type='I',msg='Enabling post-layout optimization \'%s\'.' % plflag) else: plflag = '' spicesimcmd = (self.simulatorcmd + " %s %s -outdir %s " % (plflag,nprocflag,self.parent.spicesimpath)) self._spicecmd = self.parent.spice_submission+spicesimcmd+self.parent.spicetbsrc return self._spicecmd
[docs] def run_plotprogram(self): ''' Starting a parallel process for waveform viewer program. The plotting program command can be set with 'plotprogram' property. Tested for spectre and eldo. ''' # Wait for database to appear. tries = 0 while tries < 100: if os.path.exists(self.parent.spicedbpath): # More than just the logfile exists if len(os.listdir(self.parent.spicedbpath)) > 1: # Database file has something written to it filesize = [] for f in os.listdir(self.parent.spicedbpath): filesize.append(os.stat('%s/%s' % (self.parent.spicedbpath,f)).st_size) if all(filesize) > 0: break else: time.sleep(2) tries += 1 cmd=self.plotprogcmd self.print_log(type='I', msg='Running external command: %s' % cmd) try: ret=os.system(cmd) if ret != 0: self.print_log(type='W', msg='%s returned with exit status %d.' % (self.plotprogram, ret)) except: self.print_log(type='W',msg='Something went wrong while launcing %s.' % self.plotprogram) self.print_log(type='W',msg=traceback.format_exc())
[docs] def read_sp_result(self,**kwargs): """ Internally called function to read the S-parameter simulation results """ read_type=kwargs.get('read_type') try: if 'sp' in self.parent.simcmd_bundle.Members.keys(): self.extracts.Members.update({read_type: {}}) sweep=False # Get sp simulation file name for name, val in self.parent.simcmd_bundle.Members.items(): mc=val.mc if name=='sp': fname='' if len(val.sweep)!=0: for i in range(0, len(val.sweep)): sweep=True fname+='Sweep%d-*_' % i if mc: # TODO: implement. self.print_log(type='F', msg=f"Monte carlo currently not supported for \ S-parameter simulations.") fname+='mc_oppoint.dc' else: if 'sparams' in read_type: fname+=f'SPanalysis.sp' elif 'sprobe' in read_type: fname+=f'SPanalysis.sprobe.sp' else: if mc: # TODO: implement. self.print_log(type='F', msg=f"Monte carlo currently not supported for \ S-parameter simulations.") fname+='mc_oppoint.dc' else: if 'sparams' in read_type: fname+=f'SPanalysis.sp' elif 'sprobe' in read_type: fname+=f'SPanalysis.sprobe.sp' break # For distributed runs if self.parent.distributed_run: # TODO: check functionality and implement self.print_log(type='F', msg=f"Distributed runs not currently supported for \ S-parameter analyses.") path=os.path.join(self.parent.spicesimpath,'tb_%s.raw' % self.parent.name, '[0-9]*', fname) else: path=os.path.join(self.parent.spicesimpath,'tb_%s.raw' % self.parent.name, fname) # Sort files such that the sweeps are in correct order. if sweep: num_sweeps=len(val.sweep) files=glob.glob(path) for i in range(num_sweeps): files=sorted(files,key=lambda x: self.sorter(x, i)) if len(files)>0: rd, fileptr = self.create_nested_sweepresult_dict(0,0, self.extracts.Members['sweeps_ran'],files, read_type) else: files=glob.glob(path) if len(files)>1: # This should not happen self.print_log(type='W', msg="S-parameter analysis was not a sweep, but for \ some reason multiple output files were found. \ results may be in wrong order!") result={} if len(files)>0: psf = psfu.PSF(files[0]) psfsweep=psf.get_sweep() for signal in psf.all_signals(): result[signal.name]=np.vstack((psfsweep.abscissa, psf.get_signal(f'{signal.name}').ordinate)).T rd={0:{'param':'nosweep', 'value':0, read_type:result}} self.extracts.Members[read_type].update({'results':rd}) except: self.print_log(type='W', msg=traceback.format_exc()) self.print_log(type='W', msg="Something went wrong while extracting S-parameters")
[docs] def read_stb_result(self,**kwargs): ''' Internally called function to read the stb simulation results ''' if 'stb' in self.parent.simcmd_bundle.Members.keys(): try: fname='STB_analysis.margin.stb' file=os.path.join(self.parent.spicesimpath,'tb_%s.raw' % self.parent.name, fname) valbegin= 'VALUE\n' eof = 'END\n' parsevals = False with open(file, 'r') as f: for line in f: if valbegin == line: parsevals=True if parsevals: if 'gainMarginInfo' in line: info = line.split("\"")[3].split(" ") gain_margin = float(info[info.index('margin')+2]) gain_margin_freq = float(info[info.index('frequency')+2]) if 'phaseMarginInfo' in line: info = line.split("\"")[3].split(" ") phase_margin = float(info[info.index('margin')+2]) phase_margin_freq = float(info[info.index('frequency')+2]) if 'stb_state' in line: if 'is stable' in line: stable = True else: stable = False if eof == line: parsevals=False analysis = 'stb_analysis' self.extracts.Members.update({analysis: {}}) self.extracts.Members[analysis].update({ f'gain_margin': (gain_margin_freq, gain_margin), f'phase_margin': (phase_margin_freq, phase_margin), f'status': stable }) except: analysis = 'stb_analysis' self.extracts.Members.update({analysis: {}}) self.extracts.Members[analysis].update({ f'gain_margin': ('Nan', 'Nan'), f'phase_margin': ('Nan', 'Nan'), f'status': 'Gain margin and Phase margin could not be read, the circuit is unstable!' }) try: ocean_command = f""" openResults("{self.parent.spicedbpath}") selectResult('stb) ocnPrint(?output "{os.path.join(self.parent.spicesimpath, '%s_loopGain_phase' % self.parent.name)}" ?numberNotation 'scientific phase(getData("loopGain"))) ocnPrint(?output "{os.path.join(self.parent.spicesimpath, '%s_loopGain_db20' % self.parent.name)}" ?numberNotation 'scientific db20(getData("loopGain"))) exit """ process = subprocess.Popen( ["ocean", "-nograph"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) stdout, stderr = process.communicate(input=ocean_command) if stderr: print(stderr, file=sys.stderr) freq, loopGain_db20 = self.get_ocnPrint_data(file_name=self.parent.name+'_loopGain_db20') freq, loopGain_phase = self.get_ocnPrint_data(file_name=self.parent.name+'_loopGain_phase') analysis = 'stb_analysis_waveforms' self.extracts.Members.update({analysis: {}}) self.extracts.Members[analysis].update({ f'loopGain_db20': np.concatenate([freq.reshape(-1,1), loopGain_db20.reshape(-1,1)], axis=1), f'loopGain_phase': np.concatenate([freq.reshape(-1,1), loopGain_phase.reshape(-1,1)], axis=1), }) except: pass
[docs] def get_ocnPrint_data(self,file_name): df = pd.read_csv(f"{os.path.join(self.parent.spicesimpath, '%s' % file_name)}", delim_whitespace=True, header=None, skiprows=3) freq = df[0].values vals = df[1].values return freq, vals
[docs] def get_input_noise_data(self): df = pd.read_csv(f"{os.path.join(self.parent.spicesimpath, '%s_in' % self.parent.name)}", delim_whitespace=True, header=None, skiprows=3) df.columns = ['Frequency', 'in'] freq = df['Frequency'].values in_noise = df['in'].values return freq, in_noise
[docs] def get_output_noise_data(self): df = pd.read_csv(f"{os.path.join(self.parent.spicesimpath, '%s_out' % self.parent.name)}", delim_whitespace=True, header=None, skiprows=3) df.columns = ['Frequency', 'out'] freq = df['Frequency'].values out_noise = df['out'].values return freq, out_noise
[docs] def read_noise_result(self,**kwargs): """ Internally called function to read the noise simulation results TODO: Implement for Eldo as well. """ try: if 'noise' in self.parent.simcmd_bundle.Members.keys(): ocean_command = f""" openResults("{self.parent.spicedbpath}") selectResult('noise) ocnPrint(?output "{os.path.join(self.parent.spicesimpath, '%s_nf' % self.parent.name)}" ?numberNotation 'scientific getData("NF")) ocnPrint(?output "{os.path.join(self.parent.spicesimpath, '%s_in' % self.parent.name)}" ?numberNotation 'scientific getData("in")) ocnPrint(?output "{os.path.join(self.parent.spicesimpath, '%s_out' % self.parent.name)}" ?numberNotation 'scientific getData("out")) noiseSummary('integrated ?resultsDir "{self.parent.spicedbpath}" ?result 'noise ?output "{os.path.join(self.parent.spicesimpath, '%s_noisesum' % self.parent.name)}" ?sort '(name) ?noiseUnit "V") exit """ process = subprocess.Popen( ["ocean", "-nograph"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True ) stdout, stderr = process.communicate(input=ocean_command) if stderr: print(stderr, file=sys.stderr) df = pd.read_csv(f"{os.path.join(self.parent.spicesimpath, '%s_nf' % self.parent.name)}", delim_whitespace=True, header=None, skiprows=3) df.columns = ['Frequency', 'NF'] freq = df['Frequency'].values NF = df['NF'].values freq, in_noise = self.get_input_noise_data() freq, out_noise = self.get_output_noise_data() df = pd.read_csv(f"{os.path.join(self.parent.spicesimpath, '%s_noisesum' % self.parent.name)}", sep='\s+', header=None, skipinitialspace=True, skipfooter=5, engine='python') noise_data = {} current_device = None for index, row in df.iterrows(): # Device row if all(pd.notna(row[0:4])) and all(pd.isna(row[5:10])): current_device = row[0] noise_data[current_device] = { 'Contribution_percentage': float(row[1]), 'Input_referred': float(row[2]), 'Param': {} } noise_data[current_device]['Param'][row[3]] = float(row[4]) # Device param row elif all(pd.notna(row[0:2])) and all(pd.isna(row[2:10])): param_name = row[0].strip() if param_name: noise_data[current_device]['Param'][param_name] = float(row[1]) analysis='noise' nodes=self.parent.simcmd_bundle.Members[analysis].nodes mc=self.parent.simcmd_bundle.Members[analysis].mc self.extracts.Members.update({analysis: {}}) # Get simulation result file name fnames=[] for node in nodes: if mc: # TODO: implement. self.print_log(type='F', msg=f"Monte carlo currently not yet supported for \ {analysis} simulations. Please implement.") else: if 'noise' in analysis: fnames.append(f'noise_analysis_{node}.noise') # For distributed runs for i in range(len(fnames)): if self.parent.distributed_run: # TODO: check functionality and implement self.print_log(type='F', msg=f"Distributed runs not currently supported for \ PSF file read analyses.") path=os.path.join(self.parent.spicesimpath,'tb_%s.raw' % self.parent.name, '[0-9]*', fnames[i]) else: path=os.path.join(self.parent.spicesimpath,'tb_%s.raw' % self.parent.name, fnames[i]) files=glob.glob(path) if 'noise' in analysis: self.extracts.Members[analysis].update({ f'{nodes[i]}_freq':freq, f'{nodes[i]}_NF':NF, f'{nodes[i]}_input_ref_noise':in_noise, f'{nodes[i]}_output_ref_noise':out_noise, f'{nodes[i]}_noise_contributions':noise_data, }) # Read total input referred noise try: file=os.path.join(self.parent.spicesimpath,'%s_noisesum' % self.parent.name) with open(file, 'r') as f: for line in f: if 'Total Input Referred Noise' in line: total_input_ref_noise = float(line.split(' ')[-1]) self.extracts.Members[analysis].update({ 'total_input_ref_noise':total_input_ref_noise }) except: total_input_ref_noise = 'NaN' self.extracts.Members[analysis].update({ 'total_input_ref_noise':total_input_ref_noise }) except: self.print_log(type='W', msg=traceback.format_exc()) self.print_log(type='W', msg="Something went wrong while extracting S-parameters")
[docs] def create_nested_sweepresult_dict(self, level, fileptr, sweeps_ran_dict, files,read_type): """Documentation missing """ rd={} # Return this to upper level if level < len(sweeps_ran_dict)-1: for v in np.arange(len(sweeps_ran_dict[level]['values'])): result={} psf = psfu.PSF(files[fileptr]) fileptr += 1 psfsweep=psf.get_sweep() for signal in psf.all_signals(): result[signal.name]=np.vstack((psfsweep.abscissa, psf.get_signal(f'{signal.name}').ordinate)).T rd.update({v:{'param':sweeps_ran_dict[level]['param'], 'value':sweeps_ran_dict[level]['values'][v], read_type:result}}) return rd, fileptr
[docs] def read_oppts(self): """ Internally called function to read the DC operating points of the circuit TODO: Implement for Eldo as well. """ try: if 'dc' in self.parent.simcmd_bundle.Members.keys(): self.extracts.Members.update({'oppts' : {}}) sweep=False # Get dc simulation file name for name, val in self.parent.simcmd_bundle.Members.items(): mc = val.mc if name == 'dc': fname='' if len(val.sweep) != 0: for i in range(0, len(val.sweep)): sweep=True fname += 'Sweep%d-[0-9]*_' % i if mc: fname+='mc_oppoint.dc' else: fname+='oppoint.dc' else: if mc: fname = 'mc_oppoint*.dc' else: fname = 'oppoint*.dc' break # For distributed runs if self.parent.distributed_run: path=os.path.join(self.parent.spicesimpath,'tb_%s.raw' % self.parent.name, '[0-9]*', fname) else: path=os.path.join(self.parent.spicesimpath,'tb_%s.raw' % self.parent.name, fname) # Sort files so that sweeps are in correct order if sweep: num_sweeps = len(val.sweep) files = glob.glob(path) for i in range(num_sweeps): files = sorted(files,key=lambda x: self.sorter(x, i)) else: files = glob.glob(path) if len(files)>1:# This shoudln't happen self.print_log(type='W', msg='DC analysis was not a sweep, but multiple output files were found! Results may be in incorrect order!') valbegin = 'VALUE\n' eof = 'END\n' parsevals = False for file in files: with open(file, 'r') as f: for line in f: if line == valbegin: # Scan file until unit descriptions end and values start parsevals = True elif line != eof and parsevals: # Scan values from output until EOF line = line.replace('\"', '') parts = line.split() if len(parts) >= 3: if ':' in parts[0]: # This line contains op point parameter (e.g. vgs) dev, param = parts[0].split(':') elif ':' not in parts[0] and parts[1] == 'V': # This is a node voltage dev = parts[0] param = parts[1] val = float(parts[2]) if dev not in self.extracts.Members['oppts']: # Found new device self.extracts.Members['oppts'].update({dev : {}}) if param not in self.extracts.Members['oppts'][dev]: # Found new parameter for device self.extracts.Members['oppts'][dev].update({param : [val]}) else: # Parameter already existed, just append value. This can occur in e.g. sweeps self.extracts.Members['oppts'][dev][param].append(val) elif line == eof: parsevals = False elif 'pz' in self.parent.simcmd_bundle.Members.keys(): self.extracts.Members.update({'pz' : {}}) # Get pz simulation file name for name, val in self.parent.simcmd_bundle.Members.items(): if name == 'pz': fname = 'PZ_analysis.pz' # For distributed runs if self.parent.distributed_run: path=os.path.join(self.parent.spicesimpath,'tb_%s.raw' % self.parent.name, '[0-9]*', fname) else: path=os.path.join(self.parent.spicesimpath,'tb_%s.raw' % self.parent.name, fname) valbegin = 'VALUE\n' eof = 'END\n' parsevals = False valueline_grep = subprocess.check_output('grep -n \"VALUE\" %s' %path, shell=True) valueline = int(valueline_grep.decode('utf-8').split(':')[0])+1 eofline_grep = subprocess.check_output('grep -n \"END\" %s' %path, shell=True) eofline = int(eofline_grep.decode('utf-8').split(':')[0])-1 values_sed = subprocess.check_output('sed -n \'%s,%s p\' %s' %(valueline, eofline, path), shell=True) values_listed = values_sed.decode('utf-8').split('\n"') results = [] for v in values_listed: line = v.replace('"','') line = line.replace('\n',' ') line = line.replace('(','') line = line.replace(')','') parts = line.split() results.append(parts) for result in results: if len(result) < 4: dev = result[0] val = float(result[2]) self.extracts.Members['pz'].update({dev: val}) elif len(result) >= 4: pz = result[0] real = float(result[2]) imag = float(result[3]) q = float(result[4]) val = [real+1j*imag, q] self.extracts.Members['pz'].update({pz : val}) except: self.print_log(type='W', msg=traceback.format_exc()) self.print_log(type='W',msg='Something went wrong while extracting DC operating points.')