#interactive plots
import matplotlib
try :
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
plt.ion()
except :
print('TkAgg failed')
import numpy as np
import os
import pdb
import glob
import yaml
import socket
import time
import datetime
import multiprocessing as mp
import threading
import yaml
import subprocess
# set up logger
import logging
import yaml
import logging.config
logger=logging.getLogger(__name__)
from astropy.coordinates import SkyCoord, EarthLocation, Angle
import astropy.units as u
from astropy.io import fits
from astropy.time import Time
from astropy.table import Table
from astroquery.vo_conesearch import ConeSearch, conesearch
from astroquery.vizier import Vizier
import astroquery.utils
astroquery.utils.suppress_vo_warnings()
import cal
import fitsheader
import mail
# alpaca imports, put in try/except for readthedocs
try:
from alpaca import discovery, management
from alpaca.telescope import *
from alpaca.covercalibrator import *
from alpaca.dome import *
from alpaca.safetymonitor import *
from alpaca.focuser import *
from alpaca.filterwheel import *
from alpaca.camera import *
from alpaca.switch import *
except:
logger.exception('no alpaca')
# pwi4 HTTP interface
import pwi4_client
from pyvista import tv, skycalc, centroid, stars, image, spectra, imred
import focus
import status
import database
from collections import namedtuple
Exposure = namedtuple('Exposure', ['hdu', 'name', 'exptime', 'filter'])
DomeStatus = namedtuple('DomeStatus', ['shutterstate', 'az', 'slewing', 'lights'])
# some global variables
dataroot = None
sync_process = None
guide_process = None
disp = None
# discovery seems to fail on 10.75.0.0, so hardcode servers
def ascom_init(svrs) :
global D, S, T, F, Filt, C, Covers, SW
global camindex
camindex = np.zeros(4,dtype=int)-1
D, S, T, F, Filt, C, Covers,SW = (None, None, None, [], None, [], None, [])
print("Alpaca devices: ")
if svrs is None : return
for svr in svrs:
print(f" At {svr}")
print (f" V{management.apiversions(svr)} server")
print (f" {management.description(svr)['ServerName']}")
devs = management.configureddevices(svr)
def isconnected(dev,target,append=False) :
try:
dev.Connected
if append : target.append(dev)
else :target = dev
except :
pass
return target
# open Alpaca devices
for dev in devs:
print(f" {dev['DeviceType']}[{dev['DeviceNumber']}]: {dev['DeviceName']}")
if dev['DeviceType'] == 'Telescope' :
T =isconnected(Telescope(svr,dev['DeviceNumber']),T)
elif dev['DeviceType'] == 'Dome' :
D = isconnected(Dome(svr,dev['DeviceNumber']),D)
elif dev['DeviceType'] == 'CoverCalibrator' :
Covers = isconnected(CoverCalibrator(svr,dev['DeviceNumber']),Covers)
elif dev['DeviceType'] == 'Focuser' :
F = isconnected(Focuser(svr,dev['DeviceNumber']),F,append=True)
elif dev['DeviceType'] == 'FilterWheel' :
Filt = isconnected(FilterWheel(svr,dev['DeviceNumber']),Filt)
elif dev['DeviceType'] == 'Camera' :
C = isconnected(Camera(svr,dev['DeviceNumber']),C,append=True)
elif dev['DeviceType'] == 'Safetymonitor' :
S = isconnected(SafetyMonitor(svr,dev['DeviceNumber']),S)
elif dev['DeviceType'] == 'Switch' :
SW = isconnected(Switch(svr,dev['DeviceNumber']),SW,append=True)
cams = ['ICX285','ICX694','KAF_8300','QHY']
for i in range(4) :
for index,c in enumerate(C) :
try :
if cams[i] in c.SensorName :
camindex[i] = index
except : pass
try: C[getcam(0)].Magnification=1.33
except : print("can't access C[getcam(0)]")
print()
print("All ASCOM commands available through devices: ")
print(' T : telescope commands')
print(' C : camera commands')
print(' F : focusser command')
print(' Filt : filter wheel commands')
print(' D : dome commands')
[docs]
def getcam(camera=None) :
""" Get correct list index for specified camera (ASCOM doesn't always deliver them in order!)
camera=0 : Port 2 camera
camera=1 : Port 2 spectrograph camera (eShel)
camera=2 : Port 1 camera (QSI)
camera=3 : SONG spectrograph camera
"""
if camera is None :
print('Cameras: ')
print(' 0: Port 2 guide camera (Atik w/ICX285 detector)')
print(' 1: Port 2 spectrograph camera eShel (Atik w/ICX694 detector)')
print(' 2: Port 1 camera (QSI w/KAF_800 detector)')
print(' 3: SONG spectrograph camera (QHY 600)')
return
global camindex
if camindex[camera]>=0 : return camindex[camera]
#for index,c in enumerate(C) :
# if cams[camera] in c.SensorName :
# return index
print('no such camera!')
[docs]
def getfocuser(focuser) :
""" Get correct list index for specified focuser (ASCOM doesn't always deliver them in order!)
focuser='PWI' : Port 1 focuser (PWI)
focuser='Zaber' : Port 2 focuser (Zaber)
focuser='Iodine' : Port 2 iodine stage (LTS150)
focuser='Calibration' : Port 2 calibration stage (LTS150)
focuser='PLL' : Spectrograph focus (PrimaLuceLab Esatto 3.5")
"""
for index,c in enumerate(F) :
if focuser in c.Name :
return index
print('no such focuser!')
return -1
[docs]
def getswitch(switch) :
""" Get correct list index for specified switch (ASCOM doesn't always deliver them in order!)
switch='TC300' : Thorlabs temperature controller
switch='K8056' : relay for Shelyak calibration control
switch='LCUS' : LCUS relay for calibration shutter
switch='Yocto' : Thermocouples on QHY600
switch='Wanderer' : Wanderer PowerBox for spectrograph CCD and focuser USB reset
switch='TCube' : TCube chiller
"""
for index,c in enumerate(SW) :
if switch in c.Name :
return index
print('no such switch!')
return -1
[docs]
def wait_moving(Foc) :
""" Check if input Focuser is stil moving
"""
while True :
time.sleep(1)
try :
if not Foc.IsMoving : break
except :
logger.exception('error with F.IsMoving')
break
time.sleep(1)
return
# Camera commands
[docs]
def qck(exptime,filt='current') :
""" (shorthand) Take exposure without saving to disk, current filter by default
"""
expose(exptime,filt)
[docs]
def gexp(*args, **kwargs) :
""" Expose with guide camera, see expose() for keywords
Parameters
----------
exptime : float
Exposure time in seconds
bin : int, default=1
binning factor (both x and y)
display : pyvista tv, default=None
pyvista display tool to display into if specified
name : str, default=None
root file to save image to
"""
if 'bin' not in kwargs :
kwargs['bin'] = 1
return expose(*args, cam=0, filt=None, **kwargs)
[docs]
def sexp(*args,**kwargs) :
""" Expose with spectrograph camera, see expose() for keywords
Parameters
----------
exptime : float
Exposure time in seconds
bin : int, default=2
binning factor (both x and y)
display : pyvista tv, default=None
pyvista display tool to display into if specified
name : str, default=None
root file to save image to
"""
if 'bin' not in kwargs :
kwargs['bin'] = 2
return expose(*args, cam=3, filt=None, **kwargs)
[docs]
def expose(exptime=1.0,filt='current',bin=3,box=None,light=True,display=None,name=None,
min=None, max=None, cam=0, insert=True, targ=None, avg=1, imagetyp='unspecified', header=None, fast=False) :
""" Take an exposure with camera
Parameters
----------
exptime : float
Exposure time in seconds
filt : str
Name of filter
bin : int, default=3
binning factor (both x and y)
box : pyvista BOX, default=None
if specified, window to box parameters
light : bool, default=True
open shutter for exposure?
display : pyvista tv, default=None
pyvista display tool to display into if specified
name : str, default=None
root file to save image to
Returns
-------
named tuple: Exposure = namedtuple('Exposure', ['hdu', 'name', 'exptime', 'filter'])
"""
exposure = Exposure(None,None,None,None)
if pwi is not None :
stat = pwi.status()
if stat.m3.port == 1 :
if filt is not None and filt != 'current' and filt != 'iodine' :
pos = np.where(np.array(Filt.Names) == filt)
if len(pos) == 0 :
logger.warning('no such filter')
logger.warning('available filters: ', Filt.Names)
return exposure
Filt.Position=pos[0]
elif filt == 'current' :
filt = Filt.Names[Filt.Position]
else :
if filt == 'iodine' :
iodine_in()
elif filt == 'open' :
filt = 'None'
iodine_out()
else :
filt = 'None'
else :
filt = 'None'
try :
icam = getcam(cam)
C[icam].BinX=bin
C[icam].BinY=bin
if box is not None :
C[icam].StartX = box.xmin
C[icam].StartY = box.ymin
nx = box.ncol()
ny = box.nrow()
else :
C[icam].StartX = 0
C[icam].StartY = 0
nx = C[icam].CameraXSize//bin
ny = C[icam].CameraYSize//bin
C[icam].NumX = nx
C[icam].NumY = ny
t = Time.now()
# if requested, average avg exposures
for iavg in range(avg) :
C[icam].StartExposure(exptime,light)
if int(exptime) > 5 :
#print countdown
for i in range(int(exptime),0,-1) :
print('{:<4d}'.format(i),end='\r')
time.sleep(0.99)
while not C[icam].ImageReady or C[icam].CameraState != 0:
time.sleep(0.25)
if iavg == 0 :
if avg == 1 :
data = np.array(C[icam].ImageArray)
else :
data = np.array(C[icam].ImageArray).astype(float)
else :
data += np.array(C[icam].ImageArray)
if avg == 1 :
data = data.T
else :
data = data.T/avg
data=data.astype(np.uint16)
except :
logger.exception('ERROR : exposure failed')
return exposure
if display is not None :
display.tv(data,min=min,max=max)
hdu=fits.PrimaryHDU(data)
hdu.header['ORIGIN'] = 'SONG/APO'
hdu.header['DATE'] = Time.now().fits
hdu.header['OBSERVAT'] = 'APO'
hdu.header['TELESCOP'] = ('Node 4', 'APO SONG 1m')
if cam == 3 :
hdu.header['INSTRUME'] = 'Spectrograph'
elif cam == 1 :
hdu.header['INSTRUME'] = 'Acquisition/guider'
hdu.header['IMAGETYP'] = imagetyp
hdu.header['IMFORM'] = 'FITS'
hdu.header['DATATYPE'] = 'Counts'
if not fast :
fitsheader.camera(hdu,C[getcam(cam)],exptime,avg,light)
if header is not None : fitsheader.mixed(hdu,header)
fitsheader.object(hdu,targ)
try : focval = specfoc()
except : focval = None
fitsheader.spectrograph(hdu,focval)
try : focval = foc()
except : focval = None
fitsheader.telescope(hdu,pwi.status(),focval,D.Azimuth)
pos = iodine_position()
if abs(float(pos)-config['iodinestage_in_pos']) < 1. :
i2pos = 4
else :
i2pos = 0
temp1,temp2 = iodine_tget().split()[2:]
fitsheader.fpu(hdu,iodine_position(),i2pos,temp1,temp2,
calstage_position(),SW[getswitch('K8056')],filt)
fitsheader.weather(hdu)
#fitsheader.sunmoon(hdu)
fitsheader.time(hdu,t)
tab=Table()
# write file to disk if name given
if name is not None :
y,m,d,hr,mi,se = t.ymdhms
try: dirname = os.path.dirname('{:s}/UT{:d}{:02d}{:02d}/{:s}'.format(dataroot,y-2000,m,d,name))
except : pdb.set_trace()
try: os.makedirs(dirname)
except : pass
files = glob.glob(dirname+'/*.fits')
exts = []
for f in files :
exts.append(int(f.split('.')[-2]))
if len(exts) > 0 : ext = np.array(exts).max() + 1
else : ext=1
outname = '{:s}/{:s}.{:04d}.fits'.format(dirname,os.path.basename(name).replace(' ','_'),ext)
songname = 's4_{:04d}-{:02d}-{:02d}T{:02d}-{:02d}-{:02d}.fits'.format(
y,m,d,hr,mi,int(se))
hdu.header['APONAME'] = outname
hdu.header['SONGNAME'] = songname
hdu.writeto(outname)
# Create hard link for SONG directory and file name
try : project = header['PROJECT']
except : project = 'APO'
if cam == 3 and project != 'APO' :
dirname = os.path.dirname(
'/data/song/star_spec/{:04d}/{:04d}{:02d}{:02d}/night/raw/'.format(y,y,m,d))
try: os.makedirs(dirname)
except : pass
os.link(outname,'{:s}/{:s}'.format(dirname,songname))
# construct Exposure object to return, and populate filename in tab
exposure = Exposure(hdu, outname, exptime, filt)
tmp=outname.split('/')
tab['file'] = [tmp[-2]+'/'+tmp[-1]]
else :
outname=''
exposure = Exposure(hdu, None, exptime, filt)
tab['file'] = ['']
if insert and not fast:
cards = ['DATE-OBS','MJD-DATE','EXPTIME','FILTER','FOCUS','CCD_TEMP',
'HOR_BIN','VER_BIN','RA','DEC','AZ','ALT','ROT',
'SPECFOC','I_POS','I_TEMP1', 'I_TEMP2',
'CAL_POS','TUNGSTEN','LED','THAR']
cols = ['dateobs','mjd','exptime','filter','focus','ccdtemp',
'xbin','ybin','ra','dec','az','alt','rot',
'specfoc','iodine_position','iodine_temp1', 'iodine_temp2',
'calstage_position','tungsten','led','thar']
for card,col in zip(cards,cols) :
try : tab[col] = [hdu.header[card]]
except KeyError: print('no {:s} card found'.format(card))
tab['camera'] = [cam]
# insert exposure into database
try :
d=database.DBSession()
d.ingest('obs.exposure',tab,onconflict='update')
d.close()
except :
print('error loading exposure into database')
return exposure
[docs]
def settemp(temp,cam=0) :
""" Change detector temperature set point and turn cooler on
"""
icam=getcam(cam)
C[icam].SetCCDTemperature = temp
C[icam].CoolerOn = True
[docs]
def chiller(temp=None) :
""" Get/set chiller temperature
"""
if temp is not None :
SW[getswitch('TCube')].Action('tset',0,temp)
return float(SW[getswitch('TCube')].Action('get_tact',0))
[docs]
def chiller_fault(temp=None) :
""" Get chiller fault status
"""
return SW[getswitch('TCube')].Action('get_fault',0)
[docs]
def cooler(state=True,cam=0) :
""" Set detector cooler state on/off
"""
icam=getcam(cam)
C[icam].CoolerOn = state
[docs]
def filtname() :
""" Return current filter name
"""
return Filt.Names[Filt.Position]
[docs]
def focrun(cent,step,n,exptime=1.0,filt='V',bin=3,box=None,display=None,
max=30000, thresh=25,cam=0,plot=False) :
""" Obtain a focus run
Parameters
----------
cent : int
Middle focus value
step : int
Focus step size
n : int
Number of steps to take, centered on middle value
exptime : float
Exposure time
filt : str
Filter to use
bin : int, default=3
Binning factor in x and y
box : pyvista BOX, default=None
if specified, window to box parameters
disp : pyvista tv, default=None
If given, display each image as it is being taken into specified device
Returns
-------
List of file names taken for focus run
"""
foc0=foc()
files=[]
names=[]
images=[]
focvals= np.arange(int(cent)-n//2*int(step),int(cent)+n//2*int(step)+1,int(step))
for i,focval in enumerate(focvals) :
foc(int(focval))
if i == 0 : time.sleep(3)
logger.info('position: {:d}'.format(foc()))
exp = expose(exptime,filt,box=box,bin=bin,display=display,
max=max,name='focus_{:d}'.format(focval),cam=cam)
hdu=exp.hdu
tmp=exp.name.split('/')
name=tmp[-2]+'/'+tmp[-1]
names.append(name)
files.append(exp.name)
images.append(hdu)
try :
bestfitfoc, bestfithf, bestfoc, besthf = focus.focus(files,pixscale=pixscale(cam,bin=bin),
display=display,max=max,thresh=thresh,plot=plot)
if bestfitfoc > 0 :
logger.info('setting focus to best fit focus : {:.1f} with hf diameter {:.2f}'.format(
bestfitfoc,bestfithf))
f=foc(int(bestfitfoc))
best=bestfithf
else :
logger.info('setting focus to minimum image focus : {:.1f} with hf diameter {:.2f}'.format(
bestfoc,besthf))
f=foc(int(bestfoc))
best=besthf
except :
bestfitfoc, bestfithf, bestfoc, besthf, best = -1, -1., -1, -1., -1.
logger.exception('focus failed')
f=foc(foc0)
tab=Table()
tab['mjd'] = [Time.now().mjd]
tab['exptime'] = [exptime]
tab['filter'] = [filt]
tab['bin'] = [bin]
tab['camera'] = [cam]
tab['focvals'] = [focvals]
tab['bestfoc'] = [bestfoc]
tab['besthf'] = [besthf]
tab['bestfitfoc'] = [bestfitfoc]
tab['bestfithf'] = [bestfithf]
tab['files'] = [names]
d=database.DBSession()
d.ingest('obs.focus',tab,onconflict='update')
d.close()
if display is not None : display.tvclear()
return f, focvals, best
[docs]
def pixscale(cam=0,bin=1) :
""" Return pixscale for desired camera
"""
icam=getcam(cam)
scale=206265/6000*C[icam].PixelSizeX*1.e-3*bin
try: scale /= C[icam].Magnification
except: pass
return scale
[docs]
def telescope_status() :
""" Return telescope status
"""
stat = pwi.status()
stat.RightAscension = T.RightAscension
stat.Declination = T.Declination
stat.Azimuth = T.Azimuth
stat.Altitude = T.Altitude
return stat
[docs]
def slew(ra, dec,dome=True) :
""" Slew to RA/DEC and wait for telescope/dome
Parameters
----------
ra : float or str
RA in hours (float), or hh:mm:ss (str)
dec : float or str
DEC in degrees (float), or dd:mm:ss (str)
"""
domesync(False)
if (isinstance(ra,float) or isinstance(ra,int) ) and \
(isinstance(dec,float) or isinstance(dec,int) ) :
pwi.mount_goto_ra_dec_j2000(ra,dec)
elif isinstance(ra,str) and isinstance(dec,str) :
coords=SkyCoord("{:s} {:s}".format(ra,dec),unit=(u.hourangle,u.deg))
pwi.mount_goto_ra_dec_j2000(coords.ra.value/15.,coords.dec.value)
#tra, tdec = j2000totopocentric(ra,dec)
#T.SlewToCoordinatesAsync(tra,tdec)
time.sleep(5)
while T.Slewing :
time.sleep(1)
T.Tracking = True
# Wait for dome slewing to stop
domesync(dome)
while True :
if not T.Slewing :
time.sleep(10)
if not D.Slewing : break
[docs]
def altaz(az,alt) :
""" Slew to specified az / alt
Parameters
----------
az : float
az in degrees
alt : float
alt in degrees
"""
pwi.mount_goto_alt_az(alt, az)
[docs]
def usno(ra=None,dec=None,rad=1*u.degree,mag='Rmag',magmin=0,magmax=20,goto=True,
cat= 'I/252') :
""" Find/goto nearest USNO stars from specified position (default current telescope) and mag range
Parameters
----------
ra : str or float, default=None
RA to search around, default current telescope position
dec : str or float, default=None
DEC to search around, default current telescope position
rad : angular quantity, default = 1*u.degree
Radius to use for sear4ch
bmin, bmax : float, default=(0,15)
Minimum, maximum Bmag
rmin, rmax : float, default=(0,15)
Minimum, maximum Rmag
goto : bool, default=True
If True, slew to closest returned object
cat : str, default= 'The USNO-A2.0 Catalogue 1'
astroquery conesearch catalog to search from
hr=V/50, sao=I/131A/sao (but doesn't have J2000)
"""
if ra is None :
stat = pwi.status()
ra = stat.mount.ra_j2000_hours
dec = stat.mount.dec_j2000_degs
coords=SkyCoord("{:f} {:f}".format(ra,dec),unit=(u.hourangle,u.deg))
elif (isinstance(ra,float) or isinstance(ra,int) ) and \
(isinstance(dec,float) or isinstance(dec,int) ) :
coords=SkyCoord("{:f} {:f}".format(ra,dec),unit=(u.hourangle,u.deg))
else :
coords=SkyCoord("{:s} {:s}".format(ra,dec),unit=(u.hourangle,u.deg))
try :
viz=Vizier(columns=['*','+_r'],catalog=cat)
if mag is not None :
result=viz.query_region(coords,radius=rad,
column_filters={mag:'<{:f}'.format(magmax)})[0]
gd=np.where(result[mag]>magmin)[0]
result=result[gd]
else :
result=viz.query_region(coords,radius=rad)[0]
except :
logger.warning('usno: no stars found within specified rad and mag range')
return
best = result['_r'].argmin()
logger.info(result[best])
if goto:
try:
slew(result['RAJ2000'][best]/15., result['DEJ2000'][best])
except :
slew(result['RA_ICRS'][best]/15., result['DE_ICRS'][best])
[docs]
def center(display,x0=None,y0=None,exptime=5,bin=1,filt=None,settle=3,cam=0) :
""" Center star
"""
exp=expose(exptime,filt=filt,bin=bin)
display.tv(exp.hdu)
print('mark star on display: ')
k,x,y=display.tvmark()
icam=getcam(cam)
if x0 is None : x0=C[icam].NumX//2
if y0 is None : y0=C[icam].NumY//2
offsetxy((x-x0),(y-y0),scale=pixscale())
time.sleep(settle)
[docs]
def guide(cmd,verbose=True, **kwargs) :
""" Send command to guider process
"""
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1',5000))
if cmd == 'start' :
for key,value in zip(kwargs.keys(),kwargs.values()) :
cmd+=(' {:s}={:d}'.format(key,value))
s.send(cmd.encode())
if cmd == 'status' :
try :
out = s.recv(16, socket.MSG_DONTWAIT | socket.MSG_PEEK)
print('out: ', out)
except BlockingIOError :
print('blockingIOError')
pass
except :
print('recv failed')
raise Exception('error with guider')
out=s.recv(1024)
if verbose : print('received {:s}'.format(out.decode()))
s.close()
if 'fail' in out.decode() :
return False
else :
return True
[docs]
def offsetxy(dx,dy,sign=-1,scale=0.16,pa=None) :
"""
Offset in detector coordinates
"""
pa = sign*rotator()*np.pi/180.
dra = -dx*np.cos(pa) - dy*np.sin(pa)
ddec = -dx*np.sin(pa) + dy*np.cos(pa)
#offset(dra*scale,ddec*scale)
offset(dra*scale,ddec*scale)
[docs]
def offset(dra, ddec) :
"""
One or more of the following offsets can be specified as a keyword argument:
AXIS_reset: Clear all position and rate offsets for this axis. Set this to any value to issue the command.
AXIS_stop_rate: Set any active offset rate to zero. Set this to any value to issue the command.
AXIS_add_arcsec: Increase the current position offset by the specified amount
AXIS_set_rate_arcsec_per_sec: Continually increase the offset at the specified rate
As of PWI 4.0.11 Beta 7, the following options are also supported:
AXIS_stop: Stop both the offset rate and any gradually-applied commands
AXIS_stop_gradual_offset: Stop only the gradually-applied offset, and maintain the current rate
AXIS_set_total_arcsec: Set the total accumulated offset at the time the command is received to the specified value. Any in-progress rates or gradual offsets will continue to be applied on top of this.
AXIS_add_gradual_offset_arcsec: Gradually add the specified value to the total accumulated offset. Must be paired with AXIS_gradual_offset_rate or AXIS_gradual_offset_seconds to determine the timeframe over which the gradual offset is applied.
AXIS_gradual_offset_rate: Paired with AXIS_add_gradual_offset_arcsec; Specifies the rate at which a gradual offset should be applied. For example, if an offset of 10 arcseconds is to be applied at a rate of 2 arcsec/sec, then it will take 5 seconds for the offset to be applied.
AXIS_gradual_offset_seconds: Paired with AXIS_add_gradual_offset_arcsec; Specifies the time it should take to apply the gradual offset. For example, if an offset of 10 arcseconds is to be applied over a period of 2 seconds, then the offset will be increasing at a rate of 5 arcsec/sec.
Where AXIS can be one of:
ra: Offset the target Right Ascension coordinate
dec: Offset the target Declination coordinate
axis0: Offset the mount's primary axis position
(roughly Azimuth on an Alt-Az mount, or RA on In equatorial mount)
axis1: Offset the mount's secondary axis position
(roughly Altitude on an Alt-Az mount, or Dec on an equatorial mount)
path: Offset along the direction of travel for a moving target
transverse: Offset perpendicular to the direction of travel for a moving target
"""
pwi.mount_offset(ra_add_arcsec=dra/np.cos(T.Declination*np.pi/180.))
pwi.mount_offset(dec_add_arcsec=ddec)
[docs]
def j2000totopocentric(ra,dec) :
""" Routine to convert J2000 coordinates to topocentric RA/DEC
"""
#from coordio import sky, site, time
coords=sky.ICRS(np.array([[15*ra,dec]]))
s=site.Site('APO')
s.set_time(time.Time()) # defaults to now
obs=sky.Observed(coords,site=s)
print('topo: ',obs.ra/15.,obs.dec)
return obs.ra/15., obs.dec
# tpm routines
#apo=EarthLocation.of_site('APO')
#d=datetime.now()
#utc = tpm.gcal2j(d.year,d.month,d.day)
#tt = tpm.utc3tdb(utc)
#v6 = convert.cat2v6(ra*np.pi/180,dec*np.pi/180)
#v6_app = convert.convertv6(s1=6,s2=16,lon=apo.lon.value,lat=apo.lat.value,alt=apo.height.value)
[docs]
def rotator(offset=90.) :
""" Get current rotator position angle, from parallactic angle and altitude for port 2 and telescope status for port 1
Parameters
----------
offset : float, default=100
Angle constant (degrees) that needs to be set depending on camera/fixed rotator position for port 2
"""
stat = pwi.status()
if stat.m3.port == 2 :
t=Time(Time.now(),location=EarthLocation.of_site('APO'))
lst=t.sidereal_time('mean').value
ha = lst - T.RightAscension
pa = skycalc.parang(ha,T.Declination,'APO')-T.Altitude+offset
else :
inst = stat.rotator.mech_position_degs
pa = stat.rotator.field_angle_degs
return pa
[docs]
def tracking(tracking) :
""" Set telescope tracking on (True) or off (False)
Parameters
----------
tracking : bool
if True, turn tracking on, if False, turn tracking off
"""
T.Tracking= tracking
[docs]
def park() :
""" Park telescope and dome
"""
domesync(False)
D.Park()
try: T.Park()
except: logger.error('telescope.Park raised an exception')
[docs]
def foc_home(port=None) :
""" Send focus to home
"""
if port is None :
stat = pwi.status()
port = stat.m3.port
if port == 1 :
index = getfocuser('PWI')
else :
index = getfocuser('Zaber')
F[index].Action('home')
wait_moving(F[index])
[docs]
def foc(val=None, relative=False, port=None) :
""" Change focus, depending on port
"""
if port is None :
stat = pwi.status()
port = stat.m3.port
if port == 1 :
index = getfocuser('PWI')
else :
index = getfocuser('Zaber')
if val is not None :
if relative :
if val == 0 : return F[index].Position
val += F[index].Position
F[index].Move(val)
wait_moving(F[index])
return F[index].Position
[docs]
def specfoc(val=None) :
""" Change spectrograph focus
"""
index=getfocuser('PLL')
if val is not None :
F[index].Move(val)
wait_moving(F[index])
return F[index].Position
[docs]
def iodine_tset(val=None,tmax=66) :
""" Get/set iodine cell set temperature and (re)enable heaters
"""
iswitch = getswitch('TC300')
if val is not None :
if val > tmax :
print('values > {:d} must be explicitly allowed with tmax= keyword'.format(tmax))
return
SW[iswitch].SetSwitchValue(0,val)
SW[iswitch].SetSwitchValue(1,0) # no second channel on new iodine cell
iodine_set('enable',1)
tset1 = SW[iswitch].Action('get_tset',0)
tset2 = SW[iswitch].Action('get_tset',1)
return f'set temperature: {tset1} {tset2}'
[docs]
def iodine_tget() :
""" Get/set iodine cell actual temperature
"""
iswitch = getswitch('TC300')
try :
tact1 = SW[iswitch].GetSwitchValue(0)
tact2 = SW[iswitch].GetSwitchValue(1)
return f'actual temperature: {tact1} {tact2}'
except :
return f'actual temperature: -1 -1'
[docs]
def iodine_set(quantity,val) :
""" Miscellaneous TC300 commands: enable, dark, light
"""
iswitch = getswitch('TC300')
if quantity in ['enable','dark','light'] :
q1 = SW[iswitch].Action('set_{:s}'.format(quantity),0,val)
q2 = SW[iswitch].Action('set_{:s}'.format(quantity),1,val)
return f'{q1} {q2}'
else :
print('unknown quantity')
[docs]
def iodine_get(quantity) :
""" Get iodine cell quantity
"""
iswitch = getswitch('TC300')
if quantity in ['tset','voltage','current','enable','dark','light'] :
q1 = SW[iswitch].Action('get_{:s}'.format(quantity),0)
q2 = SW[iswitch].Action('get_{:s}'.format(quantity),1)
return f'{q1} {q2}'
else :
print('unknown quantity')
[docs]
def iodine_in(val=None,focoffset=None) :
""" Move iodine cell into beam
"""
# don't move if already there, to avoid extra focus change`
if val is None : val = config['iodinestage_in_pos']
if focoffset is None : focoffset = config['df_iodine']
ipos = iodine_position()
if ipos < 0 :
print("error with iodine stage")
return
if abs(ipos-val) > 0.1 :
iodine_position(val)
foc(focoffset,relative=True,port=2)
time.sleep(5)
else :
print('iodine stage already at desired postion, no motion or focus offset done')
[docs]
def iodine_out(val=None,focoffset=None) :
""" Move iodine cell out of beam
"""
# don't move if already there, to avoid extra focus change`
if val is None : val = config['iodinestage_out_pos']
if focoffset is None : focoffset = config['df_iodine']
ipos = iodine_position()
if ipos < 0 :
print("error with iodine stage")
return
if abs(ipos-val) > 0.1 :
iodine_position(val)
foc(-1*focoffset,relative=True,port=2)
time.sleep(5)
else :
print('iodine stage already at desired postion, no motion or focus offset done')
[docs]
def iodine_home() :
""" Send iodine stage to home
"""
index=getfocuser('Iodine')
F[index].Action('home')
[docs]
def iodine_position(val=None) :
""" Get/set iodine stage position
"""
index=getfocuser('Iodine')
try :
if val is not None :
F[index].Move(int(val*1000.))
wait_moving(F[index])
return F[index].Position/1000.
except :
return -1
[docs]
def calstage_home() :
""" Send calibration stage to home
"""
index=getfocuser('Calibration')
F[index].Action('home')
[docs]
def calstage_position(val=None) :
""" Get/set calibration stage position
"""
index=getfocuser('Calibration')
if val is not None :
F[index].Move(int(val*1000.))
wait_moving(F[index])
return F[index].Position/1000.
[docs]
def calstage_in(val=None,calfoc=None) :
""" Move calibration stage into beam
"""
# don't move if already there, to avoid extra focus change
if val is None : val = config['calstage_in_pos']
if calfoc is None : calfoc = config['calstage_focus']
foc(calfoc)
calstage_position(val)
[docs]
def calstage_out(val=None) :
""" Move calibration stage out of beam
"""
# don't move if already there, to avoid extra focus change
if val is None : val = config['calstage_out_pos']
calstage_position(val)
[docs]
def calstage_find(display=None) :
""" Find calibration spot location
"""
calstage_in()
cal.lamps(quartz=True,led=True)
im=gexp(0.002,display=display,max=60000).hdu.data
y0,x0=np.unravel_index(np.argmax(im),im.shape)
mask=np.zeros_like(im)
yg,xg=np.mgrid[0:mask.shape[0],0:mask.shape[1]]
r2=(xg-config['hole_pos'][1])**2+(yg-config['hole_pos'][0])**2
bd=np.where(r2<7**2)
mask[bd]=1
cent=centroid.rasym_centroid(im,x0,y0,rad=35,mask=mask)
logger.info('Calibration spot center: {:.2f} {:.2f}'.format(cent.x,cent.y))
if display is not None :
display.tvclear()
display.tvcirc(cent.x,cent.y,50)
config['calstage_in_pos'] -= (cent.y-config['hole_pos'][0])/10*.05
gexp(0.002,display=display,max=60000).hdu.data
cal.lamps()
calstage_in()
return config['calstage_in_pos']
[docs]
def fans_on(roles=None):
""" Turn on PWI fans
Parameters
----------
roles: if None, turn on all fans
Otherwise, can be a CSV string of one or more fan roles to turn on:
m1rear: Primary mirror fans (rear fans only)
m1side: Primary mirror fans (side fans only)
m2: Secondary mirror fans
m3: M3 mirror fans
m1heaters: Primary mirror heat distribution fans
"""
logger.info("Turning fans on ...")
pwi.fans_on(roles)
[docs]
def fans_off(roles=None):
""" Turn off PWI fans
"""
logger.info("Turning fans off ...")
pwi.fans_off(roles)
[docs]
def port(port) :
""" Change PWI M3 port
"""
pwi.m3_goto(port)
[docs]
def mirror_covers(open=False) :
""" Open/close mirror covers
"""
current = Covers.CoverState.value
if open and current == 3 : return
if not open and current == 1 : return
altaz(T.Azimuth,85.)
logger.info('waiting for telescope to slew to high altitude...')
while T.Slewing :
time.sleep(1)
if open and current != 3 :
Covers.OpenCover()
elif not open and current != 1 :
Covers.CloseCover()
logger.info('waiting 20 seconds for mirror covers to close...')
time.sleep(20)
[docs]
def louvers(open=False) :
""" Open louvers
"""
if open :
logger.info('opening louvers...')
subprocess.run("ms on 9",shell=True)
else :
logger.info('closing louvers...')
subprocess.run("ms off 9",shell=True)
subprocess.run("ms list 9",shell=True)
def coverstate() :
print(Covers.CoverState.value)
[docs]
def issafe() :
""" Query SafetyMonitor for safe to open
"""
return S.IsSafe
[docs]
def override(t,verbose=True) :
""" Set override to allow open
"""
try :
if verbose :
resp = input("Are you sure you to override 3.5m/2.5m dome opening requirement? CTRL-C to abort: ")
else :
resp = 'y'
if resp != 'n' and resp != 'N' :
S.Action('override',int(t))
print('override in place for {:d} seconds'.format(int(t)))
except :
print('override aborted',resp)
[docs]
def domestatus() :
""" Return dome azimuth and shutter status
"""
out=subprocess.run("ms list 3",shell=True,capture_output=True,text=True)
if 'On' in out.stdout : light = True
else : light = False
stat = DomeStatus(D.ShutterStatus.value,D.Azimuth,D.Slewing,light)
return stat
[docs]
def domehome() :
""" Home dome
"""
D.FindHome()
[docs]
def domeopen(dome=True,covers=True,fans=True,louvers=False) :
""" Open dome, mirror covers, louvers and start fans, as requested
"""
if dome and D.ShutterStatus.name != 'shutterOpen' :
logger.info('opening shutter...')
D.OpenShutter()
# Wait for shutter open before opening mirror covers
logger.info('waiting for shutter to open...')
while D.ShutterStatus.name != 'shutterOpen' :
time.sleep(1)
if covers and Covers.CoverState.name != 'Open':
mirror_covers(True)
logger.info('waiting for mirror covers to open...')
while Covers.CoverState.value != 3 :
time.sleep(1)
if fans :
logger.info('turning fans on...')
fans_on()
if louvers :
logger.info('opening louvers...')
louvers(True)
[docs]
def domeclose(dome=True,covers=True,fans=True, closelouvers=True) :
""" Close mirror covers and dome
"""
if closelouvers :
logger.info('closing louvers...')
louvers(False)
if fans :
logger.info('turning off fans...')
fans_off()
if covers :
mirror_covers(False)
if dome :
# don't wait for mirror covers to report closed, in case they don't!
park()
logger.info('closing shutter...')
D.CloseShutter()
# put this at end in case guider has died and this won't return
logger.info('stopping guider...')
try : guide('stop')
except : pass
[docs]
def domesync(dosync=True,manual=False) :
""" Start/stop domesync thread
"""
global sync_process, run_sync
def sync(update=5) :
while run_sync :
while T.Slewing or D.Slewing :
time.sleep(1)
# Get telescope az 3x and median to avoid glitches
az=[]
for i in range(3) :
az.append(T.Azimuth)
time.sleep(0.2)
D.SlewToAzimuth(np.median(az))
time.sleep(update)
if manual :
print('Rotate dome manually....then continue')
pdb.set_trace()
return
if dosync and sync_process is None :
logger.info('starting dome sync')
sync_process=threading.Thread(target=sync)
run_sync = True
sync_process.start()
elif dosync == False and sync_process is not None :
logger.info('stopping dome sync')
run_sync = False
sync_process.join()
sync_process = None
[docs]
def start_status(camera=True) :
""" Start status window thread
"""
global proc
# allow for no camera state to avoid command collisions
if camera :
Cam = C
else :
Cam = None
proc = mp.Process(target=status.status,
kwargs={'pwi' : pwi, 'T' : T, 'D' : D, 'F' : F, 'Filt' : Filt, 'C' : Cam, 'Covers': Covers})
proc.daemon = True
proc.start()
[docs]
def stop_status() :
""" Stop status window thread
"""
proc.terminate()
def commands() :
print()
print("Observatory commands")
print(" domeopen(): open dome, mirror covers")
print(" domeclose(): close mirror covers, dome, and park")
print(" issafe(): check safety monitor for status")
print()
print("Dome commands")
print(" domehome(): move dome to home position")
print(" domesync(True|False): sync dome to telescope position")
print(" louvers(True|False): open/close louvers")
print()
print("Telescope commands")
print(" slew(ra,dec): slew to coordinates")
print(" offset(ra,dec): offset telescope in sky coordinates (arcsec) ")
print(" offsetxy(x,y): offset telescope in detector coordinates (pixels)")
print(" usno([ra,dec]) : find/slew to USNO A2.0 star")
print(" altaz(az,alt): slew to az/alt coordinates")
print(" foc(focus) : set focus to specified value")
print(" foc_home() : home focus (port=2 only)")
print(" tracking(True|False): turn tracking on/off")
print(" mirror_covers(True|False): control mirror covers")
print(" fans(True|False): control telescope fans")
print(" port(port): move tertiary to requested port")
print(" park(): park telescope and dome")
print()
print("Camera commands")
print(" expose(exptime,filt,**kwargs: take an exposure with specified camera")
print(" gexp(exptime,filt,**kwargs: take an exposure with guide camera (frontend to expose)")
print(" sexp(exptime,filt,**kwargs: take an exposure with spectrograph camera (frontend to expose)")
print(" focrun(cent,step,nsteps,exptime,filt,**kwargs): take series of exposures at different focus positions")
print(" settemp(temp): set camera temperature set point")
print(" cooler(state): set camera cooler state on (True) or off (False)")
print(" chiller([temp]): get/set chiller temperature")
print(" chiller_fault(): get chiller fault status")
print()
print("Iodine commands")
print(" iodine_in() : move iodine cell into beam, and adjust focus")
print(" iodine_out() : move iodine cell out of beam, and adjust focus")
print(" iodine_position([val]) : get or set (with val) iodine stage position")
print(" iodine_home() : home iodine stage")
print(" iodine_tset(val) : set iodine temperature (both channels)")
print(" iodine_tget(): get actual iodine temperatures")
print(" iodine_get(quantity): get various TC300 values (tset,voltage,current,enable,dark,light)")
print()
print("Calibration commands")
print(" calstage_in() : move calibration stage into beam, and adjust focus (if needed)")
print(" calstage_out() : move calibration stage out of beam, and adjust focus (if needed)")
print(" calstage_position([val]) : get or set (with val) iodine stage position")
print(" calstage_home() : home iodine stage")
print(" cal.getlamps() : get eShel lamp status")
print(" cal.lamps() : control eShel lamps")
print(" cal.cals() : turn lamps on, take sequences of flats and ThAr, turn lamps off")
print(" cal.shutter() : open/close shutter in direct calibration feed")
print()
print("Use help(command) for more details")
[docs]
def devices() :
""" List connected ASCOM devices
"""
global D, S, T, F, Filt, C, Covers, SW
print('Dome: ')
print(D.Name, D.Description)
print('Telescope (T): ')
print(' {:s}, {:s}'.format(T.Name, T.Description))
print('Mirror Covers (Covers) : ')
try : print(' {:s}, {:s}'.format(Covers.Name, Covers.Description))
except : print(' NOT FOUND')
print('FilterWheel (Filt) : ')
try: print(' {:s}, {:s}'.format(Filt.Name, Filt.Description))
except : print(' NOT FOUND')
print('SafetyMonitor (S) : ')
print(' {:s} {:s}'.format(S.Name, S.Description))
print('Focusers (F[]): ')
for i,dev in enumerate(F) :
print(' {:d} {:s}, {:s}'.format(i,dev.Name, dev.Description))
print('Cameras (C[]): ')
for i,dev in enumerate(C) :
print(' {:d} {:s}, {:s}'.format(i,dev.Name, dev.Description))
print('Switches (SW[]): ')
for i,dev in enumerate(SW) :
print(' {:d} {:s}, {:s}'.format(i,dev.Name, dev.Description))
[docs]
def init() :
""" Start ascom and pwi connections and pyvista display
"""
global config, dataroot, pwi_srv
try :
with open('aposong.yml','r') as config_file :
config = yaml.safe_load(config_file)
except:
print('no configuration file found')
config={}
config['devices']={}
config['devices']['ascom_search'] = False
config['devices']['ascom_srvs'] = []
config['devices']['pwi_srv'] = None
config['dataroot'] = './'
if config['devices']['ascom_search'] :
svrs=discovery.search_ipv4(timeout=30,numquery=3)
else :
svrs=config['devices']['ascom_srvs']
dataroot=config['dataroot']
try : updatecamera=config['updatecamera']
except : updatecamera=True
ascom_init(svrs)
print('pwi_init...')
pwi_srv = config['devices']['pwi_srv']
pwi_init(pwi_srv)
commands()
def pwi_init(pwi_srv) :
global pwi
print('Starting PWI4 client...')
if pwi_srv is not None :
pwi=pwi4_client.PWI4(host=pwi_srv)
for axis in [0,1] : pwi.mount_enable(axis)
else :
pwi = None
[docs]
def disp_init() :
""" Start display with good geometry
"""
try :
disp=tv.TV(figsize=(9.5,6))
disp.fig.canvas.manager.window.wm_geometry('-0+0')
except :
print("Can't open display")
return disp
[docs]
def alert(message,loggers=None,recipients=None) :
""" Print alert, and log and email as requeste3d
"""
print('{:s} aposong alert: {:s}'.format(Time.now().fits,message))
if loggers is not None :
if isinstance(loggers,str) : loggers=list(logger)
for logger in loggers :
logger.info(message)
if recipients is not None :
mail.send(recipients,subject='aposong alert: {:s}'.format(Time.now().fits),message=message)
[docs]
def isguideok(ok,loggers=None,recipients=None) :
""" Check to see if guider is responding, if not send alert/log as requested (if a state change)
Parameters
----------
ok : bool
current state of guider going into call
loggers : logger or list of loggers, default=None
loggers to send message not
recipients : list of addresses, default=None
addresses to send alerts to, if specified
"""
try :
guide('status',verbose=False)
if not ok :
# if guider has changed state, log and alert
alert('guider OK, resuming operations',loggers=loggers,recipients=recipients)
return True
except :
if ok :
# if guider has changed state, log and alert
msg='guider failed, suspending operations. Check guider process in guider tab on song1m, quit/CTRL-\\ and restart guider'
alert(msg,loggers=loggers,recipients=recipients)
return False
def isdomeok(ok,loggers=None,recipients=None) :
try :
test=D.Azimuth
if not ok :
# if dome had changed state, log and alert
alert('dome OK, resuming operations',loggers=loggers,recipients=recipients)
return True
except :
if ok :
# if dome had changed state, log and alert
msg='dome failed, suspending operations. Check dome1m desktop, kill dome_app.py process (CTRL-\\) and restart python dome_app.py'
alert(msg,loggers=loggers,recipients=recipients)
return False
def istelescopeok(ok,loggers=None,recipients=None) :
try :
stat=telescope_status()
if stat.RightAscension != 0 or stat.Declination != 0 :
if not ok :
# if dome had changed state, log and alert
alert('telescope OK, resuming operations',loggers=loggers,recipients=recipients)
return True
else :
if ok :
# if telescope has changed state, log and alert
msg='telescope failed 1, suspending operations. Check pwi1m desktop: is PWI4 running and connected? Is ASCOM remote running?'
alert(msg,loggers=loggers,recipients=recipients)
return False
except :
if ok :
# if telescope has changed state, log and alert
msg='telescope failed 2, suspending operations. Check pwi1m desktop: is PWI4 running and connected? Is ASCOM remote running?'
alert(msg,loggers=loggers,recipients=recipients)
return False
def isccdok(ok,loggers=None,recipients=None) :
try :
temp=C[2].CCDTemperature
if temp != 0 :
if not ok :
# if CCD has changed state, log and alert
alert('CCD temperature OK',loggers=loggers,recipients=recipients)
return True
else :
if ok :
# if CCD has changed state, log and alert
msg='CCD temperature=0. Check spec1m desktop: Is ASCOM remote running? Should it be restarted?'
alert(msg,loggers=loggers,recipients=recipients)
return False
except :
if ok :
# if CCD has changed state, log and alert
alert('CCD temperature=0',loggers=loggers,recipients=recipients)
return False
def logtest() :
logger.info('logging test')
if __name__ == '__main__' :
try :
with open('logging.yml', 'rt') as f:
logconfig = yaml.safe_load(f.read())
logging.config.dictConfig(logconfig)
for handler in logging.root.manager.loggerDict['aposong'].handlers :
if handler.name == 'daily' :
handler.atTime = datetime.time(hour=12)
logger=logging.getLogger('aposong')
except FileNotFoundError :
#trap for readthedocs
print('logging.yml not found')
except :
print('error with logging')
from aposong import *
import robotic
init()
from aposong import *
disp=disp_init()