<Copyright statement>= (U->)
"""
1.This Software copyright \u00A9 Australian Synchrotron Research Program Inc, ("ASRP").

2.Subject to ensuring that this copyright notice and licence terms
appear on all copies and all modified versions, of PyCIFRW computer
code ("this Software"), a royalty-free non-exclusive licence is hereby
given (i) to use, copy and modify this Software including the use of
reasonable portions of it in other software and (ii) to publish,
bundle and otherwise re-distribute this Software or modified versions
of this Software to third parties, provided that this copyright notice
and terms are clearly shown as applying to all parts of software
derived from this Software on each occasion it is published, bundled
or re-distributed.  You are encouraged to communicate useful
modifications to ASRP for inclusion for future versions.

3.No part of this Software may be sold as a standalone package.

4.If any part of this Software is bundled with Software that is sold,
a free copy of the relevant version of this Software must be made
available through the same distribution channel (be that web server,
tape, CD or otherwise).

5.It is a term of exercise of any of the above royalty free licence
rights that ASRP gives no warranty, undertaking or representation
whatsoever whether express or implied by statute, common law, custom
or otherwise, in respect of this Software or any part of it.  Without
limiting the generality of the preceding sentence, ASRP will not be
liable for any injury, loss or damage (including consequential loss or
damage) or other loss, loss of profits, costs, charges or expenses
however caused which may be suffered, incurred or arise directly or
indirectly in respect of this Software.

6. This Software is not licenced for use in medical applications.
"""
This file implements a general STAR reading/writing utility. The basic
objects (StarFile/StarBlock) read and write syntactically correct STAR
files, including save frames.

The StarFile class is initialised with either no arguments (a new STAR
file) or with the name of an already existing STAR file. Data items are
accessed, changed and added using the Python mapping interface, i.e. to
get dataitem you would type value = cf[blockname][dataitem].
The methods available for the StarFile type are:

ReadStar(filestream): (re)initialise using opened STAR file filestream.

NewBlock(blockname,[blockcontents],replace=False): add a new block to this object. If blockcontents is provided, it must be a StarBlock object (see below). If replace is False, attempts to replace a pre-existing block will cause an error.

WriteOut(comment): return the contents of the current file as a CIF-conformant string, with optional comment at the beginning.
The methods available for the StarBlock type are:

GetLoopItem(itemname): return the value of itemname in the current block (equivalent to using []).

AddLoopItem(data): add data to the current block. data is a tuple consisting of a single itemname and an array of data, or else a single data value. This method is called when setting data using [].

RemoveLoopItem(dataname): remove the given dataname from the current block. Same as typing 'del block[item]'.

GetLoop(dataname): for looped data item dataname, get a list of all itemnames and values co-occurring in this loop. Raises an error if dataname is not in a loop.

AddLoop(dataname,data): add data to the loop containing dataname. If dataname is not a looped item, an error is raised. If data has the wrong length, an error is raised.

loops(): return a list containing all looped names, grouped into individual loops. This was added to facilitate validity checking and is unlikely to be useful otherwise.
Note also that a StarFile object can be accessed as a mapping type, i.e. using square brackets. Most mapping operations have been implemented (see below).
We import type objects at the module level, as required by later versions of Python.
<*>=
<Copyright statement>
from types import *
from urllib import *         # for arbitrary opening
import re
import copy
<LoopBlock class>
<StarBlock class>
<BlockCollection class>
<StarFile class>
<Define an error class>
<Read in a STAR file>
STAR files and a collection of save frames look very similar, so we
abstract the common behaviour into the BlockCollection class, and then
inherit from it to make a StarFile object. Save frames require no
further special behaviour, and so are instances of a BlockCollection.
This is a parameterised class, in that we specify the class of the items in the BlockCollection at initialisation time. This allows us to restrict the components of a CIF file, for example, to be CifBlocks.
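As an illustration only (this is not part of the module), the idea of fixing the element class at initialisation time can be sketched in a few lines of standalone Python 3. The names Block, RestrictedBlock and TypedCollection are hypothetical stand-ins for StarBlock, CifBlock and BlockCollection.

```python
class Block(dict):
    """Stand-in for StarBlock: a plain mapping of data names to values."""


class RestrictedBlock(Block):
    """Stand-in for a more restrictive block type such as CifBlock."""


class TypedCollection:
    """Hypothetical, much simplified analogue of BlockCollection."""

    def __init__(self, datasource=None, element_class=Block):
        self.element_class = element_class
        self.blocks = {}
        for name, value in (datasource or {}).items():
            # Re-cast anything that is not exactly the required class.
            if value.__class__ is not element_class:
                value = element_class(value)
            self.blocks[name] = value


plain = TypedCollection({"a": Block(x=1)})
# Building a second collection from the first imposes the new class.
strict = TypedCollection(plain.blocks, element_class=RestrictedBlock)
```

The second collection holds a re-cast copy of each block, which is the mechanism the text describes for turning StarBlocks into CifBlocks.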
<BlockCollection class>= (<-U)
class BlockCollection:
    <Initialise BC data structures>
    <BC emulation of mapping type>
    <Add a new data section>
    <Merge with another block collection>
    <Collect all values of a single key in all blocks>
    <Write out to string representation>
When initialising, we take an optional type tag, which will be used when printing out as a prefix before each block name. Usually this will be either "data_" for a data block in a STAR file, or "save_" for a save frame in a data block.
We also require a class name that we use to restrict the components
of the block. If we are passed another BlockCollection object, we
cycle through all of the blocks, if necessary imposing the proper
element_class. This is in practice used to turn StarBlocks into
CifBlocks.
<Initialise BC data structures>= (<-U)
    def __init__(self,datasource=None,element_class=StarBlock,type_tag=''):
        self.dictionary = {}
        self.type_tag = type_tag
        self.lower_keys = []              # for efficiency
        self.element_class = element_class
        if isinstance(datasource,(DictType,BlockCollection)):
            for key,value in datasource.items():
                if value.__class__ == element_class:
                    self[key]=value
                else:
                    self[key]= element_class(value)
        self.header_comment = ''
Checking block name lengths. This is not needed for a STAR block, but is useful for CIF.
<Check block name lengths>=
    def checklengths(self,maxlength):
        toolong = filter(lambda a:len(a)>maxlength, self.dictionary.keys())
        if toolong:
            errorstring = ""
            for bn in toolong:
                errorstring += "\n" + bn
            raise StarError( 'Following block name(s) too long: \n' + errorstring)
Emulation of a mapping type. When called via __setitem__, we do not
check that a new blockname replaces an old block name. If you need this,
you should call NewBlock directly.
<BC emulation of mapping type>= (<-U)
    def __str__(self):
        return self.WriteOut()

    def __setitem__(self,key,value):
        # NewBlock records the lower-case key itself, so it is not appended again here
        if isinstance(value,(self.element_class,DictType)):
            self.NewBlock(key,value,replace=True)
        else: raise TypeError

    # due to attempt to get upper/lower case treated as identical
    # we have a bit of cruft here
    def __getitem__(self,key):
        try:
            return self.dictionary[key]
        except KeyError:
            if key.lower() not in self.lower_keys:
                raise KeyError
        curr_keys = self.dictionary.keys()
        lower_ordered = map(lambda a:a.lower(),curr_keys)
        keyindex = lower_ordered.index(key.lower())
        return self.dictionary[curr_keys[keyindex]]

    # we have to get an ordered list of the current keys,
    # as we'll have to delete one of them anyway
    def __delitem__(self,key):
        try:
            del self.dictionary[key]
            self.lower_keys.remove(key.lower())
        except KeyError:
            if not self.has_key(key):
                raise KeyError
            curr_keys = self.dictionary.keys()
            lower_ordered = map(lambda a:a.lower(),curr_keys)
            keyindex = lower_ordered.index(key.lower())
            del self.dictionary[curr_keys[keyindex]]
            # keep the lower-case index in step with the dictionary
            self.lower_keys.remove(key.lower())

    def __len__(self):
        return len(self.dictionary)

    def keys(self):
        return self.dictionary.keys()

    # changes to take case independence into account
    def has_key(self,key):
        if self.dictionary.has_key(key):
            return 1
        if key.lower() in self.lower_keys:
            return 1
        return 0

    def get(self,key,default=None):
        if self.dictionary.has_key(key):
            return self.dictionary[key]
        elif self.has_key(key):     # take account of case
            return self.__getitem__(key)
        else:
            return default

    def clear(self):
        self.dictionary.clear()
        self.lower_keys = []

    def copy(self):
        newcopy = self.dictionary.copy()
        # arguments are (datasource, element_class, type_tag)
        return BlockCollection(newcopy,self.element_class,self.type_tag)

    def update(self,adict):
        for key in adict.keys():
            self.dictionary[key] = adict[key]
        self.lower_keys.extend(map(lambda a:a.lower(),adict.keys()))

    def items(self):
        return self.dictionary.items()
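The case handling above can be hard to follow in flattened form, so here is a minimal standalone sketch (Python 3) of the same idea: keys keep the case they were given, but lookup, replacement and deletion ignore case. CaselessDict is a hypothetical name, and it uses a second dictionary for the lower-case index where the module uses a list, which is a simplification on our part.

```python
class CaselessDict:
    """Hypothetical sketch: case-preserving, case-insensitive mapping."""

    def __init__(self):
        self._data = {}    # original-case key -> value
        self._lower = {}   # lower-case key -> original-case key

    def __setitem__(self, key, value):
        old = self._lower.get(key.lower())
        if old is not None and old != key:
            del self._data[old]          # same name, different case: replace
        self._data[key] = value
        self._lower[key.lower()] = key

    def __getitem__(self, key):
        try:
            return self._data[key]       # fast path: case already correct
        except KeyError:
            return self._data[self._lower[key.lower()]]

    def __delitem__(self, key):
        real = self._lower.pop(key.lower())
        del self._data[real]

    def __contains__(self, key):
        return key.lower() in self._lower

    def keys(self):
        return list(self._data)


d = CaselessDict()
d["MyBlock"] = 1
```

As in the module, the exact-case lookup is tried first and the case-insensitive search is only a fallback.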
Adding a new block. A new block is just a new item in our dictionary, keyed by block name. We return the new block name in case we have changed it, so the calling routine can refer to it later. There is also a limit of 75 characters for the block name length in CIF; the code below does not itself check this, and checklengths (above) exists for that purpose.

If replace is False, an error is signalled if the requested blockname
is already in the collection.

Note that we must take account of upper/lower case differences being irrelevant for CIFs, but that we want to preserve the original case.
<Add a new data section>= (<-U)
    def NewBlock(self,blockname,blockcontents=(),replace=False,fix=True):
        if not blockcontents:
            blockcontents = self.element_class()
        elif isinstance(blockcontents,DictType):
            blockcontents = self.element_class(blockcontents)
        if not isinstance(blockcontents,self.element_class):
            # the two names must be passed as a tuple to the format operator
            raise StarError( 'Block is not of required type %s, is %s' %
                (self.element_class.__name__,blockcontents.__class__.__name__))
        if fix:
            newblockname = re.sub('[ \t]','_',blockname)
        else: newblockname = blockname
        new_lowerbn = newblockname.lower()
        if self.lower_keys.count(new_lowerbn):    #already in CIF
            if not replace:
                raise StarError( "Attempt to replace existing block " + blockname)
            # generate a list of lower-case keys in correct order
            current_keys = self.dictionary.keys()
            blocknames = map(lambda a:a.lower(),current_keys)
            location = blocknames.index(new_lowerbn)
            del self.dictionary[current_keys[location]]
            self.lower_keys.remove(new_lowerbn)
        # store under the fixed name so that the dictionary agrees with lower_keys
        self.dictionary.update({newblockname:blockcontents})
        self.lower_keys.append(new_lowerbn)
        return newblockname
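The name handling can be tried out in isolation. The following standalone Python 3 sketch is not part of the module: normalise_blockname is a hypothetical helper that applies the same whitespace substitution as NewBlock and, as an assumption on our part, also applies the 75-character limit mentioned in the text.

```python
import re

MAX_BLOCKNAME = 75   # limit quoted in the text; applying it here is our assumption


def normalise_blockname(blockname, fix=True, maxlength=MAX_BLOCKNAME):
    """Replace spaces and tabs by underscores, then check the length."""
    newname = re.sub(r'[ \t]', '_', blockname) if fix else blockname
    if len(newname) > maxlength:
        raise ValueError('Block name too long (%d > %d): %s'
                         % (len(newname), maxlength, newname))
    return newname


fixed = normalise_blockname("my block\t1")
```

Returning the fixed name lets the caller refer to the block afterwards even when the name it supplied has been altered.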
Merging. We implement this for dictionary merging support. We can't
merge CifDic objects, because the internal data structures for DDL2 and
DDL1 are different (parent-child in particular), so any merge operation
would have to first recreate the original Cif structure before proceeding.
Merging can be strict, overlay or replace. In all cases, if the block name is different, we simply add it in. If it is the same, in strict mode we flag an error, in replace mode we replace it, and in overlay mode we actually add/replace individual data items.
If the single_block list is non-empty, we assume that we should merge on the block level, using the given block names as the particular blocks to merge. This is essentially what we have to do for DDL2 dictionaries, where all the definitions are stored in save frames inside a single block.
Note also the related situation where we are in 'strict' mode, and the DDL1 dictionaries both have an "on_this_dictionary" block. So we have an extra keyword argument "idblock" which contains a blockname to ignore during merging, i.e. it will remain the same as before merging. In the future we may implement some sort of version tracking mechanism using this block.
The suggested overlay method involves adding to loops, rather than replacing them completely. Identical rows must be removed, and any key values with identical values remaining after this have to flag an error. We don't read in the ddl specifications themselves, to avoid messing with hard-coded filenames, so we require the calling function to provide us with this file (not yet implemented).
The match_att keyword allows us to match blocks/save frames on a
particular attribute, rather than the block name itself. This means
we can do the right thing and compare _name entries rather than
block names (the default behaviour).
<Merge with another block collection>= (<-U)
    def merge(self,new_bc,mode="strict",single_block=[],
                   idblock="",match_att=""):
        if single_block:
            self.dictionary[single_block[0]].merge(new_bc[single_block[1]],mode,
                                                   match_att=match_att)
            return None
        base_keys = self.keys()
        block_to_item = base_keys   #default
        new_keys = new_bc.keys()
        if match_att:
            #make a blockname -> item name map
            block_to_item = map(lambda a:self[a].get(match_att,None),self.keys())
        # print `block_to_item`
        for key in new_keys:
            if key == idblock: continue
            basekey = key        #default value
            attval = new_bc[key].get(match_att,0)
            for ii in range(len(block_to_item)):  #do this way to get looped names
                thisatt = block_to_item[ii]
                #print "Looking for %s in %s" % (attval,thisatt)
                if attval == thisatt or \
                   (isinstance(thisatt,ListType) and attval in thisatt):
                    basekey = base_keys.pop(ii)
                    block_to_item.remove(thisatt)
                    break
            if not self.dictionary.has_key(basekey) or mode=="replace":
                self.dictionary[basekey] = new_bc[key]
            else:
                if mode=="strict":
                    raise StarError( "In strict merge mode: block %s in old and block %s in new files" % (basekey,key))
                elif mode=="overlay":
                    self.dictionary[basekey].merge(new_bc[key],mode,match_att=match_att)
                else:
                    raise StarError( "Merge called with unknown mode %s" % mode)
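The three merge modes are easiest to compare on plain dictionaries. The standalone Python 3 sketch below is not part of the module: merge_collections is a hypothetical function, blocks are modelled as ordinary dicts, and the single_block, idblock and match_att options are left out. Unlike the routine above, it rejects an unknown mode before doing any work.

```python
def merge_collections(base, new, mode="strict"):
    """Merge `new` into `base` in place; both map block names to dicts of items."""
    if mode not in ("strict", "replace", "overlay"):
        raise ValueError("Merge called with unknown mode %s" % mode)
    for name, block in new.items():
        if name not in base or mode == "replace":
            base[name] = dict(block)        # new name, or wholesale replacement
        elif mode == "strict":
            raise ValueError("In strict merge mode: block %s in both collections" % name)
        else:                               # overlay: add/replace individual items
            base[name].update(block)
    return base


base = {"a": {"x": 1, "y": 2}}
merge_collections(base, {"a": {"y": 3}, "b": {"z": 4}}, mode="overlay")
```

In overlay mode block "a" keeps item x and takes the new value of y, while block "b" is simply added.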
When validating DDL2-type dictionaries against the DDL spec file, we have to be able to see all values of parent data items across all save frames in order to validate parent-child relations (I've inferred this, but if I ever find a standard document this may turn out to be wrong). So this method is provided to return a list of all values taken by the given attribute within all of the blocks inside a block collection.
A flat list is returned, even if looped values happen to occur in a data block. This is because the one routine that calls this method is interested in whether or not a given value occurs, rather than how it occurs or what it occurs with. We also remove duplicate values.
<Collect all values of a single key in all blocks>= (<-U)
    def get_all(self,item_name):
        raw_values = map(lambda a:self[a].get(item_name),self.dictionary.keys())
        raw_values = filter(lambda a:a != None, raw_values)
        ret_vals = []
        for rv in raw_values:
            if isinstance(rv,ListType):
                for rvv in rv:
                    if rvv not in ret_vals: ret_vals.append(rvv)
            else:
                if rv not in ret_vals: ret_vals.append(rv)
        return ret_vals
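The flattening and de-duplication can be seen in a small standalone Python 3 sketch. It is an illustration only: collect_values is a hypothetical function operating on plain dicts, and the item names are invented.

```python
def collect_values(blocks, item_name):
    """Return a flat, duplicate-free list of the values of item_name in all blocks."""
    seen = []
    for block in blocks.values():
        value = block.get(item_name)
        if value is None:
            continue                       # item absent from this block
        candidates = value if isinstance(value, list) else [value]
        for v in candidates:
            if v not in seen:
                seen.append(v)
    return seen


blocks = {
    "a": {"_parent": ["x", "y"]},          # looped value
    "b": {"_parent": "x"},                 # single value, already seen
    "c": {"_other": "z"},                  # item not present
}
values = collect_values(blocks, "_parent")
```

A list is used rather than a set so that values appear in the order they were first met, as in get_all.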
Writing all this stuff out to a string. We loop over each of the individual sections, getting their string representation. We implement this using the cStringIO module for faster work. Note that the default output comment specifies a CIF 1.1 standard file.
<Write out to string representation>= (<-U)
    def WriteOut(self,comment='',wraplength=80,maxoutlength=2048):
        import cStringIO
        if not comment:
            comment = self.header_comment
        outstring = cStringIO.StringIO()
        outstring.write(comment)
        for datablock in self.dictionary.keys():
            outstring.write('\n' + self.type_tag +datablock+'\n')
            self.dictionary[datablock].SetOutputLength(wraplength,maxoutlength)
            outstring.write(str(self.dictionary[datablock]))
        returnstring = outstring.getvalue()
        outstring.close()
        return returnstring
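For readers without the rest of the module to hand, the output loop can be sketched in standalone Python 3, where io.StringIO takes the place of cStringIO. The function write_out and the one-item-per-line block format are simplifying assumptions; the real blocks format themselves via str().

```python
import io


def write_out(blocks, type_tag="data_", comment=""):
    """Concatenate a header comment and each named block into one string."""
    out = io.StringIO()
    out.write(comment)
    for name, block in blocks.items():
        out.write("\n" + type_tag + name + "\n")
        for item, value in block.items():
            out.write("%s %s\n" % (item, value))   # simplified item formatting
    text = out.getvalue()
    out.close()
    return text


text = write_out({"one": {"_a": "1"}}, comment="#hdr")
```

Writing into a string buffer avoids repeatedly concatenating large strings, which is the reason given in the text for using cStringIO.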
If we are passed a filename, we open it and read it in, assuming that
it is a conformant STAR file. A StarFile object is a dictionary of
StarBlock objects, accessed by block name.

Parameter maxoutlength sets the maximum line size for output. If
maxoutlength is not specified, it defaults to 2048 characters.
<StarFile class>= (<-U)
class StarFile(BlockCollection):
<Initialise data structures>
When initialising, we add those parts that are unique to the StarFile as
opposed to a simple collection of blocks - i.e. reading in from a file,
and some line length restrictions. We don't indent this section in the
noweb file, so that our comment characters output at the beginning of the
line. We allow a blocktype argument so that we can restrict blocks to
be less than a StarBlock - generally, a CifBlock.
We catch any extra arguments and keyword arguments, as we could re-call our initialisation function, which may be a subclass which takes other arguments.
<Initialise data structures>= (<-U)
    def __init__(self,datasource=None,maxinlength=-1,maxoutlength=0,blocktype=StarBlock,**kwargs):
        BlockCollection.__init__(self,datasource=datasource,element_class=blocktype,type_tag='data_')
        self.maxinlength = maxinlength      #no restriction
        if maxoutlength == 0:
            self.maxoutlength = 2048
        else:
            self.maxoutlength = maxoutlength
        if type(datasource) is StringType:
            newself = ReadStar(datasource,self.maxinlength)
            # print "Reinjecting by calling %s.__init__ with kwargs %s" % (`self.__init__.im_class`,kwargs)
            self.__init__.im_class.__init__(self,datasource=newself,maxoutlength=maxoutlength,**kwargs)
        self.header_comment = \
"""#\\#STAR
##########################################################################
#               STAR Format file
#               Produced by PySTARRW module
#
#  This is a STAR file.  STAR is a superset of the CIF file type.  For
#  more information, please refer to International Tables for Crystallography,
#  Volume G, Chapter 2.1
#
##########################################################################
"""
Reading in a file. We now use the Yapps2-generated YappsStarParser
module to provide grammar services. The structure returned from parsing
is a StarFile, with possible grammar violations due to duplicate block
or item names.
We allow fast reads using the compiled StarScan module by passing the option 'flex' to this routine.
<Read in a STAR file>= (<-U)
def ReadStar(filename,maxlength=2048,dest=StarFile(),scantype='standard'):
    import YappsStarParser,string
    filestream = urlopen(filename)
    text = filestream.read()
    filestream.close()
    if not text:      # empty file, return empty block
        return dest
    # we recognise ctrl-Z as end of file
    endoffile = text.find('\x1a')
    if endoffile >= 0:
        text = text[:endoffile]
    split = string.split(text,'\n')
    if maxlength > 0:
        toolong = filter(lambda a:len(a)>maxlength,split)
        if toolong:
            pos = split.index(toolong[0])
            raise StarError( 'Line %d contains more than %d characters' % (pos+1,maxlength))
    try:
        if scantype == 'standard':
            parser = YappsStarParser.StarParser(YappsStarParser.StarParserScanner(text))
        else:
            parser = YappsStarParser.StarParser(YappsStarParser.Scanner(None,[],text,scantype='flex'))
        proto_star = getattr(parser,"input")()
    except YappsStarParser.SyntaxError:
        errorstring = 'Syntax error in input file: last value parsed was %s' % YappsStarParser.lastval
        errorstring = errorstring + '\nParser status: %s' % `parser._scanner`
        raise StarError( errorstring)
    # duplication check on all blocks
    audit_result = map(lambda a:(a,proto_star[a].audit()),proto_star.keys())
    audit_result = filter(lambda a:len(a[1])>0,audit_result)
    if audit_result:
        raise StarError( 'Duplicate keys as follows: %s' % `audit_result`)
    return proto_star
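The two checks made before parsing (truncation at ctrl-Z and the line length limit) do not depend on the parser and can be sketched on their own. This is a standalone Python 3 illustration; preprocess is a hypothetical name and ValueError stands in for StarError.

```python
def preprocess(text, maxlength=2048):
    """Cut the text at the first ctrl-Z and check the length of every line."""
    endoffile = text.find('\x1a')
    if endoffile >= 0:
        text = text[:endoffile]            # ctrl-Z is treated as end of file
    if maxlength > 0:                      # zero or negative: no restriction
        for lineno, line in enumerate(text.split('\n'), start=1):
            if len(line) > maxlength:
                raise ValueError('Line %d contains more than %d characters'
                                 % (lineno, maxlength))
    return text


clean = preprocess("data_a\n_x 1\n\x1agarbage")
```

Only the first over-long line is reported, which matches the behaviour of ReadStar.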
This is the fundamental building block of a StarFile. We abstract a loop to mean a collection of tag value pairs and a collection of zero or more loop blocks (recursive definition). The values have a dimension one less than the values in the loop blocks.
We store the dimension in the class for convenience.
A Star Block is then also a Loop block with dimension zero.
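To make the notion of dimension concrete, here is a small standalone Python 3 illustration of how values are shaped at each nesting level. It is a sketch under our own assumptions: the data names are invented and nesting_depth is a hypothetical helper, not something the class provides.

```python
def nesting_depth(value):
    """Count how many list levels wrap a value (0 for a plain value)."""
    depth = 0
    while isinstance(value, list):
        depth += 1
        value = value[0] if value else None
    return depth


# dimension 0: unlooped items hold single values
top_level = {"_cell_length_a": "5.4"}
# dimension 1: each item holds one value per packet
loop = {"_atom_label": ["C1", "O1"]}
# dimension 2: each item holds one list of values per outer packet
nested = {"_bonded_to": [["O1", "H1"], ["C1"]]}
```

Each value in a block of dimension n is wrapped in n list levels, so the values of a nested loop always have one more level than those of the loop enclosing it.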
<LoopBlock class>= (<-U)
class LoopBlock:
    <Initialise Loop Block>
    <Add emulation of a mapping type>
    <Selection of iterators>
    <Insert a nested loop>
    <Remove a nested loop>
    <Return value of item>
    <Remove a data item>
    <Add a data item>
    <Check data name for STAR conformance>
    <Check data item for STAR conformance>
    <Regularise data values>
    <Get data dimension>
    <Get complete looped data>
    <Get nth loop packet>
    <Get item order>
    <Change data item order>
    <Collapse to nth packet>
    <Audit for repeated names>
    <Get co-looped names>
    <Add to looped data>
    <Functions for printing out>
If given non-zero data to initialise the block with, we either copy (if it is a dictionary) or else initialise each key-value pair separately (if tuples). We take care to include our special "loop" key if it is not in the supplied dictionary, but apart from this we make no check of the actual conformance of the dictionary items.
The dimension parameter refers to the number of dimensions of the value; zero would be a single value, 1 is a 1-dimensional array, etc.
To manage case insensitivity while preserving the case of items
that we are passed, we store a list of lower-case keys so that we
are not constantly calling the lower() method of the strings. This
list applies only to the items in the body of the loop, not to any
items in nested loops. However, when searching for items and returning
items, nested loops are searched.

The overwrite argument allows values to be silently replaced, as per a
normal python dictionary. However, when reading in from a file, we want to
detect duplicated values, so we set this to false.
<Initialise Loop Block>= (<-U)
    def __init__(self,data = (), dimension = 0, maxoutlength=2048, wraplength=80, overwrite=True):
        # print 'Creating new loop block, dimension %d' % dimension
        self.block = {}
        self.loops = []
        self.no_packets = 0
        self.item_order = []
        self.lower_keys = []    #for efficiency
        self.dimension = dimension
        self.popout = False     #used during load iteration
        self.curitem = -1       #used during iteration
        self.maxoutlength = maxoutlength
        self.wraplength = wraplength
        self.overwrite = overwrite
        if not hasattr(self,'loopclass'):  #in case are derived class
            self.loopclass = LoopBlock  #when making new loops
        self.char_check = re.compile("[][ \n\r\t!%&\(\)*+,./:<=>?@0-9A-Za-z\\\\^`{}\|~\"#$';_-]+",re.M)
        if isinstance(data,(TupleType,ListType)):
            for item in data:
                self.AddLoopItem(item)
        elif isinstance(data,LoopBlock):
            self.block = data.block.copy()
            self.item_order = data.item_order[:]
            self.lower_keys = data.lower_keys[:]
            self.dimension = data.dimension
            # loops as well; change loop class
            for loopno in range(len(data.loops)):
                try:
                    placeholder = self.item_order.index(data.loops[loopno])
                except ValueError:
                    print "Warning: loop %s (%s) in loops, but not in item_order (%s)" % (`data.loops[loopno]`,str(data.loops[loopno]),`self.item_order`)
                    placeholder = -1
                self.item_order.remove(data.loops[loopno])   #gone
                newobject = self.loopclass(data.loops[loopno])
                # print "Recasting and adding loop %s -> %s" % (`data.loops[loopno]`,`newobject`)
                self.insert_loop(newobject,position=placeholder)
Adding emulation of a mapping type. We add any of the other
functions we'd like to emulate. __len__ returns the number
of items in this block, either in a loop or not. So it is
not the simple length of the dictionary.
<Add emulation of a mapping type>= (<-U)
    def __str__(self):
        return self.printsection()

    def __setitem__(self,key,value):
        # catch a one member loop, for convenience
        # we assume the key is a string value only
        self.AddLoopItem((key,value))

    def __getitem__(self,key):
        if isinstance(key,IntType):   #return a packet!!
            return self.GetPacket(key)
        return self.GetLoopItem(key)

    def __delitem__(self,key):
        self.RemoveLoopItem(key)

    def __len__(self):
        blen = len(self.block)
        for aloop in self.loops:
            # print 'Aloop is %s' % `aloop`
            blen = blen + len(aloop)  # also a LoopBlock
        return blen

    def __nonzero__(self):
        if self.__len__() > 0: return 1
        return 0

    # keys returns all internal keys
    def keys(self):
        thesekeys = self.block.keys()
        for aloop in self.loops:
            thesekeys.extend(aloop.keys())
        return thesekeys

    def values(self):
        ourkeys = self.keys()
        return map(lambda a:self[a],ourkeys)

    def items(self):
        ourkeys = self.keys()
        return map(lambda a,b:(a,b),self.keys(),self.values())

    def has_key(self,key):
        if key.lower() in self.lower_keys:
            return 1
        for aloop in self.loops:
            if aloop.has_key(key): return 1
        return 0

    def get(self,key,default=None):
        if self.has_key(key):
            retval = self.GetLoopItem(key)
        else:
            retval = default
        return retval

    def clear(self):
        self.block = {}
        self.loops = []
        self.item_order = []
        self.lower_keys = []
        self.no_packets = 0

    # doesn't appear to work
    def copy(self):
        newcopy = self.copy.im_class(dimension = self.dimension)
        newcopy.block = self.block.copy()
        newcopy.loops = []
        newcopy.no_packets = self.no_packets
        newcopy.item_order = self.item_order[:]
        newcopy.lower_keys = self.lower_keys[:]
        for loop in self.loops:
            try:
                placeholder = self.item_order.index(loop)
            except ValueError:
                print "Warning: loop %s (%s) in loops, but not in item_order (%s)" % (`loop`,str(loop),`self.item_order`)
                placeholder = -1
            newcopy.item_order.remove(loop)   #gone
            newobject = loop.copy()
            # print "Adding loop %s -> %s" % (`loop`,`newobject`)
            newcopy.insert_loop(newobject,position=placeholder)
        return newcopy

    # this is not appropriate for subloops.  Instead, the loop block
    # should be accessed directly for update
    def update(self,adict):
        for key in adict.keys():
            self.AddLoopItem((key,adict[key]))
There are two potential ways of running over the data in a LoopBlock: we could loop over the set of values in the non-nested values, and return the corresponding nested loop packets in a LoopBlock (a one level iterator), in which case the calling program decides whether or not it wants to dig deeper; or we could recursively expand and loop over all nested loops as well. We set the default behaviour on initialisation to be one-level.
<Selection of iterators>= (<-U) <A load iterator> <A recursive iterator> <A one-level iterator>
When loading values, we want to iterate over the items until a "stop_" token is found - this is communicated via the "popout" attribute changing to True. We save the __iter__ method for iterating over packets. Also, when a new packet is begun, all subloops should be extended correspondingly. We are in a special situation where we don't enforce length matching, as we assume that things will be loaded in as we go.
Each yield returns a list which should be appended to with a unitary item. So, as the number of packets increases, we need to make sure that the lowest level lists are extended as needed with empty lists.
<A load iterator>= (<-U)
    def load_iter(self,coords=[]):
        count = 0        #to create packet index
        while not self.popout:
            # ok, we have a new packet:  append a list to our subloops
            for aloop in self.loops:
                aloop.new_enclosing_packet()
            for iname in self.item_order:
                if isinstance(iname,LoopBlock):       #into a nested loop
                    for subitems in iname.load_iter(coords=coords+[count]):
                        # print 'Yielding %s' % `subitems`
                        yield subitems
                    # print 'End of internal loop'
                else:
                    if self.dimension == 0:
                        # print 'Yielding %s' % `self[iname]`
                        yield self,self[iname]
                    else:
                        backval = self.block[iname]
                        for i in range(len(coords)):
                            # print 'backval, coords: %s, %s' % (`backval`,`coords`)
                            backval = backval[coords[i]]
                        yield self,backval
            count = count + 1      # count packets
        self.popout = False        # reinitialise
        # print 'Finished iterating'
        yield self,'###Blank###'     #this value should never be used

    # an experimental fast iterator for level-1 loops (ie CIF)
    def fast_load_iter(self):
        targets = map(lambda a:self.block[a],self.item_order)
        while targets:
            for target in targets:
                yield self,target

    # Add another list of the required shape to take into account a new outer packet
    def new_enclosing_packet(self):
        if self.dimension > 1:      #otherwise have a top-level list
            for iname in self.keys():  #includes lower levels
                target_list = self[iname]
                for i in range(3,self.dimension): #dim 2 upwards are lists of lists of...
                    target_list = target_list[-1]
                target_list.append([])
                # print '%s now %s' % (iname,`self[iname]`)
We recursively expand out all values in nested loops and return a simple dictionary type. Although it only seems to make sense to call this from a dimension 0 LoopBlock, if we are not a level 0 LoopBlock, we drill down until we get a simple value to return, then start looping.
We want to build up a return dictionary by adding keys from the deeper loops, but if we simply use the dictionary update method, we will find that we have stale keys from previous inner loops. Therefore, we keep our values as (key,value) tuples which we turn into a dictionary at the last moment.
<A recursive iterator>= (<-U)
    def recursive_iter(self,dict_so_far={},coord=[]):
        # print "Recursive iter: coord %s, keys %s, dim %d" % (`coord`,`self.block.keys()`,self.dimension)
        my_length = 0
        top_items = self.block.items()
        drill_values = self.block.values()
        for dimup in range(0,self.dimension):
            if len(drill_values)>0:
                drill_values=drill_values[0]   #drill in
            else:
                raise StarError("Malformed loop packet %s" % `top_items[0]`)
        my_length = len(drill_values)
        if self.dimension == 0:
            for aloop in self.loops:
                for apacket in aloop.recursive_iter():
                    # print "Recursive yielding %s" % `dict(top_items + apacket.items())`
                    yield dict(top_items + apacket.items())
        else:
            for i in range(my_length):
                kvpairs = map(lambda a:(a,self.coord_to_group(a,coord)[i]),self.block.keys())
                # print "Recursive kvpairs at %d: %s" % (i,`kvpairs`)
                if self.loops:
                    for aloop in self.loops:
                        for apacket in aloop.recursive_iter(coord=coord+[i]):
                            # print "Recursive yielding %s" % `dict(kvpairs + apacket.items())`
                            yield dict(kvpairs + apacket.items())
                else:           # we're at the bottom of the tree
                    # print "Recursive yielding %s" % `dict(kvpairs)`
                    yield dict(kvpairs)

    # small function to use the coordinates.
    def coord_to_group(self,dataname,coords):
        if not isinstance(dataname,StringType):
            return dataname     # flag inner loop processing
        newm = self[dataname]          # newm must be a list or tuple
        for c in coords:
            # print "Coord_to_group: %s ->" % (`newm`),
            newm = newm[c]
            # print `newm`
        return newm
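The recursion is easier to follow on a stripped-down data model. In the standalone Python 3 sketch below, a loop is a plain dict with an "items" entry (name to nested list) and a "loops" entry (child loops); this representation and the function name expand are our own assumptions, chosen only to mirror the roles of self.block, self.loops and coord.

```python
def expand(loop, coord=()):
    """Yield one flat dict per innermost packet of a (possibly nested) loop."""
    def at(values):
        # follow the outer packet coordinates down to this level's list
        for c in coord:
            values = values[c]
        return values

    names = list(loop["items"])
    length = len(at(loop["items"][names[0]])) if names else 0
    for i in range(length):
        # (key, value) pairs are rebuilt for every packet, so no stale keys survive
        kvpairs = [(n, at(loop["items"][n])[i]) for n in names]
        if loop["loops"]:
            for child in loop["loops"]:
                for packet in expand(child, coord + (i,)):
                    yield dict(kvpairs + list(packet.items()))
        else:                              # bottom of the tree
            yield dict(kvpairs)


inner = {"items": {"_b": [["x", "y"], ["z"]]}, "loops": []}
outer = {"items": {"_a": ["1", "2"]}, "loops": [inner]}
packets = list(expand(outer))
```

Each outer packet is repeated once for every inner packet that belongs to it, which is the fully expanded view that recursive_iter provides.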
Return a series of LoopBlocks with the appropriate packet chosen. This does not loop over interior blocks, so called at the top level it just returns the whole star block.
<A one-level iterator>= (<-U)
    def flat_iterator(self):
        if self.dimension == 0:
            yield copy.copy(self)
        else:
            my_length = 0
            top_keys = self.block.keys()
            if len(top_keys)>0:
                my_length = len(self.block[top_keys[0]])
            for pack_no in range(my_length):
                yield(self.collapse(pack_no))
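For a single unnested loop, one-level iteration amounts to turning column storage into a sequence of packets. A minimal standalone Python 3 sketch follows; the loop is modelled as a plain dict of equal-length lists and nested loops are ignored, both simplifying assumptions.

```python
def flat_iterator(columns):
    """Yield one {name: value} packet per row of a column-stored loop."""
    names = list(columns)
    length = len(columns[names[0]]) if names else 0
    for packet_no in range(length):
        yield {name: columns[name][packet_no] for name in names}


rows = list(flat_iterator({"_a": ["1", "2"], "_b": ["x", "y"]}))
```

As in the chunk above, the number of packets is taken from the first item, on the assumption that all items in a loop have the same length.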
Insert a subloop. Rather than a simple append, we need to register the order in which this loop appears, by putting the loop object itself into our item_order list at the appropriate position. We can optionally check for duplicate values, which is normally a good idea; however, if we are reading in a file, for efficiency we only do this at the end of input.
<Insert a nested loop>= (<-U)
    def insert_loop(self,newloop,position=-1,audit=True):
        # check that new loop is kosher
        if newloop.dimension != self.dimension + 1:
            raise StarError( 'Insertion of loop of wrong nesting level %d, should be %d' % (newloop.dimension, self.dimension+1))
        self.loops.append(newloop)
        if audit:
            dupes = self.audit()
            if dupes:
                dupenames = map(lambda a:a[0],dupes)
                raise StarError( 'Duplicate names: %s' % `dupenames`)
        if position >= 0:
            self.item_order.insert(position,newloop)
        else:
            self.item_order.append(newloop)
        # print "Insert loop: item_order now" + `self.item_order`
<Remove a nested loop>= (<-U)
    def remove_loop(self,oldloop):
        # print "Removing %s: item_order %s" % (`oldloop`,self.item_order)
        # print "Length %d" % len(oldloop)
        self.item_order.remove(oldloop)
        self.loops.remove(oldloop)
Returning an item value. Note that a looped block has little
meaning without all the items in the loop. Routine GetLoop is
better in this case. This is a real time-intensive loop, so we
initially assume that the key we have been passed is the right
key (i.e. case is the same) and only check for case if this
fails.
<Return value of item>= (<-U)
    def GetLoopItem(self,itemname):
        # assume case is correct first
        try:
            return self.block[itemname]
        except KeyError:
            for loop in self.loops:
                try:
                    return loop[itemname]
                except KeyError:
                    pass
        if itemname.lower() not in self.lower_keys:
            raise KeyError, 'Item %s not in block' % itemname
        # it is there somewhere, now we need to find it
        real_keys = self.block.keys()
        lower_keys = map(lambda a:a.lower(),self.block.keys())
        try:
            k_index = lower_keys.index(itemname.lower())
        except ValueError:
            raise KeyError, 'Item %s not in block' % itemname
        return self.block[real_keys[k_index]]
This function returns the particular loop block containing the specified dataname, so that we can manipulate its contents directly.
<Get complete looped data>= (<-U)
    def GetLoop(self,keyname):
        if keyname in self.block:        #python 2.2 or above
            return self
        for aloop in self.loops:
            try:
                return aloop.GetLoop(keyname)
            except KeyError:
                pass
        raise KeyError, 'Item %s does not exist' % keyname
Get nth looped packet. This returns a packet of data, including any nested loops. For a nested loop, we want the set of packets corresponding to the nth outer packet; so after picking out the appropriate elements, we have to transpose so that we have a packet.
<Get nth loop packet>= (<-U)
    def GetPacket(self,index):
        thispack = []
        for myitem in self.item_order:
            if isinstance(myitem,LoopBlock):
                pack_list = map(lambda b:myitem[b][index],myitem.item_order)
                # print 'Pack_list -> %s' % `pack_list`
                thispack.append(pack_list)
            elif self.dimension==0:
                thispack.append(self[myitem])
            else:
                thispack.append(self[myitem][index])
        return thispack
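Because loop values are stored column by column, picking out a packet amounts to reading one row across all columns. The sketch below shows that idea for a single, un-nested loop, assuming a plain dictionary of lists in place of a LoopBlock; get_packet and all_packets are hypothetical helper names.

```python
def get_packet(columns, order, index):
    """Collect the index-th value of every column, in print order."""
    return [columns[name][index] for name in order]


def all_packets(columns, order):
    """Transpose column-wise storage into a list of packets."""
    return list(zip(*[columns[name] for name in order]))


columns = {'_atom_label': ['C1', 'O1', 'N1'], '_atom_x': [0.1, 0.2, 0.3]}
order = ['_atom_label', '_atom_x']
second = get_packet(columns, order, 1)      # ['O1', 0.2]
packets = all_packets(columns, order)
```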
Return order of items - this is just a copy of our item_order array.
<Get item order>= (<-U)
    def GetItemOrder(self):
        return self.item_order[:]
Move an item to a different position in the loop. This only affects
the printout order. We allow different capitalisation and have to
absorb the possibility of nested loops in the order list, and being
passed a loop reference in the itemname argument.
<Change data item order>= (<-U)
    def ChangeItemOrder(self,itemname,newpos):
        import string
        def low_case(item):
            #need to skip Loop blocks
            try:
                return string.lower(item)
            except AttributeError:
                return item
        lowcase_order = map(low_case,self.item_order)
        try:
            testname = string.lower(itemname)
        except AttributeError:
            testname = itemname
        testpos = lowcase_order.index(testname)
        del self.item_order[testpos]
        # so we have an object ready for action
        self.item_order.insert(newpos,itemname)
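A standalone sketch of the same reordering, assuming a plain list in place of self.item_order and modern Python string methods; change_item_order is a hypothetical name. Like the chunk above, it reinserts the name exactly as passed, so the capitalisation in the order list may change.

```python
def change_item_order(item_order, itemname, newpos):
    def low_case(item):
        # nested loops are objects, not strings, so leave them untouched
        return item.lower() if isinstance(item, str) else item

    lowered = [low_case(entry) for entry in item_order]
    testpos = lowered.index(low_case(itemname))   # ValueError if absent
    del item_order[testpos]
    item_order.insert(newpos, itemname)


order = ['_a', '_B', '_c']
change_item_order(order, '_b', 0)   # order is now ['_b', '_a', '_c']
```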
This returns a copy, in theory independent (check that) with just the nth packet selected, and order preserved.
<Collapse to nth packet>= (<-U)
    def collapse(self,packet_no):
        if self.dimension == 0:
            raise StarError( "Attempt to select non-existent packet")
        newlb = LoopBlock(dimension=self.dimension-1)
        for one_item in self.item_order:
            if isinstance(one_item,LoopBlock):
                newlb.insert_loop(one_item.collapse(packet_no))
            else:
                # print "Collapse: %s -> %s" % (one_item,`self[one_item][packet_no]`)
                newlb[one_item] = self[one_item][packet_no]
        return newlb
This function is typically called once by the topmost loop after reading in a complete datablock; if it returns an empty list, that is a guarantee that no datanames are repeated within this loop and subloops. We use the sets module for efficiency (when we go to 2.4 support we'll use the builtin as well).
<Audit for repeated names>= (<-U)
    def audit(self):
        import sets
        allkeys = self.keys()
        uniquenames = sets.Set(allkeys)
        if len(uniquenames) == len(allkeys):
            return []
        else:
            keycount = map(lambda a:(a,allkeys.count(a)),uniquenames)
            return filter(lambda a:a[1]>1,keycount)
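For comparison, the same audit written against the builtin set type might look like the sketch below. It assumes the caller supplies the flat list of names that self.keys() would return; find_duplicates is a hypothetical name, and the result is sorted only to make the output deterministic.

```python
def find_duplicates(allkeys):
    """Return (name, count) pairs for every name that occurs more than once."""
    uniquenames = set(allkeys)
    if len(uniquenames) == len(allkeys):
        return []          # cheap exit: nothing is repeated
    keycount = [(name, allkeys.count(name)) for name in uniquenames]
    return sorted(pair for pair in keycount if pair[1] > 1)


clean = find_duplicates(['_a', '_b'])
dupes = find_duplicates(['_a', '_b', '_a', '_c', '_c', '_c'])
```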
Get co-looped names. Sometimes we just want names, and will get the values ourselves on a need-to-know basis.
<Get co-looped names>= (<-U)
    def GetLoopNames(self,keyname):
        if keyname in self:
            return self.keys()
        for aloop in self.loops:
            try:
                return aloop.GetLoopNames(keyname)
            except KeyError:
                pass
        raise KeyError, 'Item does not exist'
Adding to a loop. We find the loop containing the dataname that
we've been passed, and then append all of the (key,values) pairs that we
are passed in loopdata, which is a dictionary. We expect that the data
have been sorted out for us, unlike when data are passed in AddLoopItem,
when there can be both unlooped and looped data in one set. The dataname
passed to this routine is simply a convenient way to refer to the
loop, and has no other significance.
<Add to looped data>= (<-U)
    def AddToLoop(self,dataname,loopdata):
        thisloop = self.GetLoop(dataname)
        for itemname,itemvalue in loopdata.items():
            thisloop[itemname] = itemvalue
Removing a data item. We delete the item, and if it is looped, and nothing is left in the loop, we remove that element of the list.
<Remove a data item>= (<-U) def RemoveLoopItem(self,itemname): if self.has_key(itemname): testkey = itemname.lower() real_keys = self.block.keys() lower_keys = map(lambda a:a.lower(),real_keys) try: k_index = lower_keys.index(testkey) except ValueError: #must be in a lower loop for aloop in self.loops: if aloop.has_key(itemname): # print "Deleting %s (%s)" % (itemname,aloop[itemname]) del aloop[itemname] if len(aloop)==0: # all gone self.remove_loop(aloop) break else: del self.block[real_keys[k_index]] self.lower_keys.remove(testkey) # now remove the key in the order list for i in range(len(self.item_order)): if isinstance(self.item_order[i],StringType): #may be loop if self.item_order[i].lower()==testkey: del self.item_order[i] break if len(self.block)==0: #no items in loop, length -> 0 self.no_packets = 0 return #no duplicates, no more checking needed
Adding a data item. This routine adds a single data item to a pre-existing loop, checking both the dimension and length to make sure they match already-existing items. We make a special exception for an empty list on the assumption that it is going to be filled manually (in particular, using load_iter during file reading).
If an item is already stored, it will be silently replaced. Note that we can only guarantee this behaviour, and that duplicate items are not present, if this is called in the top loop. If it is called as a method of an inner loop, only subloops are visible for checking/replacing. We could get around this restriction by being passed a function which would fix things up for us.
We also check for consistency, by making sure the new item is
not in the block already. If it is, we replace it (consistent with
the meaning of square brackets in Python), unless self.overwrite
is False, in which case an error is raised.
We skip checking of data values if the precheck value is true; this
is typically set if the item is being read from a file, and so is already
checked.
<Add a data item>= (<-U)
    def AddLoopItem(self,data,precheck=False,maxlength=-1):
        # print "Received data %s" % `data`
        # we accept only tuples, strings and lists!!
        if isinstance(data[0],(TupleType,ListType)):
            # internal loop
            # first we remove any occurrences of these datanames in
            # other loops
            for one_item in data[0]:
                if self.has_key(one_item):
                    if not self.overwrite:
                        # report the offending name; formatting the whole
                        # tuple with a single %s would itself raise TypeError
                        raise StarError( 'Attempt to insert duplicate item name %s' % one_item)
                    else:
                        del self[one_item]
            newloop = self.loopclass(dimension = self.dimension+1)
            keyvals = zip(data[0],data[1])
            for key,val in keyvals:
                newloop.AddLoopItem((key,val))
            self.insert_loop(newloop)
        elif not isinstance(data[0],StringType):
            raise TypeError, 'Star datanames are strings only (got %s)' % `data[0]`
        else:
            if data[1] == [] or self.get_dim(data[1])[0] == self.dimension:
                if not precheck:
                    self.check_data_name(data[0],maxlength)    # make sure no nasty characters
                # check that we can replace data
                if not self.overwrite:
                    if self.has_key(data[0]):
                        raise StarError( 'Attempt to insert duplicate item name %s' % data[0])
                # now make sure the data is OK type
                regval = self.regularise_data(data[1])
                if not precheck:
                    try:
                        self.check_item_value(regval)
                    except StarError, errmes:
                        raise StarError( "Item name " + data[0] + " " + `errmes`)
                if self.dimension > 0:
                    if self.no_packets <= 0:
                        self.no_packets = len(data[1])  #first item
                    if len(data[1]) != self.no_packets:
                        raise StarLengthError, 'Not enough values supplied for %s' % (data[0])
                self.RemoveLoopItem(data[0])   # may be different case, so have to do this
                self.block.update({data[0]:regval})  # trust the data is OK
                self.lower_keys.append(data[0].lower())
                self.item_order.append(data[0])
            else:            #dimension mismatch
                raise StarLengthError, "input data dim %d != required dim %d: %s %s" % (self.get_dim(data[1])[0],self.dimension,data[0],`data[1]`)
Checking the data names. The CIF 1.1 standard restricts characters in a data name to ASCII 33-126 and there should be a leading underscore. Items are allowed to have the blank characters as well, i.e. ascii 09,10,13 and 32. Data items may be lists, which we need to detect before checking. We assume that the item has been regularised before this check is called.
<Check data name for STAR conformance>= (<-U)
    def check_data_name(self,dataname,maxlength=-1):
        if maxlength > 0:
            if len(dataname)>maxlength:
                raise StarError( 'Dataname %s exceeds maximum length %d' % (dataname,maxlength))
        if dataname[0]!='_':
            raise StarError( 'Dataname ' + dataname + ' does not begin with _')
        if len (filter (lambda a: ord(a) < 33 or ord(a) > 126, dataname)) > 0:
            raise StarError( 'Dataname ' + dataname + ' contains forbidden characters')

<Check data item for STAR conformance>= (<-U)
    def check_item_value(self,item):
        test_item = item
        if type(item) != TupleType and type(item) != ListType:
            test_item = [item]         #single item list
        def check_one (it):
            if type(it) == StringType:
                if it=='': return
                me = self.char_check.match(it)
                if not me:
                    raise StarError( 'Bad character in %s' % it)
                else:
                    if me.span() != (0,len(it)):
                        raise StarError('Data item "' + it + '"... contains forbidden characters')
        map(check_one,test_item)
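The three dataname rules (optional maximum length, leading underscore, characters restricted to ASCII 33-126) can be exercised on their own. The sketch below is a standalone rewrite in modern Python that raises ValueError as a stand-in for StarError; name_is_valid is a hypothetical convenience wrapper.

```python
def check_data_name(dataname, maxlength=-1):
    """Raise ValueError if dataname breaks one of the three naming rules."""
    if maxlength > 0 and len(dataname) > maxlength:
        raise ValueError('Dataname %s exceeds maximum length %d'
                         % (dataname, maxlength))
    if not dataname.startswith('_'):
        raise ValueError('Dataname %s does not begin with _' % dataname)
    if any(ord(ch) < 33 or ord(ch) > 126 for ch in dataname):
        raise ValueError('Dataname %s contains forbidden characters' % dataname)


def name_is_valid(dataname, maxlength=-1):
    """Boolean wrapper around check_data_name."""
    try:
        check_data_name(dataname, maxlength)
    except ValueError:
        return False
    return True
```

Note that a blank inside a name is rejected by the character test, since ASCII 32 lies outside the permitted range.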
Regularising data. We want the copy.deepcopy operation to work, so we can't have any arrays passed into the master dictionary. We make sure everything goes in either as a single item or as a list/tuple.
<Regularise data values>= (<-U)
    def regularise_data(self,dataitem):
        alrighttypes = [IntType, LongType, FloatType, StringType]
        okmappingtypes = [TupleType, ListType]
        thistype = type(dataitem)
        if thistype in alrighttypes or thistype in okmappingtypes:
            return dataitem
        # so try to make into a list
        try:
            regval = list(dataitem)
        except TypeError, value:
            raise StarError( str(dataitem) + ' is wrong type for data value\n' )
        return regval
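As an illustration of regularisation, the sketch below converts a standard-library array into a plain list while passing scalars, lists and tuples through unchanged. It is written for modern Python, uses isinstance rather than the exact type comparison in the chunk above, and raises ValueError in place of StarError; these are assumptions of the sketch.

```python
import array


def regularise_data(dataitem):
    """Return dataitem unchanged if it is a scalar, list or tuple;
    otherwise try to turn it into a list."""
    if isinstance(dataitem, (int, float, str, tuple, list)):
        return dataitem
    try:
        return list(dataitem)
    except TypeError:
        raise ValueError('%s is wrong type for data value' % str(dataitem))


from_array = regularise_data(array.array('d', [1.0, 2.0]))   # a plain list
untouched = regularise_data((1, 2))                          # still a tuple
```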
Dimension of data. This would ordinarily be the number of nested levels, and if we have a naked string, we have to return zero. We recursively burrow down to the lowest level. If a list is of zero length, we can't burrow any further, so simply return one more than the current level.
We return as well the length of the received packet.
<Get data dimension>= (<-U)
    def get_dim(self,dataitem,current=0,packlen=0):
        zerotypes = [IntType, LongType, FloatType, StringType]
        if type(dataitem) in zerotypes:
            return current, packlen
        elif len(dataitem)>0:
            # print "Get_dim: %d: %s" % (current,`dataitem`)
            return self.get_dim(dataitem[0],current+1,len(dataitem))
        else: return current+1,0
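A few worked cases make the recursion easier to follow. The function below is a standalone transcription into modern Python (no self, and isinstance in place of the type list), so it is a sketch rather than the class method itself.

```python
def get_dim(dataitem, current=0, packlen=0):
    """Return (nesting depth, length of the innermost list reached)."""
    if isinstance(dataitem, (int, float, str)):
        return current, packlen
    if len(dataitem) > 0:
        # burrow into the first element, remembering this level's length
        return get_dim(dataitem[0], current + 1, len(dataitem))
    return current + 1, 0       # empty list: cannot burrow further


scalar = get_dim('abc')                          # (0, 0)
flat = get_dim([1, 2, 3])                        # (1, 3)
nested = get_dim([[1, 2], [3, 4], [5, 6]])       # (2, 2)
empty = get_dim([])                              # (1, 0)
```

Only the first element of each level is inspected, so ragged input is not detected here.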
<Functions for printing out>= (<-U)
    <Set the output length>
    <Print a loop block>
    <Format loop names>
    <Format loop packets>
    <Format a single packet item>
    <Format a string>
For non-default output lengths, we include a function which will set the internal attribute that controls maximum line length. As this is a per-block value, this function is most likely called by the StarFile object rather than directly.
Two values control output line formatting: self.wraplength and
self.maxoutlength. self.wraplength is the value at which the
line will be wrapped normally, but long strings will not force an
internal wrap inside the string; self.maxoutlength is the absolute
maximum length.
<Set the output length>= (<-U)
    def SetOutputLength(self,wraplength=80,maxoutlength=2048):
        if wraplength > maxoutlength:
            raise StarError("Wrap length (requested %d) must be <= Maximum line length (requested %d)" % (wraplength,maxoutlength))
        self.wraplength = wraplength
        self.maxoutlength = maxoutlength
        for loop in self.loops:
            loop.SetOutputLength(wraplength,maxoutlength)
Printing a section. We allow an optional order list to be given, in case the caller wants to order things in some nice way. By default, we use the item_order attribute. Naturally, looped items are grouped together according to their relative order in the order list.
Note that we must be careful to add spaces between data items, especially when formatting string loop data, where our string addition could get quite hairy. As we are doing so much concatenation, we use a stringIO buffer to speed it up.
Also, it is conceivable that we print an internal loop without the enclosing loop. This means that we cannot assume that we find ourselves with a nice simple one-dimensional array after selecting out the matrix coordinate of our current packet. Therefore, if we are not starting out with a zero-dimensional block, we use the contents of coord to make our choice for every non-specified dimension.
This routine should not be called recursively.
We attempt some nice formatting by printing non-packet items with an apparent tab stop at 40 characters.
<Print a loop block>= (<-U) def printsection(self,instring='',blockstart="",blockend="",indent=0,coord=[]): import cStringIO import string # first make an ordering order = self.item_order[:] # now do it... if not instring: outstring = cStringIO.StringIO() # the returned string else: outstring = instring if not coord: coords = [0]*(self.dimension-1) else: coords = coord if(len(coords)<self.dimension-1): raise StarError("Not enough block packet coordinates to uniquely define data") # print loop delimiter outstring.write(blockstart) while len(order)>0: # print "Order now: " + `order` itemname = order.pop(0) if self.dimension == 0: # ie value next to tag if not isinstance(itemname,LoopBlock): #no loop itemvalue = self[itemname] if isinstance(itemvalue,StringType): #need to sanitize thisstring = self._formatstring(itemvalue) else: thisstring = str(itemvalue) # try for a tabstop at 40 if len(itemname)<40 and (len(thisstring)-40 < self.wraplength-1): itemname = itemname + ' '*(40-len(itemname)) else: itemname = itemname + ' ' if len(thisstring) + len(itemname) < (self.wraplength-1): outstring.write('%s%s\n' % (itemname,thisstring)) else: outstring.write('%s\n %s\n' % (itemname, thisstring)) else: # we are asked to print an internal loop block #first make sure we have sensible coords. Length should be one #less than the current dimension outstring.write(' '*indent); outstring.write('loop_\n') itemname.format_names(outstring,indent+2) itemname.format_packets(outstring,coords,indent+2) else: # we are a nested loop outstring.write(' '*indent); outstring.write('loop_\n') self.format_names(outstring,indent+2) self.format_packets(outstring,coords,indent+2) if instring: return #inside a recursion else: returnstring = outstring.getvalue() outstring.close() return returnstring
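The tab-stop behaviour for unlooped items can be seen in isolation with a simplified sketch. It assumes the value has already been formatted as a string, omits the second half of the padding test used in the chunk above, and uses the hypothetical name format_pair with a tabstop parameter that the original hard-codes as 40.

```python
def format_pair(name, value, wraplength=80, tabstop=40):
    """Lay out 'name value' with the value starting at the tab stop,
    or on its own line if the pair would not fit."""
    if len(name) < tabstop:
        padded = name + ' ' * (tabstop - len(name))
    else:
        padded = name + ' '
    if len(padded) + len(value) < wraplength - 1:
        return '%s%s\n' % (padded, value)
    return '%s\n %s\n' % (padded, value)


short_line = format_pair('_cell_length_a', '5.43')
long_line = format_pair('_publ_section_title', 'x' * 60)
```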
Formatting a loop section. We are passed an indent and destination string, and are expected to append a list of item names to the string indented by the indicated number of spaces. If we have loops, we add those in too.
<Format loop names>= (<-U)
    def format_names(self,outstring,indent=0):
        temp_order = self.item_order[:]
        while len(temp_order)>0:
            itemname = temp_order.pop(0)
            if isinstance(itemname,StringType):  #(not loop)
                outstring.write(' ' * indent)
                outstring.write(itemname)
                outstring.write("\n")
            else:                                # a loop
                outstring.write(' ' * indent)
                outstring.write("loop_\n")
                itemname.format_names(outstring,indent+2)
                outstring.write(" stop_\n")
Formatting a loop packet. We are passed an array of coordinates into the required packet, of length dim - 1, and have to output the corresponding values. Our final packet will involve collecting the ith value of each item in our particular loop. Note that we have to be careful with indentation, as the <return>; digraph must be recognised.
<Format loop packets>= (<-U) def format_packets(self,outstring,coordinates,indent=0): import cStringIO import string # get our current group of data # print 'Coords: %s' % `coordinates` alldata = map(lambda a:self.coord_to_group(a,coordinates),self.item_order) # print 'Alldata: %s' % `alldata` packet_data = apply(zip,alldata) # print 'Packet data: %s' % `packet_data` curstring = '' for position in range(len(packet_data)): for point in range(len(packet_data[position])): datapoint = packet_data[position][point] packstring = self.format_packet_item(datapoint,indent) if len(curstring) + len(packstring)> self.wraplength-2: #past end of line with space curstring = curstring + '\n' + ' '*indent + packstring elif curstring == '': curstring = curstring + ' '*indent + packstring else: curstring = curstring + ' ' + packstring outstring.write(curstring + '\n') #end of one packet curstring = '' outstring.write(' ' + curstring + '\n') #last time through
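The wrapping rule for one packet can be sketched as follows. This simplified version handles a flat packet of scalar values only and measures the length of the current output line, which is an assumption of the sketch rather than a transcription of the chunk above; format_packet is a hypothetical name.

```python
def format_packet(values, wraplength=80, indent=0):
    """Join the values of one packet with spaces, starting a new indented
    line whenever the next value would pass the wrap length."""
    lines = []
    current = ''
    for value in values:
        text = str(value)
        if current == '':
            current = ' ' * indent + text
        elif len(current) + len(text) > wraplength - 2:
            lines.append(current)             # line is full, start another
            current = ' ' * indent + text
        else:
            current = current + ' ' + text
    lines.append(current)
    return '\n'.join(lines) + '\n'


one_line = format_packet(['C1', 0.1, 0.2], indent=2)
wrapped = format_packet(['aaaa', 'bbbb', 'cccc'], wraplength=10)
```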
Formatting a single packet item - could be a nested packet! If we have a
list of nested packets, we have to transpose first. Note also that a nested
packet implies a STAR file, which means there are no line length restrictions.
We are therefore a bit sloppy with our checking against wraplength and
maxoutlength.
<Format a single packet item>= (<-U) def format_packet_item(self,pack_item,indent): # print 'Formatting %s' % `pack_item` curstring = '' if isinstance(pack_item,(StringType,IntType,FloatType,LongType)): if isinstance(pack_item,StringType): thisstring = self._formatstring(pack_item) #no spaces yet if '\n' in thisstring: #must have semicolon digraph then curstring = curstring + thisstring curstring = curstring + (' ' * indent) thisstring = '' else: thisstring = '%s' % str(pack_item) if len(curstring) + len(thisstring)> self.wraplength-2: #past end of line with space curstring = curstring + '\n' #add the space curstring = curstring + (' ' * indent) + thisstring else: curstring = curstring + ' ' + thisstring # Now, for each nested loop we call ourselves again # After first outputting the current line else: # a nested packet if not isinstance(pack_item[0],(ListType,TupleType)): #base packet item_list = pack_item else: item_list = apply(zip,pack_item) for sub_item in item_list: curstring = curstring + ' ' + self.format_packet_item(sub_item,indent) # stop_ is not issued at the end of each innermost packet if isinstance(pack_item[0],(ListType,TupleType)): curstring = curstring + ' stop_ ' return curstring
Formatting a string. We make sure that the length of the item value
is less than self.maxoutlength, or else we should split them, and so on.
We check the value for terminators and impossible apostrophes and length,
before deciding whether to print it and the item on a single line. We try to
respect carriage returns in the string, if the caller has tried to do
the formatting for us. If we are not putting apostrophes around a
string, we make the first character a space, to avoid problems if the
first character of a line is a semicolon.
The STAR specification states that embedded quotes are allowed so long as they are not followed by a space. So if we find any quotes followed by spaces we output a semicolon-terminated string to avoid too much messing around. This routine is called very often and could be improved.
We have to catch empty strings as well, which are legal. Another gotcha concerns 'embedded' strings; if the datavalue begins with a quote, it will be output verbatim (and misunderstood) unless spaces elsewhere force quotation.
<Format a string>= (<-U) def _formatstring(self,instring): import string if len(instring)==0: return "''" if len(instring)< (self.maxoutlength-2) and '\n' not in instring and not ('"' in instring and '\'' in instring): if not ' ' in instring and not '\t' in instring and not '\v' \ in instring and not '_' in instring and not (instring[0]=="'" or \ instring[0]=='"'): # no blanks return instring if not "'" in instring: #use apostrophes return "'%s'" % (instring) elif not "\"" in instring: return '"%s"' % (instring) # is a long one or one that needs semicolons due to carriage returns outstring = "\n;" # if there are returns in the string, try to work with them while 1: retin = string.find(instring,'\n')+1 if retin < self.maxoutlength and retin > 0: # honour this break outstring = outstring + instring[:retin] instring = instring[retin:] elif len(instring)<self.maxoutlength: # finished outstring = outstring + instring + '\n;\n' break else: # find a space for letter in range(self.maxoutlength-1,self.wraplength-1,-1): if instring[letter] in ' \t\f': break outstring = outstring + instring[:letter+1] outstring = outstring + '\n' instring = instring[letter+1:] return outstring
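The choice of delimiter can be summarised in a short standalone sketch. It follows the same tests as the chunk above (empty string, bare value, apostrophes, double quotes, semicolon block) but does not split over-long lines, so it assumes no single line of the value exceeds maxoutlength; choose_delimiter is a hypothetical name.

```python
def choose_delimiter(value, maxoutlength=2048):
    """Return value wrapped in the simplest delimiter that can hold it."""
    if value == '':
        return "''"
    short = len(value) < maxoutlength - 2
    if short and '\n' not in value and not ('"' in value and "'" in value):
        needs_quotes = (any(ch in value for ch in ' \t\v_')
                        or value[0] in '\'"')
        if not needs_quotes:
            return value                 # bare value
        if "'" not in value:
            return "'%s'" % value        # apostrophes are safe
        if '"' not in value:
            return '"%s"' % value        # fall back to double quotes
    # newlines, both quote types, or too long: semicolon-delimited block
    return '\n;' + value + '\n;\n'


bare = choose_delimiter('abc')
quoted = choose_delimiter('a b')
double = choose_delimiter("it's here")
block = choose_delimiter('line1\nline2')
```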
A Star Block is simply a LoopBlock with a couple of extras to take care of the fact that the topmost level can hold save frames.
<StarBlock class>= (<-U)
class StarBlock(LoopBlock):
    <Initialise Star Block>
    <Adjust emulation of mapping type>
A Star Block is a Loop Block which can hold save frames in the outermost loop. So essentially the extra value we are adding here is to handle save frames.
<Initialise Star Block>= (<-U)
    def __init__(self,*pos_args,**keyword_args):
        LoopBlock.__init__(self,*pos_args,**keyword_args)
        self.saves = BlockCollection(element_class=LoopBlock,type_tag="save")
Emulation of a mapping type. We catch the saves key and redirect it to our saves attribute, and add printing of the saves block.
<Adjust emulation of mapping type>= (<-U)
    def __getitem__(self,key):
        if key == "saves":
            return self.saves
        else:
            return LoopBlock.__getitem__(self,key)

    def __setitem__(self,key,value):
        if key == "saves":
            self.saves[key] = value
        else:
            LoopBlock.__setitem__(self,key,value)

    def clear(self):
        LoopBlock.clear(self)
        self.saves = BlockCollection(element_class=LoopBlock,type_tag="save_")

    def copy(self):
        newblock = LoopBlock.copy(self)
        newblock.saves = self.saves.copy()
        return self.copy.im_class(newblock)   #catch inheritance

    def has_key(self,key):
        if key == "saves": return 1
        else: return LoopBlock.has_key(self,key)

    def __str__(self):
        retstr = ''
        for sb in self.saves.keys():
            retstr = retstr + '\nsave_%s\n\n' % sb
            self.saves[sb].SetOutputLength(self.wraplength,self.maxoutlength)
            retstr = retstr + str(self.saves[sb])
            retstr = retstr + '\nsave_\n\n'
        return retstr + LoopBlock.__str__(self)
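The redirection of the saves key can be demonstrated with a toy mapping. The sketch below assumes a dict subclass in place of LoopBlock and a plain dictionary in place of BlockCollection; SketchBlock is a hypothetical class used only for illustration.

```python
class SketchBlock(dict):
    """Toy mapping in which the key 'saves' is served from an attribute."""

    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self.saves = {}          # stand-in for the BlockCollection

    def __getitem__(self, key):
        if key == 'saves':
            return self.saves
        return dict.__getitem__(self, key)

    def __contains__(self, key):
        return key == 'saves' or dict.__contains__(self, key)


blk = SketchBlock()
blk['_a'] = '1'
blk['saves']['frame1'] = {'_x': '2'}   # reaches the attribute, not the dict
```

Because saves lives in an attribute, it is reported as present but never appears among the ordinary data names.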
<Define an error class>= (<-U)
class StarError(Exception):
    def __init__(self,value):
        self.value = value
    def __str__(self):
        return '\nStar Format error: '+ self.value

class StarLengthError(Exception):
    def __init__(self,value):
        self.value = value
    def __str__(self):
        return '\nStar length error: ' + self.value