# Copyright (C) 2016 Jeremy Sanders <jeremy@jeremysanders.net>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 3 of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Module to interrogate xspec to get count rates and luminosities
given model parameters.
"""
import subprocess
import os
import select
import atexit
import re
import sys
import signal
[docs]
def deleteFile(filename):
"""Delete file, ignoring errors."""
try:
os.unlink(filename)
except OSError:
pass
# keep track of xspec invocations which need finishing
_finishatexit = []
# tcl code to do an infinite evaluation of commands until end
# includes commands by default
# - set trace elements to Fe abundance (1.0 by default)
tclloop = '''
autosave off
xset APEC_TRACE_ABUND Fe
while { 1 } {
set s [gets stdin]
if { [eof stdin] } {
tclexit
}
eval $s
}
'''
[docs]
class XSpecHelper:
"""A helper to get count rates for temperature and densities."""
specialcode = '@S@T@V'
specialre = re.compile('%s (.*) %s' % (specialcode, specialcode))
xspeccmd = 'xspec'
def __init__(self):
try:
self.xspecsub = subprocess.Popen(
[self.xspeccmd],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
bufsize=1,
universal_newlines=True
)
except OSError:
raise RuntimeError('Failed to start xspec')
self.throwAwayOutput()
self.tempoutput = None
_finishatexit.append(self)
self.write('set SCODE %s\n' % self.specialcode)
# debugging
logfile = os.path.join(os.environ['HOME'], 'xspec.log.%i' % id(self))
deleteFile(logfile)
#self.xspecsub.stdin.write('log %s\n' % logfile)
# form of xspec loop where it doesn't blow up if this program
# closes its stdin
self.write(tclloop)
def write(self, text):
self.xspecsub.stdin.write(text)#.encode('utf-8'))
[docs]
def loadXCM(self, xcmfile):
"""Load an xcm file into xspec."""
assert os.path.exists(xcmfile)
self.write('@%s\n' % xcmfile)
self.throwAwayOutput()
[docs]
def throwAwayOutput(self):
"""Ignore output from program until no more data available."""
while True:
i, o, e = select.select([self.xspecsub.stdout.fileno()], [], [], 0)
if i:
t = os.read(i[0], 1024)
if not t: # file was closed
break
else:
break
[docs]
def readResult(self):
"""Return result from xspec."""
search = None
while not search:
line = self.xspecsub.stdout.readline()
#line = line.decode('utf-8')
search = XSpecHelper.specialre.search(line)
return search.group(1)
[docs]
def setAbund(self, abund):
"""Set model abundance."""
self.write('abund %s\n' % abund)
self.throwAwayOutput()
[docs]
def getRate(self):
"""Get rate from current model in current noticed band."""
self.write('puts "$SCODE [tcloutr rate 1] $SCODE"\n')
retn = self.readResult()
modelrate = float( retn.split()[2] )
return modelrate
[docs]
def getFlux(self, emin_keV, emax_keV):
"""Get flux from current model in energy range given."""
self.write('flux %e %e\n' % (emin_keV, emax_keV))
self.write('puts "$SCODE [tcloutr flux] $SCODE"\n')
flux = float( self.readResult().split()[0] )
return flux
[docs]
def changeResponse(self, rmf, arf, minenergy_keV, maxenergy_keV):
"""Create a fake spectrum using the response and use energy range given."""
self.setPlaw(0.1, 1.7, 1)
self.tempoutput = '/tmp/mbproj2d_temp_%i.fak' % os.getpid()
deleteFile(self.tempoutput)
self.write('data none\n')
if arf is None:
arf = 'none'
elif not os.path.isfile(arf):
raise RuntimeError('ARF does not exist')
if not os.path.isfile(rmf):
raise RuntimeError('RMF does not exist')
self.write('fakeit none & %s & %s & y & foo & %s & 1.0\n' %
(rmf, arf, self.tempoutput))
# this is the range we are interested in getting rates for
self.write('ignore **:**-%f,%f-**\n' % (minenergy_keV, maxenergy_keV))
self.throwAwayOutput()
[docs]
def dummyResponse(self):
"""Make a wide-energy band dummy response."""
self.write('data none\n')
self.write('dummyrsp 0.01 100. 10000\n')
self.throwAwayOutput()
[docs]
def setPlaw(self, NH_1022pcm2, gamma, norm):
"""Set an absorbed powerlaw model."""
self.write(
'model TBabs(powerlaw) & %g & %g & %g\n' %
(NH_1022pcm2, gamma, norm)
)
self.throwAwayOutput()
[docs]
def setApec(self, NH_1022pcm2, T_keV, Z_solar, redshift, norm):
"""Set an absorbed apec model."""
self.write(
'model TBabs(apec) & %g & %g & %g & %g & %g\n' %
(NH_1022pcm2, T_keV, Z_solar, redshift, norm)
)
self.throwAwayOutput()
def finish(self):
self.write('tclexit\n')
#self.xspecsub.stdin.flush()
self.throwAwayOutput()
self.xspecsub.stdout.close()
self.xspecsub.wait()
if self.tempoutput:
deleteFile(self.tempoutput)
del _finishatexit[ _finishatexit.index(self) ]
[docs]
class XSpecContext:
"""Manager to handle creating xspec instance and cleaning up."""
def __enter__(self):
self.xspec = XSpecHelper()
return self.xspec
def __exit__(self, ttype, value, tb):
self.xspec.finish()
def _finishXSpecs():
"""Finish any remaining xspecs if finish() does not get called above."""
while _finishatexit:
_finishatexit[0].finish()
atexit.register(_finishXSpecs)
# multiprocessing doesn't call atexit unless we do this
def sigterm(num, frame):
_finishXSpecs()
sys.exit()
signal.signal(signal.SIGTERM, sigterm)