# Source code for drizzlepac.staticMask

"""
This module provides functions and classes that manage the creation
of the global static masks.

For `staticMask`, the user interface function is :py:func:`createMask`.

:Authors: Ivo Busko, Christopher Hanley, Warren Hack, Megan Sosey

:License: :doc:`LICENSE`

"""

from __future__ import absolute_import, division, print_function  # confidence high

import os
import sys
from distutils.version import LooseVersion

import numpy as np
from stsci.tools import fileutil, teal, logutil
import astropy
from astropy.io import fits
from stsci.imagestats import ImageStats
from . import util
from . import processInput

# True when the installed astropy is version 1.3 or later; saveToFile() uses
# this flag to pick between the `overwrite` and `clobber` keywords of writeto().
ASTROPY_VER_GE13 = LooseVersion(astropy.__version__) >= LooseVersion('1.3')

# Task name handed to util.getDefaultConfigObj() and used when building help text.
__taskname__ = "drizzlepac.staticMask"
# Step number passed to util.getSectionName() to locate this step's config section.
_step_num_ = 1


# Module-level logger shared by every function and class in this file.
log = logutil.create_logger(__name__)


#this is called by the user
def createMask(input=None, static_sig=4.0, group=None, editpars=False, configObj=None, **inputDict):
    """ Create static masks for the given input image(s).

    The user can input a list of images if they like to create static masks
    as well as optional values for static_sig and inputDict.

    The configObj.cfg file will set the defaults and then override them
    with the user options.

    Parameters
    ----------
    input : object
        Input image specification; stored as ``inputDict["input"]``.
        Required.
    static_sig : float
        Stored as ``inputDict["static_sig"]``.
    group : object
        Stored as ``inputDict["group"]``.
    editpars : bool
        When False the configuration is loaded only (``loadOnly=True``) and
        the task is run immediately; when True the task is not run from
        here.
    configObj : object
        Existing configuration, forwarded to ``util.getDefaultConfigObj``.
    **inputDict
        Additional parameter overrides forwarded with the values above.

    Raises
    ------
    ValueError
        If ``input`` is None.
    """
    if input is None:
        # The previous Python-2 style ``print >> sys.stderr, ...`` raised a
        # TypeError once print became a function, hiding the ValueError.
        print("Please supply an input image\n", file=sys.stderr)
        raise ValueError("Please supply an input image")

    inputDict["static_sig"] = static_sig
    inputDict["group"] = group
    inputDict["updatewcs"] = False
    inputDict["input"] = input

    # this accounts for a user-called init where config is not defined yet
    configObj = util.getDefaultConfigObj(__taskname__, configObj, inputDict,
                                         loadOnly=(not editpars))
    if configObj is None:
        return

    if not editpars:
        run(configObj)
#this is called by the TEAL interface
def run(configObj):
    """ Build the image object list for the dataset and create its static masks.

    Parameters
    ----------
    configObj : object
        Task configuration; its ``'input'`` and ``'group'`` entries are read.
    """
    # Only the file list is needed here; the other three returned values
    # are ignored.
    resolved = processInput.processFilenames(configObj['input'], None)
    filelist, _output, _ivmlist, _oldasndict = resolved

    image_objects = processInput.createImageObjectList(
        filelist, instrpars={}, group=configObj['group'])
    createStaticMask(image_objects, configObj)
#this is the workhorse function called by MultiDrizzle
def createStaticMask(imageObjectList=None, configObj=None, procSteps=None):
    """ Create and save the static masks for a list of image objects.

    Parameters
    ----------
    imageObjectList : list
        Non-empty list of image objects; each one is passed to
        ``staticMask.addMember``.
    configObj : object
        Task configuration. The section for this step is looked up with
        ``util.getSectionName`` and its ``'static'`` entry decides whether
        the step is performed.
    procSteps : object, optional
        When given, ``addStep``/``endStep`` are called on it with the step
        name ``'Static Mask'``.

    Raises
    ------
    ValueError
        If the step is enabled and ``imageObjectList`` is not a non-empty
        list.
    """
    if procSteps is not None:
        procSteps.addStep('Static Mask')

    step_name = util.getSectionName(configObj, _step_num_)

    if not configObj[step_name]['static']:
        log.info('Static Mask step not performed.')
        # procSteps is optional, so it must be guarded here just as it is
        # at the start and end of this function.
        if procSteps is not None:
            procSteps.endStep('Static Mask')
        return

    # The default of None is rejected here exactly as an empty list is.
    if not isinstance(imageObjectList, list) or not imageObjectList:
        msg = "Invalid image object list given to static mask"
        print(msg, file=sys.stderr)
        raise ValueError(msg)

    log.info('USER INPUT PARAMETERS for Static Mask Step:')
    util.printParams(configObj[step_name], log=log)

    # create a static mask object and fold every image into it
    myMask = staticMask(configObj)
    for image in imageObjectList:
        myMask.addMember(image)

    # save the masks to disk for later access
    myMask.saveToFile(imageObjectList)
    myMask.close()

    if procSteps is not None:
        procSteps.endStep('Static Mask')
def constructFilename(signature):
    """Return the output mask filename for the given signature::

        signature=[instr+detector,(nx,ny),detnum]

    The signature is in the image object. The name is the signature key
    joined onto the current directory ('.').
    """
    return os.path.join('.', buildSignatureKey(signature))
def buildSignatureKey(signature):
    """
    Build the static mask filename suffix for a signature.

    The result has the form ``<signature[0]>_<nx>x<ny>_<signature[2]>_staticMask.fits``
    where ``nx`` and ``ny`` are the first two items of ``signature[1]``.
    """
    prefix = signature[0]
    dims = signature[1]
    pieces = [
        prefix,
        "_",
        str(dims[0]),
        "x",
        str(dims[1]),
        "_",
        str(signature[2]),
        "_staticMask.fits",
    ]
    return "".join(pieces)
class staticMask(object):
    """
    This class manages the creation of the global static mask which
    masks pixels that are unwanted in the SCI array.
    A static mask object gets created for each global
    mask needed, one for each chip from each instrument/detector.
    Each static mask array has type Int16, and resides in memory.

    :Notes:
        Class that manages the creation of a global static
        mask which is used to mask pixels that are some
        sigma BELOW the mode computed for the image.
    """

    def __init__(self, configObj=None):
        # Mask arrays and their output file names, both keyed by the chip
        # signature; the signature is created in the imageObject class.
        self.masklist = {}
        self.masknames = {}
        self.step_name = util.getSectionName(configObj, _step_num_)
        if configObj is not None:
            self.static_sig = configObj[self.step_name]['static_sig']
        else:
            self.static_sig = 4.  # define a reasonable number
            log.warning('Using default of 4. for static mask sigma.')

    def addMember(self, imagePtr=None):
        """
        Combines the input image with the static mask that
        has the same signature.

        Parameters
        ----------
        imagePtr : object
            An imageObject reference

        Notes
        -----
        The signature parameter consists of the tuple::

            (instrument/detector, (nx,ny), chip_id)

        The signature is defined in the image object for each chip

        """
        log.info("Computing static mask:\n")

        chips = imagePtr.group
        if chips is None:
            chips = imagePtr.getExtensions()

        for chip in chips:
            chipid = imagePtr.scienceExt + ',' + str(chip)
            chipimage = imagePtr.getData(chipid)
            signature = imagePtr[chipid].signature

            # Only create a new (all ones) mask if one doesn't already exist
            # for this signature.
            if signature not in self.masklist:
                self.masklist[signature] = self._buildMaskArray(signature)
                self.masknames[signature] = constructFilename(signature)

            # Look the name up by signature directly: a substring search
            # over all names can match a different signature whose file
            # name merely contains this one.
            maskname = self.masknames[signature]
            imagePtr[chipid].outputNames['staticMask'] = maskname

            stats = ImageStats(chipimage, nclip=3, fields='mode')
            mode = stats.mode
            rms = stats.stddev
            nbins = len(stats.histogram)
            del stats

            log.info('  mode = %9f;   rms = %7f;   static_sig = %0.2f' %
                     (mode, rms, self.static_sig))

            if nbins >= 2:  # only combine data from new image if enough data to mask
                sky_rms_diff = mode - (self.static_sig*rms)
                # Clear (in place) every mask pixel whose image value is
                # below mode - static_sig*rms.
                np.bitwise_and(self.masklist[signature],
                               np.logical_not(np.less(chipimage, sky_rms_diff)),
                               self.masklist[signature])
            del chipimage

    def _buildMaskArray(self, signature):
        """ Creates an int16 numpy array of ones, shaped by signature[1],
        to serve as the initial static mask for the signature.
        """
        return np.ones(signature[1], dtype=np.int16)

    def getMaskArray(self, signature):
        """ Returns the appropriate StaticMask array for the image,
        or None when no mask is stored for the signature.
        """
        return self.masklist.get(signature)

    def getFilename(self, signature):
        """Returns the name of the output mask file that
        should reside on disk for the given signature.

        Returns None (after printing a message to stderr) when the file
        does not exist.
        """
        filename = constructFilename(signature)

        if fileutil.checkFileExists(filename):
            return filename

        print("\nmMask file for ", str(signature), " does not exist on disk", file=sys.stderr)
        return None

    def getMaskname(self, chipid):
        """Construct an output filename for the given signature::

             signature=[instr+detector,(nx,ny),detnum]

        The signature is in the image object and the
        name of the static mask file is saved as sci_chip.outputNames["staticMask"].
        """
        # NOTE(review): ``self._image`` is not assigned anywhere in this
        # class as visible here, so this call would raise AttributeError
        # unless a caller sets it -- confirm intended source of the image.
        return self._image[chipid].outputNames["staticMask"]

    def close(self):
        """ Deletes all static mask objects. """
        # Drop the array references held by the current dict before
        # replacing it with a fresh, empty one.
        for key in self.masklist:
            self.masklist[key] = None
        self.masklist = {}

    def deleteMask(self, signature):
        """ Delete just the mask that matches the signature given.

        The entry is kept in the mask list with its array set to None.
        """
        if signature in self.masklist:
            self.masklist[signature] = None
        else:
            log.warning("No matching mask")

    def saveToFile(self, imageObjectList):
        """ Saves the static mask to a file
            it uses the signatures associated with each
            mask to contruct the filename for the output mask image.

        Parameters
        ----------
        imageObjectList : list
            Image objects; the ``inmemory`` flag of the first one decides
            whether masks are handed to ``saveVirtualOutputs`` on every
            image or written to disk.

        Raises
        ------
        IOError
            Re-raised (after logging) when writing a mask file fails.
        """
        virtual = imageObjectList[0].inmemory

        for key, mask in self.masklist.items():
            filename = self.masknames[key]

            # create a new fits image with the mask array and a standard header
            newHDU = fits.PrimaryHDU()
            newHDU.data = mask

            if virtual:
                for img in imageObjectList:
                    img.saveVirtualOutputs({filename: newHDU})
                continue

            try:
                if ASTROPY_VER_GE13:
                    newHDU.writeto(filename, overwrite=True)
                else:
                    newHDU.writeto(filename, clobber=True)
                log.info("Saving static mask to disk: %s" % filename)
            except IOError:
                log.error("Problem saving static mask file: %s to "
                          "disk!\n" % filename)
                # Re-raise the original error so its details are kept.
                raise
def help(file=None):
    """
    Print out syntax help for running astrodrizzle

    Parameters
    ----------
    file : str (Default = None)
        If given, write out help to the filename specified by this parameter
        Any previously existing file with this name will be deleted before
        writing out the help.

    """
    helpstr = getHelpAsString(docstring=True, show_ver=True)
    if file is None:
        print(helpstr)
        return

    if os.path.exists(file):
        os.remove(file)
    # The context manager closes the file even if the write fails.
    with open(file, mode='w') as f:
        f.write(helpstr)
def getHelpAsString(docstring=False, show_ver=True):
    """
    return useful help from a file in the script directory called
    __taskname__.help

    Parameters
    ----------
    docstring : bool
        When False and an HTML help file exists under ``htmlhelp`` in this
        module's directory, a ``file://`` URL to it is returned instead of
        text.
    show_ver : bool
        When True the text starts with a task name / version banner.

    Returns
    -------
    str
        The help text, or a ``file://`` URL to the HTML help file.
    """
    install_dir = os.path.dirname(__file__)
    taskname = util.base_taskname(__taskname__, __package__)
    htmlfile = os.path.join(install_dir, 'htmlhelp', taskname + '.html')
    helpfile = os.path.join(install_dir, taskname + '.help')

    if not docstring and os.path.exists(htmlfile):
        return 'file://' + htmlfile

    if show_ver:
        # __version__ / __version_date__ are not defined in this module as
        # visible here; look them up defensively so the banner does not
        # raise NameError when they are absent.
        module_ns = globals()
        version = module_ns.get('__version__', 'unknown')
        version_date = module_ns.get('__version_date__', 'unknown')
        helpString = (os.linesep +
                      ' '.join([__taskname__, 'Version', version,
                                ' updated on ', version_date]) +
                      2*os.linesep)
    else:
        helpString = ''

    if os.path.exists(helpfile):
        helpString += teal.getHelpFileAsString(taskname, __file__)
    elif __doc__ is not None:
        # Fall back to the module docstring when no .help file is present.
        helpString += __doc__ + os.linesep

    return helpString
# At import time, replace createMask's docstring with the task help text
# (requested as a docstring, without the version banner).
createMask.__doc__ = getHelpAsString(docstring = True, show_ver = False)