# Source code for drizzlepac.staticMask
"""
This module provides functions and classes that manage the creation
of the global static masks.

For `staticMask`, the user interface function is :py:func:`createMask`.

:Authors: Ivo Busko, Christopher Hanley, Warren Hack, Megan Sosey

:License: :doc:`LICENSE`

"""


from __future__ import absolute_import, division, print_function  # confidence high

import os
import sys
from distutils.version import LooseVersion

import numpy as np
from stsci.tools import fileutil, teal, logutil
import astropy
from astropy.io import fits
from stsci.imagestats import ImageStats
from . import util
from . import processInput

# True when the installed astropy is version 1.3 or newer.  saveToFile() uses
# this flag to choose between the `overwrite` and `clobber` keywords of writeto().
ASTROPY_VER_GE13 = LooseVersion(astropy.__version__) >= LooseVersion('1.3')

# Task name handed to util.getDefaultConfigObj() and used when building help text.
__taskname__ = "drizzlepac.staticMask"
# Step number passed to util.getSectionName() to find this step's config section.
_step_num_ = 1

# Module-level logger shared by every function and method in this file.
log = logutil.create_logger(__name__)

#this is called by the user
def createMask(input=None, static_sig=4.0, group=None, editpars=False,
               configObj=None, **inputDict):
    """Create static masks for a user-supplied set of images.

    The user can input a list of images if they like to create static masks
    as well as optional values for static_sig and inputDict.

    The configObj.cfg file will set the defaults and then override them
    with the user options.

    Note that this docstring is replaced at import time by the help text
    returned from ``getHelpAsString`` (see the assignment at the end of the
    module).

    Parameters
    ----------
    input : object
        Input image specification; stored under ``inputDict["input"]``.
    static_sig : float
        Stored under ``inputDict["static_sig"]``.
    group : object
        Stored under ``inputDict["group"]``.
    editpars : bool
        When True the configuration is not load-only and the task is not
        run from here.
    configObj : object
        Optional pre-built configuration passed to
        ``util.getDefaultConfigObj``.
    **inputDict
        Additional user overrides for the configuration.

    Raises
    ------
    ValueError
        If ``input`` is None.
    """
    if input is None:
        # The original used the Python 2 ``print >> sys.stderr`` statement,
        # which under ``print_function`` is a right-shift expression and
        # fails with TypeError before the intended ValueError is raised.
        print("Please supply an input image\n", file=sys.stderr)
        raise ValueError("Please supply an input image")

    inputDict["static_sig"] = static_sig
    inputDict["group"] = group
    inputDict["updatewcs"] = False
    inputDict["input"] = input

    # this accounts for a user-called init where config is not defined yet
    configObj = util.getDefaultConfigObj(__taskname__, configObj, inputDict,
                                         loadOnly=(not editpars))
    if configObj is None:
        return

    if not editpars:
        run(configObj)
#this is called by the TEAL interface
def run(configObj):
    """Build the image objects named by ``configObj`` and create their masks.

    Reads ``configObj['input']`` and ``configObj['group']``, turns them into
    an image object list via ``processInput``, and hands that list to
    ``createStaticMask``.
    """
    # processFilenames returns four values; only the file list is used here.
    parsed = processInput.processFilenames(configObj['input'], None)
    filenames, _output, _ivmlist, _asndict = parsed

    image_objects = processInput.createImageObjectList(
        filenames, instrpars={}, group=configObj['group'])

    createStaticMask(image_objects, configObj)
#this is the workhorse function called by MultiDrizzle
def createStaticMask(imageObjectList=None, configObj=None, procSteps=None):
    """Create and save the static masks for a list of image objects.

    Parameters
    ----------
    imageObjectList : list
        Non-empty list of image objects; each is passed to
        ``staticMask.addMember``.
    configObj : object
        Configuration; the section for this step is located with
        ``util.getSectionName(configObj, _step_num_)`` and its ``'static'``
        entry decides whether the step runs at all.
    procSteps : object, optional
        When given, ``addStep``/``endStep`` are called on it with the label
        ``'Static Mask'``.

    Raises
    ------
    ValueError
        If the step is enabled and ``imageObjectList`` is not a non-empty
        list.
    """
    if procSteps is not None:
        procSteps.addStep('Static Mask')

    step_name = util.getSectionName(configObj, _step_num_)

    if not configObj[step_name]['static']:
        log.info('Static Mask step not performed.')
        # procSteps is optional, so guard here exactly as on the other
        # addStep/endStep calls; unguarded, this crashed for procSteps=None.
        if procSteps is not None:
            procSteps.endStep('Static Mask')
        return

    # A missing (None) or empty list is rejected the same way.
    if not isinstance(imageObjectList, list) or not imageObjectList:
        msg = "Invalid image object list given to static mask"
        print(msg, file=sys.stderr)
        raise ValueError(msg)

    log.info('USER INPUT PARAMETERS for Static Mask Step:')
    util.printParams(configObj[step_name], log=log)

    # create a static mask object
    myMask = staticMask(configObj)

    for image in imageObjectList:
        myMask.addMember(image)  # create tmp filename here...

    # save the masks to disk for later access
    myMask.saveToFile(imageObjectList)
    myMask.close()

    if procSteps is not None:
        procSteps.endStep('Static Mask')
def constructFilename(signature):
    """Return the static-mask file path for an image signature.

    The signature, taken from the image object, has the form::

        signature=[instr+detector,(nx,ny),detnum]

    The file name comes from ``buildSignatureKey`` and is placed in the
    current directory (``'.'``).
    """
    return os.path.join('.', buildSignatureKey(signature))
def buildSignatureKey(signature):
    """Build the static mask file name for a signature.

    The result has the form ``<name>_<nx>x<ny>_<chip>_staticMask.fits``,
    where ``name`` is ``signature[0]``, ``(nx, ny)`` is ``signature[1]`` and
    ``chip`` is ``signature[2]``.
    """
    detector = signature[0]
    dims = signature[1]
    size = str(dims[0]) + "x" + str(dims[1])
    chip = str(signature[2])
    return "_".join([detector, size, chip, "staticMask.fits"])
class staticMask(object):
    """Manage the creation of the global static masks.

    This class manages the creation of the global static mask which
    masks pixels that are unwanted in the SCI array.
    A static mask object gets created for each global
    mask needed, one for each chip from each instrument/detector.
    Each static mask array has type Int16, and resides in memory.

    :Notes:
        Class that manages the creation of a global static
        mask which is used to mask pixels that are some
        sigma BELOW the mode computed for the image.

    Attributes are documented here rather than inline:

    * ``masklist``  -- dict mapping a chip signature to its mask array.
    * ``masknames`` -- dict mapping a chip signature to its mask file name.
    * ``step_name`` -- config section name for this step.
    * ``static_sig`` -- sigma multiplier used when thresholding pixels.
    """

    def __init__(self, configObj=None):
        """Read ``static_sig`` from ``configObj``, defaulting to 4.0."""
        # the signature is created in the imageObject class
        self.masklist = {}
        self.masknames = {}
        self.step_name = util.getSectionName(configObj, _step_num_)
        if configObj is not None:
            self.static_sig = configObj[self.step_name]['static_sig']
        else:
            self.static_sig = 4.  # define a reasonable number
            log.warning('Using default of 4. for static mask sigma.')

    def addMember(self, imagePtr=None):
        """Combine the input image with the static mask of the same signature.

        Parameters
        ----------
        imagePtr : object
            An imageObject reference

        Notes
        -----
        The signature parameter consists of the tuple::

            (instrument/detector, (nx,ny), chip_id)

        The signature is defined in the image object for each chip
        """
        log.info("Computing static mask:\n")

        # NOTE(review): the attribute name ``group`` was reconstructed from a
        # garbled source line; confirm against the imageObject class.
        chips = imagePtr.group
        if chips is None:
            chips = imagePtr.getExtensions()

        for chip in chips:
            chipid = imagePtr.scienceExt + ',' + str(chip)
            chipimage = imagePtr.getData(chipid)
            signature = imagePtr[chipid].signature

            # Only create a new (all ones) mask if one doesn't already
            # exist for this signature.
            if signature not in self.masklist:
                self.masklist[signature] = self._buildMaskArray(signature)
                self.masknames[signature] = constructFilename(signature)

            # Look the name up by signature directly.  The previous substring
            # scan over all names could pick another signature's file when
            # one instrument/detector name is a suffix of another.
            maskname = self.masknames[signature]
            imagePtr[chipid].outputNames['staticMask'] = maskname

            stats = ImageStats(chipimage, nclip=3, fields='mode')
            mode = stats.mode
            rms = stats.stddev
            nbins = len(stats.histogram)
            del stats

            log.info(' mode = %9f; rms = %7f; static_sig = %0.2f' %
                     (mode, rms, self.static_sig))

            if nbins >= 2:
                # only combine data from new image if enough data to mask
                sky_rms_diff = mode - (self.static_sig * rms)
                # Clear (in place) every mask pixel whose image value is
                # below the threshold.
                np.bitwise_and(self.masklist[signature],
                               np.logical_not(np.less(chipimage, sky_rms_diff)),
                               self.masklist[signature])
            del chipimage

    def _buildMaskArray(self, signature):
        """Create the initial all-ones int16 mask of shape ``signature[1]``."""
        return np.ones(signature[1], dtype=np.int16)

    def getMaskArray(self, signature):
        """Return the mask array stored for ``signature``, or None."""
        return self.masklist.get(signature)

    def getFilename(self, signature):
        """Return the mask file name for ``signature`` if it exists on disk.

        Returns None, after printing a message to stderr, when the file is
        not found.
        """
        filename = constructFilename(signature)

        if fileutil.checkFileExists(filename):
            return filename

        # Message typo fixed: it previously began with "\nmMask".
        print("\nMask file for ", str(signature), " does not exist on disk",
              file=sys.stderr)
        return None

    def getMaskname(self, chipid):
        """Return the static mask file name recorded for chip ``chipid``.

        The name of the static mask file is saved as
        sci_chip.outputNames["staticMask"].
        """
        # NOTE(review): ``self._image`` is never assigned in this class, so
        # this raises AttributeError unless a caller sets it - confirm.
        return self._image[chipid].outputNames["staticMask"]

    def close(self):
        """Delete all static mask objects."""
        # Drop the array references first so the old dict holds no arrays,
        # then start over with an empty dict.
        for key in self.masklist:
            self.masklist[key] = None
        self.masklist = {}

    def deleteMask(self, signature):
        """Delete just the mask that matches the signature given.

        The entry is kept in ``masklist`` with its value set to None.
        """
        if signature in self.masklist:
            self.masklist[signature] = None
        else:
            log.warning("No matching mask")

    def saveToFile(self, imageObjectList):
        """Save each static mask to its file or to the in-memory outputs.

        The file name for each mask comes from ``masknames``.  When the
        first image object has ``inmemory`` set, the mask HDU is passed to
        every image object's ``saveVirtualOutputs`` instead of being written
        to disk.

        Raises
        ------
        IOError
            If writing a mask file fails; the original error is re-raised
            after being logged.
        """
        virtual = imageObjectList[0].inmemory

        for key, mask in self.masklist.items():
            filename = self.masknames[key]
            # create a new fits image with the mask array and a standard header
            newHDU = fits.PrimaryHDU()
            newHDU.data = mask

            if virtual:
                for img in imageObjectList:
                    img.saveVirtualOutputs({filename: newHDU})
                continue

            try:
                # astropy 1.3 renamed the `clobber` keyword to `overwrite`.
                if ASTROPY_VER_GE13:
                    newHDU.writeto(filename, overwrite=True)
                else:
                    newHDU.writeto(filename, clobber=True)
                log.info("Saving static mask to disk: %s" % filename)
            except IOError:
                log.error("Problem saving static mask file: %s to "
                          "disk!\n" % filename)
                # Bare raise keeps the original message and traceback.
                raise
def help(file=None):
    """Print, or write to a file, the help text for this task.

    Parameters
    ----------
    file : str (Default = None)
        If given, write out help to the filename specified by this parameter
        Any previously existing file with this name will be deleted before
        writing out the help.

    """
    helpstr = getHelpAsString(docstring=True, show_ver=True)
    if file is None:
        print(helpstr)
        return

    if os.path.exists(file):
        os.remove(file)
    # The context manager closes the file even if the write fails.
    with open(file, mode='w') as f:
        f.write(helpstr)
def getHelpAsString(docstring = False, show_ver = True):
    """Return the help text for this task, or a URL to its HTML help.

    The text comes from a ``<taskname>.help`` file located next to this
    module when that file exists, otherwise from the module docstring.

    Parameters
    ----------
    docstring : bool
        When True, always return help text.  When False, text is returned
        only if ``htmlhelp/<taskname>.html`` does not exist; otherwise a
        ``file://`` URL to that HTML file is returned.
    show_ver : bool
        When True, prefix the text with a line giving the task name and
        version information.

    Returns
    -------
    str
        The help text or the ``file://`` URL.
    """
    install_dir = os.path.dirname(__file__)
    taskname = util.base_taskname(__taskname__, __package__)
    htmlfile = os.path.join(install_dir, 'htmlhelp', taskname + '.html')
    helpfile = os.path.join(install_dir, taskname + '.help')

    if not docstring and os.path.exists(htmlfile):
        return 'file://' + htmlfile

    if show_ver:
        # __version__ and __version_date__ are not defined in the visible
        # part of this module; look them up defensively so asking for the
        # version cannot raise NameError.
        module_globals = globals()
        version = module_globals.get('__version__', 'unknown')
        version_date = module_globals.get('__version_date__', 'unknown')
        helpString = os.linesep + \
            ' '.join([__taskname__, 'Version', version,
                      ' updated on ', version_date]) + 2*os.linesep
    else:
        helpString = ''

    if os.path.exists(helpfile):
        helpString += teal.getHelpFileAsString(taskname, __file__)
    elif __doc__ is not None:
        helpString += __doc__ + os.linesep

    return helpString
# Replace createMask's docstring at import time with the task help text
# (without the version line) so interactive help shows the full help.
createMask.__doc__ = getHelpAsString(docstring = True, show_ver = False)