Source code for aposong

#interactive plots
import matplotlib
try :
    matplotlib.use('TkAgg')
    import matplotlib.pyplot as plt
    plt.ion()
except :
    print('TkAgg failed')

import numpy as np
import os
import pdb
import glob
import yaml
import socket
import time
import datetime
import multiprocessing as mp
import threading
import yaml
import subprocess

# set up logger
import logging
import yaml
import logging.config
logger=logging.getLogger(__name__)

from astropy.coordinates import SkyCoord, EarthLocation, Angle
import astropy.units as u
from astropy.io import fits
from astropy.time import Time
from astropy.table import Table

from astroquery.vo_conesearch import ConeSearch, conesearch
from astroquery.vizier import Vizier
import astroquery.utils
astroquery.utils.suppress_vo_warnings()

import cal
import fitsheader
import mail

# alpaca imports, put in try/except for readthedocs
try:
  from alpaca import discovery, management
  from alpaca.telescope import *
  from alpaca.covercalibrator import *
  from alpaca.dome import *
  from alpaca.safetymonitor import *
  from alpaca.focuser import *
  from alpaca.filterwheel import *
  from alpaca.camera import *
  from alpaca.switch import *
except:
  logger.exception('no alpaca')

# pwi4 HTTP interface
import pwi4_client

from pyvista import tv, skycalc, centroid, stars, image, spectra, imred
import focus
import status
import database

from collections import namedtuple
Exposure = namedtuple('Exposure', ['hdu', 'name', 'exptime', 'filter'])
DomeStatus = namedtuple('DomeStatus', ['shutterstate', 'az', 'slewing', 'lights'])

# some global variables
dataroot = None
sync_process = None
guide_process = None
disp = None

# discovery seems to fail on 10.75.0.0, so hardcode servers
def ascom_init(svrs) :
    global D, S, T, F, Filt, C, Covers, SW
    global camindex 
    camindex = np.zeros(4,dtype=int)-1
    D, S, T, F, Filt, C, Covers,SW  = (None, None, None, [], None, [], None, [])
    print("Alpaca devices: ")
    if svrs is None : return
    for svr in svrs:
        print(f"  At {svr}")
        print (f"    V{management.apiversions(svr)} server")
        print (f"    {management.description(svr)['ServerName']}")
        devs = management.configureddevices(svr)
        def isconnected(dev,target,append=False) :
            try: 
                dev.Connected
                if append : target.append(dev)
                else :target = dev
            except :
                pass
            return target
                
        # open Alpaca devices
        for dev in devs:
            print(f"      {dev['DeviceType']}[{dev['DeviceNumber']}]: {dev['DeviceName']}")
            if dev['DeviceType'] == 'Telescope' :
                T =isconnected(Telescope(svr,dev['DeviceNumber']),T)
            elif dev['DeviceType'] == 'Dome' :
                D = isconnected(Dome(svr,dev['DeviceNumber']),D)
            elif dev['DeviceType'] == 'CoverCalibrator' :
                Covers = isconnected(CoverCalibrator(svr,dev['DeviceNumber']),Covers)
            elif dev['DeviceType'] == 'Focuser' :
                F = isconnected(Focuser(svr,dev['DeviceNumber']),F,append=True)
            elif dev['DeviceType'] == 'FilterWheel' :
                Filt = isconnected(FilterWheel(svr,dev['DeviceNumber']),Filt)
            elif dev['DeviceType'] == 'Camera' :
                C = isconnected(Camera(svr,dev['DeviceNumber']),C,append=True)
            elif dev['DeviceType'] == 'Safetymonitor' :
                S = isconnected(SafetyMonitor(svr,dev['DeviceNumber']),S)
            elif dev['DeviceType'] == 'Switch' :
                SW = isconnected(Switch(svr,dev['DeviceNumber']),SW,append=True)

    cams = ['ICX285','ICX694','KAF_8300','QHY']
    for i in range(4) :
        for index,c in enumerate(C) :
            try :
                if cams[i] in c.SensorName :
                    camindex[i] = index
            except : pass

    try: C[getcam(0)].Magnification=1.33
    except : print("can't access C[getcam(0)]")
    print()
    print("All ASCOM commands available through devices: ")
    print('    T : telescope commands')
    print('    C : camera commands')
    print('    F : focusser command')
    print('    Filt : filter wheel commands')
    print('    D : dome commands')

[docs] def getcam(camera=None) : """ Get correct list index for specified camera (ASCOM doesn't always deliver them in order!) camera=0 : Port 2 camera camera=1 : Port 2 spectrograph camera (eShel) camera=2 : Port 1 camera (QSI) camera=3 : SONG spectrograph camera """ if camera is None : print('Cameras: ') print(' 0: Port 2 guide camera (Atik w/ICX285 detector)') print(' 1: Port 2 spectrograph camera eShel (Atik w/ICX694 detector)') print(' 2: Port 1 camera (QSI w/KAF_800 detector)') print(' 3: SONG spectrograph camera (QHY 600)') return global camindex if camindex[camera]>=0 : return camindex[camera] #for index,c in enumerate(C) : # if cams[camera] in c.SensorName : # return index print('no such camera!')
[docs] def getfocuser(focuser) : """ Get correct list index for specified focuser (ASCOM doesn't always deliver them in order!) focuser='PWI' : Port 1 focuser (PWI) focuser='Zaber' : Port 2 focuser (Zaber) focuser='Iodine' : Port 2 iodine stage (LTS150) focuser='Calibration' : Port 2 calibration stage (LTS150) focuser='PLL' : Spectrograph focus (PrimaLuceLab Esatto 3.5") """ for index,c in enumerate(F) : if focuser in c.Name : return index print('no such focuser!') return -1
[docs] def getswitch(switch) : """ Get correct list index for specified switch (ASCOM doesn't always deliver them in order!) switch='TC300' : Thorlabs temperature controller switch='K8056' : relay for Shelyak calibration control switch='LCUS' : LCUS relay for calibration shutter switch='Yocto' : Thermocouples on QHY600 switch='Wanderer' : Wanderer PowerBox for spectrograph CCD and focuser USB reset switch='TCube' : TCube chiller """ for index,c in enumerate(SW) : if switch in c.Name : return index print('no such switch!') return -1
[docs] def wait_moving(Foc) : """ Check if input Focuser is stil moving """ while True : time.sleep(1) try : if not Foc.IsMoving : break except : logger.exception('error with F.IsMoving') break time.sleep(1) return
# Camera commands
[docs] def qck(exptime,filt='current') : """ (shorthand) Take exposure without saving to disk, current filter by default """ expose(exptime,filt)
[docs] def gexp(*args, **kwargs) : """ Expose with guide camera, see expose() for keywords Parameters ---------- exptime : float Exposure time in seconds bin : int, default=1 binning factor (both x and y) display : pyvista tv, default=None pyvista display tool to display into if specified name : str, default=None root file to save image to """ if 'bin' not in kwargs : kwargs['bin'] = 1 return expose(*args, cam=0, filt=None, **kwargs)
[docs] def sexp(*args,**kwargs) : """ Expose with spectrograph camera, see expose() for keywords Parameters ---------- exptime : float Exposure time in seconds bin : int, default=2 binning factor (both x and y) display : pyvista tv, default=None pyvista display tool to display into if specified name : str, default=None root file to save image to """ if 'bin' not in kwargs : kwargs['bin'] = 2 return expose(*args, cam=3, filt=None, **kwargs)
[docs] def expose(exptime=1.0,filt='current',bin=3,box=None,light=True,display=None,name=None, min=None, max=None, cam=0, insert=True, targ=None, avg=1, imagetyp='unspecified', header=None, fast=False) : """ Take an exposure with camera Parameters ---------- exptime : float Exposure time in seconds filt : str Name of filter bin : int, default=3 binning factor (both x and y) box : pyvista BOX, default=None if specified, window to box parameters light : bool, default=True open shutter for exposure? display : pyvista tv, default=None pyvista display tool to display into if specified name : str, default=None root file to save image to Returns ------- named tuple: Exposure = namedtuple('Exposure', ['hdu', 'name', 'exptime', 'filter']) """ exposure = Exposure(None,None,None,None) if pwi is not None : stat = pwi.status() if stat.m3.port == 1 : if filt is not None and filt != 'current' and filt != 'iodine' : pos = np.where(np.array(Filt.Names) == filt) if len(pos) == 0 : logger.warning('no such filter') logger.warning('available filters: ', Filt.Names) return exposure Filt.Position=pos[0] elif filt == 'current' : filt = Filt.Names[Filt.Position] else : if filt == 'iodine' : iodine_in() elif filt == 'open' : filt = 'None' iodine_out() else : filt = 'None' else : filt = 'None' try : icam = getcam(cam) C[icam].BinX=bin C[icam].BinY=bin if box is not None : C[icam].StartX = box.xmin C[icam].StartY = box.ymin nx = box.ncol() ny = box.nrow() else : C[icam].StartX = 0 C[icam].StartY = 0 nx = C[icam].CameraXSize//bin ny = C[icam].CameraYSize//bin C[icam].NumX = nx C[icam].NumY = ny t = Time.now() # if requested, average avg exposures for iavg in range(avg) : C[icam].StartExposure(exptime,light) if int(exptime) > 5 : #print countdown for i in range(int(exptime),0,-1) : print('{:<4d}'.format(i),end='\r') time.sleep(0.99) while not C[icam].ImageReady or C[icam].CameraState != 0: time.sleep(0.25) if iavg == 0 : if avg == 1 : data = np.array(C[icam].ImageArray) else : data = np.array(C[icam].ImageArray).astype(float) else : data += np.array(C[icam].ImageArray) if avg == 1 : data = data.T else : data = data.T/avg data=data.astype(np.uint16) except : logger.exception('ERROR : exposure failed') return exposure if display is not None : display.tv(data,min=min,max=max) hdu=fits.PrimaryHDU(data) hdu.header['ORIGIN'] = 'SONG/APO' hdu.header['DATE'] = Time.now().fits hdu.header['OBSERVAT'] = 'APO' hdu.header['TELESCOP'] = ('Node 4', 'APO SONG 1m') if cam == 3 : hdu.header['INSTRUME'] = 'Spectrograph' elif cam == 1 : hdu.header['INSTRUME'] = 'Acquisition/guider' hdu.header['IMAGETYP'] = imagetyp hdu.header['IMFORM'] = 'FITS' hdu.header['DATATYPE'] = 'Counts' if not fast : fitsheader.camera(hdu,C[getcam(cam)],exptime,avg,light) if header is not None : fitsheader.mixed(hdu,header) fitsheader.object(hdu,targ) try : focval = specfoc() except : focval = None fitsheader.spectrograph(hdu,focval) try : focval = foc() except : focval = None fitsheader.telescope(hdu,pwi.status(),focval,D.Azimuth) pos = iodine_position() if abs(float(pos)-config['iodinestage_in_pos']) < 1. : i2pos = 4 else : i2pos = 0 temp1,temp2 = iodine_tget().split()[2:] fitsheader.fpu(hdu,iodine_position(),i2pos,temp1,temp2, calstage_position(),SW[getswitch('K8056')],filt) fitsheader.weather(hdu) #fitsheader.sunmoon(hdu) fitsheader.time(hdu,t) tab=Table() # write file to disk if name given if name is not None : y,m,d,hr,mi,se = t.ymdhms try: dirname = os.path.dirname('{:s}/UT{:d}{:02d}{:02d}/{:s}'.format(dataroot,y-2000,m,d,name)) except : pdb.set_trace() try: os.makedirs(dirname) except : pass files = glob.glob(dirname+'/*.fits') exts = [] for f in files : exts.append(int(f.split('.')[-2])) if len(exts) > 0 : ext = np.array(exts).max() + 1 else : ext=1 outname = '{:s}/{:s}.{:04d}.fits'.format(dirname,os.path.basename(name).replace(' ','_'),ext) songname = 's4_{:04d}-{:02d}-{:02d}T{:02d}-{:02d}-{:02d}.fits'.format( y,m,d,hr,mi,int(se)) hdu.header['APONAME'] = outname hdu.header['SONGNAME'] = songname hdu.writeto(outname) # Create hard link for SONG directory and file name try : project = header['PROJECT'] except : project = 'APO' if cam == 3 and project != 'APO' : dirname = os.path.dirname( '/data/song/star_spec/{:04d}/{:04d}{:02d}{:02d}/night/raw/'.format(y,y,m,d)) try: os.makedirs(dirname) except : pass os.link(outname,'{:s}/{:s}'.format(dirname,songname)) # construct Exposure object to return, and populate filename in tab exposure = Exposure(hdu, outname, exptime, filt) tmp=outname.split('/') tab['file'] = [tmp[-2]+'/'+tmp[-1]] else : outname='' exposure = Exposure(hdu, None, exptime, filt) tab['file'] = [''] if insert and not fast: cards = ['DATE-OBS','MJD-DATE','EXPTIME','FILTER','FOCUS','CCD_TEMP', 'HOR_BIN','VER_BIN','RA','DEC','AZ','ALT','ROT', 'SPECFOC','I_POS','I_TEMP1', 'I_TEMP2', 'CAL_POS','TUNGSTEN','LED','THAR'] cols = ['dateobs','mjd','exptime','filter','focus','ccdtemp', 'xbin','ybin','ra','dec','az','alt','rot', 'specfoc','iodine_position','iodine_temp1', 'iodine_temp2', 'calstage_position','tungsten','led','thar'] for card,col in zip(cards,cols) : try : tab[col] = [hdu.header[card]] except KeyError: print('no {:s} card found'.format(card)) tab['camera'] = [cam] # insert exposure into database try : d=database.DBSession() d.ingest('obs.exposure',tab,onconflict='update') d.close() except : print('error loading exposure into database') return exposure
[docs] def settemp(temp,cam=0) : """ Change detector temperature set point and turn cooler on """ icam=getcam(cam) C[icam].SetCCDTemperature = temp C[icam].CoolerOn = True
[docs] def chiller(temp=None) : """ Get/set chiller temperature """ if temp is not None : SW[getswitch('TCube')].Action('tset',0,temp) return float(SW[getswitch('TCube')].Action('get_tact',0))
[docs] def chiller_fault(temp=None) : """ Get chiller fault status """ return SW[getswitch('TCube')].Action('get_fault',0)
[docs] def cooler(state=True,cam=0) : """ Set detector cooler state on/off """ icam=getcam(cam) C[icam].CoolerOn = state
[docs] def filtname() : """ Return current filter name """ return Filt.Names[Filt.Position]
[docs] def focrun(cent,step,n,exptime=1.0,filt='V',bin=3,box=None,display=None, max=30000, thresh=25,cam=0,plot=False) : """ Obtain a focus run Parameters ---------- cent : int Middle focus value step : int Focus step size n : int Number of steps to take, centered on middle value exptime : float Exposure time filt : str Filter to use bin : int, default=3 Binning factor in x and y box : pyvista BOX, default=None if specified, window to box parameters disp : pyvista tv, default=None If given, display each image as it is being taken into specified device Returns ------- List of file names taken for focus run """ foc0=foc() files=[] names=[] images=[] focvals= np.arange(int(cent)-n//2*int(step),int(cent)+n//2*int(step)+1,int(step)) for i,focval in enumerate(focvals) : foc(int(focval)) if i == 0 : time.sleep(3) logger.info('position: {:d}'.format(foc())) exp = expose(exptime,filt,box=box,bin=bin,display=display, max=max,name='focus_{:d}'.format(focval),cam=cam) hdu=exp.hdu tmp=exp.name.split('/') name=tmp[-2]+'/'+tmp[-1] names.append(name) files.append(exp.name) images.append(hdu) try : bestfitfoc, bestfithf, bestfoc, besthf = focus.focus(files,pixscale=pixscale(cam,bin=bin), display=display,max=max,thresh=thresh,plot=plot) if bestfitfoc > 0 : logger.info('setting focus to best fit focus : {:.1f} with hf diameter {:.2f}'.format( bestfitfoc,bestfithf)) f=foc(int(bestfitfoc)) best=bestfithf else : logger.info('setting focus to minimum image focus : {:.1f} with hf diameter {:.2f}'.format( bestfoc,besthf)) f=foc(int(bestfoc)) best=besthf except : bestfitfoc, bestfithf, bestfoc, besthf, best = -1, -1., -1, -1., -1. logger.exception('focus failed') f=foc(foc0) tab=Table() tab['mjd'] = [Time.now().mjd] tab['exptime'] = [exptime] tab['filter'] = [filt] tab['bin'] = [bin] tab['camera'] = [cam] tab['focvals'] = [focvals] tab['bestfoc'] = [bestfoc] tab['besthf'] = [besthf] tab['bestfitfoc'] = [bestfitfoc] tab['bestfithf'] = [bestfithf] tab['files'] = [names] d=database.DBSession() d.ingest('obs.focus',tab,onconflict='update') d.close() if display is not None : display.tvclear() return f, focvals, best
[docs] def pixscale(cam=0,bin=1) : """ Return pixscale for desired camera """ icam=getcam(cam) scale=206265/6000*C[icam].PixelSizeX*1.e-3*bin try: scale /= C[icam].Magnification except: pass return scale
[docs] def telescope_status() : """ Return telescope status """ stat = pwi.status() stat.RightAscension = T.RightAscension stat.Declination = T.Declination stat.Azimuth = T.Azimuth stat.Altitude = T.Altitude return stat
[docs] def slew(ra, dec,dome=True) : """ Slew to RA/DEC and wait for telescope/dome Parameters ---------- ra : float or str RA in hours (float), or hh:mm:ss (str) dec : float or str DEC in degrees (float), or dd:mm:ss (str) """ domesync(False) if (isinstance(ra,float) or isinstance(ra,int) ) and \ (isinstance(dec,float) or isinstance(dec,int) ) : pwi.mount_goto_ra_dec_j2000(ra,dec) elif isinstance(ra,str) and isinstance(dec,str) : coords=SkyCoord("{:s} {:s}".format(ra,dec),unit=(u.hourangle,u.deg)) pwi.mount_goto_ra_dec_j2000(coords.ra.value/15.,coords.dec.value) #tra, tdec = j2000totopocentric(ra,dec) #T.SlewToCoordinatesAsync(tra,tdec) time.sleep(5) while T.Slewing : time.sleep(1) T.Tracking = True # Wait for dome slewing to stop domesync(dome) while True : if not T.Slewing : time.sleep(10) if not D.Slewing : break
[docs] def altaz(az,alt) : """ Slew to specified az / alt Parameters ---------- az : float az in degrees alt : float alt in degrees """ pwi.mount_goto_alt_az(alt, az)
[docs] def usno(ra=None,dec=None,rad=1*u.degree,mag='Rmag',magmin=0,magmax=20,goto=True, cat= 'I/252') : """ Find/goto nearest USNO stars from specified position (default current telescope) and mag range Parameters ---------- ra : str or float, default=None RA to search around, default current telescope position dec : str or float, default=None DEC to search around, default current telescope position rad : angular quantity, default = 1*u.degree Radius to use for sear4ch bmin, bmax : float, default=(0,15) Minimum, maximum Bmag rmin, rmax : float, default=(0,15) Minimum, maximum Rmag goto : bool, default=True If True, slew to closest returned object cat : str, default= 'The USNO-A2.0 Catalogue 1' astroquery conesearch catalog to search from hr=V/50, sao=I/131A/sao (but doesn't have J2000) """ if ra is None : stat = pwi.status() ra = stat.mount.ra_j2000_hours dec = stat.mount.dec_j2000_degs coords=SkyCoord("{:f} {:f}".format(ra,dec),unit=(u.hourangle,u.deg)) elif (isinstance(ra,float) or isinstance(ra,int) ) and \ (isinstance(dec,float) or isinstance(dec,int) ) : coords=SkyCoord("{:f} {:f}".format(ra,dec),unit=(u.hourangle,u.deg)) else : coords=SkyCoord("{:s} {:s}".format(ra,dec),unit=(u.hourangle,u.deg)) try : viz=Vizier(columns=['*','+_r'],catalog=cat) if mag is not None : result=viz.query_region(coords,radius=rad, column_filters={mag:'<{:f}'.format(magmax)})[0] gd=np.where(result[mag]>magmin)[0] result=result[gd] else : result=viz.query_region(coords,radius=rad)[0] except : logger.warning('usno: no stars found within specified rad and mag range') return best = result['_r'].argmin() logger.info(result[best]) if goto: try: slew(result['RAJ2000'][best]/15., result['DEJ2000'][best]) except : slew(result['RA_ICRS'][best]/15., result['DE_ICRS'][best])
[docs] def center(display,x0=None,y0=None,exptime=5,bin=1,filt=None,settle=3,cam=0) : """ Center star """ exp=expose(exptime,filt=filt,bin=bin) display.tv(exp.hdu) print('mark star on display: ') k,x,y=display.tvmark() icam=getcam(cam) if x0 is None : x0=C[icam].NumX//2 if y0 is None : y0=C[icam].NumY//2 offsetxy((x-x0),(y-y0),scale=pixscale()) time.sleep(settle)
[docs] def guide(cmd,verbose=True, **kwargs) : """ Send command to guider process """ s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect(('127.0.0.1',5000)) if cmd == 'start' : for key,value in zip(kwargs.keys(),kwargs.values()) : cmd+=(' {:s}={:d}'.format(key,value)) s.send(cmd.encode()) if cmd == 'status' : try : out = s.recv(16, socket.MSG_DONTWAIT | socket.MSG_PEEK) print('out: ', out) except BlockingIOError : print('blockingIOError') pass except : print('recv failed') raise Exception('error with guider') out=s.recv(1024) if verbose : print('received {:s}'.format(out.decode())) s.close() if 'fail' in out.decode() : return False else : return True
[docs] def offsetxy(dx,dy,sign=-1,scale=0.16,pa=None) : """ Offset in detector coordinates """ pa = sign*rotator()*np.pi/180. dra = -dx*np.cos(pa) - dy*np.sin(pa) ddec = -dx*np.sin(pa) + dy*np.cos(pa) #offset(dra*scale,ddec*scale) offset(dra*scale,ddec*scale)
[docs] def offset(dra, ddec) : """ One or more of the following offsets can be specified as a keyword argument: AXIS_reset: Clear all position and rate offsets for this axis. Set this to any value to issue the command. AXIS_stop_rate: Set any active offset rate to zero. Set this to any value to issue the command. AXIS_add_arcsec: Increase the current position offset by the specified amount AXIS_set_rate_arcsec_per_sec: Continually increase the offset at the specified rate As of PWI 4.0.11 Beta 7, the following options are also supported: AXIS_stop: Stop both the offset rate and any gradually-applied commands AXIS_stop_gradual_offset: Stop only the gradually-applied offset, and maintain the current rate AXIS_set_total_arcsec: Set the total accumulated offset at the time the command is received to the specified value. Any in-progress rates or gradual offsets will continue to be applied on top of this. AXIS_add_gradual_offset_arcsec: Gradually add the specified value to the total accumulated offset. Must be paired with AXIS_gradual_offset_rate or AXIS_gradual_offset_seconds to determine the timeframe over which the gradual offset is applied. AXIS_gradual_offset_rate: Paired with AXIS_add_gradual_offset_arcsec; Specifies the rate at which a gradual offset should be applied. For example, if an offset of 10 arcseconds is to be applied at a rate of 2 arcsec/sec, then it will take 5 seconds for the offset to be applied. AXIS_gradual_offset_seconds: Paired with AXIS_add_gradual_offset_arcsec; Specifies the time it should take to apply the gradual offset. For example, if an offset of 10 arcseconds is to be applied over a period of 2 seconds, then the offset will be increasing at a rate of 5 arcsec/sec. Where AXIS can be one of: ra: Offset the target Right Ascension coordinate dec: Offset the target Declination coordinate axis0: Offset the mount's primary axis position (roughly Azimuth on an Alt-Az mount, or RA on In equatorial mount) axis1: Offset the mount's secondary axis position (roughly Altitude on an Alt-Az mount, or Dec on an equatorial mount) path: Offset along the direction of travel for a moving target transverse: Offset perpendicular to the direction of travel for a moving target """ pwi.mount_offset(ra_add_arcsec=dra/np.cos(T.Declination*np.pi/180.)) pwi.mount_offset(dec_add_arcsec=ddec)
[docs] def j2000totopocentric(ra,dec) : """ Routine to convert J2000 coordinates to topocentric RA/DEC """ #from coordio import sky, site, time coords=sky.ICRS(np.array([[15*ra,dec]])) s=site.Site('APO') s.set_time(time.Time()) # defaults to now obs=sky.Observed(coords,site=s) print('topo: ',obs.ra/15.,obs.dec) return obs.ra/15., obs.dec
# tpm routines #apo=EarthLocation.of_site('APO') #d=datetime.now() #utc = tpm.gcal2j(d.year,d.month,d.day) #tt = tpm.utc3tdb(utc) #v6 = convert.cat2v6(ra*np.pi/180,dec*np.pi/180) #v6_app = convert.convertv6(s1=6,s2=16,lon=apo.lon.value,lat=apo.lat.value,alt=apo.height.value)
[docs] def rotator(offset=90.) : """ Get current rotator position angle, from parallactic angle and altitude for port 2 and telescope status for port 1 Parameters ---------- offset : float, default=100 Angle constant (degrees) that needs to be set depending on camera/fixed rotator position for port 2 """ stat = pwi.status() if stat.m3.port == 2 : t=Time(Time.now(),location=EarthLocation.of_site('APO')) lst=t.sidereal_time('mean').value ha = lst - T.RightAscension pa = skycalc.parang(ha,T.Declination,'APO')-T.Altitude+offset else : inst = stat.rotator.mech_position_degs pa = stat.rotator.field_angle_degs return pa
[docs] def tracking(tracking) : """ Set telescope tracking on (True) or off (False) Parameters ---------- tracking : bool if True, turn tracking on, if False, turn tracking off """ T.Tracking= tracking
[docs] def park() : """ Park telescope and dome """ domesync(False) D.Park() try: T.Park() except: logger.error('telescope.Park raised an exception')
[docs] def foc_home(port=None) : """ Send focus to home """ if port is None : stat = pwi.status() port = stat.m3.port if port == 1 : index = getfocuser('PWI') else : index = getfocuser('Zaber') F[index].Action('home') wait_moving(F[index])
[docs] def foc(val=None, relative=False, port=None) : """ Change focus, depending on port """ if port is None : stat = pwi.status() port = stat.m3.port if port == 1 : index = getfocuser('PWI') else : index = getfocuser('Zaber') if val is not None : if relative : if val == 0 : return F[index].Position val += F[index].Position F[index].Move(val) wait_moving(F[index]) return F[index].Position
[docs] def specfoc(val=None) : """ Change spectrograph focus """ index=getfocuser('PLL') if val is not None : F[index].Move(val) wait_moving(F[index]) return F[index].Position
[docs] def iodine_tset(val=None,tmax=66) : """ Get/set iodine cell set temperature and (re)enable heaters """ iswitch = getswitch('TC300') if val is not None : if val > tmax : print('values > {:d} must be explicitly allowed with tmax= keyword'.format(tmax)) return SW[iswitch].SetSwitchValue(0,val) SW[iswitch].SetSwitchValue(1,0) # no second channel on new iodine cell iodine_set('enable',1) tset1 = SW[iswitch].Action('get_tset',0) tset2 = SW[iswitch].Action('get_tset',1) return f'set temperature: {tset1} {tset2}'
[docs] def iodine_tget() : """ Get/set iodine cell actual temperature """ iswitch = getswitch('TC300') try : tact1 = SW[iswitch].GetSwitchValue(0) tact2 = SW[iswitch].GetSwitchValue(1) return f'actual temperature: {tact1} {tact2}' except : return f'actual temperature: -1 -1'
[docs] def iodine_set(quantity,val) : """ Miscellaneous TC300 commands: enable, dark, light """ iswitch = getswitch('TC300') if quantity in ['enable','dark','light'] : q1 = SW[iswitch].Action('set_{:s}'.format(quantity),0,val) q2 = SW[iswitch].Action('set_{:s}'.format(quantity),1,val) return f'{q1} {q2}' else : print('unknown quantity')
[docs] def iodine_get(quantity) : """ Get iodine cell quantity """ iswitch = getswitch('TC300') if quantity in ['tset','voltage','current','enable','dark','light'] : q1 = SW[iswitch].Action('get_{:s}'.format(quantity),0) q2 = SW[iswitch].Action('get_{:s}'.format(quantity),1) return f'{q1} {q2}' else : print('unknown quantity')
[docs] def iodine_in(val=None,focoffset=None) : """ Move iodine cell into beam """ # don't move if already there, to avoid extra focus change` if val is None : val = config['iodinestage_in_pos'] if focoffset is None : focoffset = config['df_iodine'] ipos = iodine_position() if ipos < 0 : print("error with iodine stage") return if abs(ipos-val) > 0.1 : iodine_position(val) foc(focoffset,relative=True,port=2) time.sleep(5) else : print('iodine stage already at desired postion, no motion or focus offset done')
[docs] def iodine_out(val=None,focoffset=None) : """ Move iodine cell out of beam """ # don't move if already there, to avoid extra focus change` if val is None : val = config['iodinestage_out_pos'] if focoffset is None : focoffset = config['df_iodine'] ipos = iodine_position() if ipos < 0 : print("error with iodine stage") return if abs(ipos-val) > 0.1 : iodine_position(val) foc(-1*focoffset,relative=True,port=2) time.sleep(5) else : print('iodine stage already at desired postion, no motion or focus offset done')
[docs] def iodine_home() : """ Send iodine stage to home """ index=getfocuser('Iodine') F[index].Action('home')
[docs] def iodine_position(val=None) : """ Get/set iodine stage position """ index=getfocuser('Iodine') try : if val is not None : F[index].Move(int(val*1000.)) wait_moving(F[index]) return F[index].Position/1000. except : return -1
[docs] def calstage_home() : """ Send calibration stage to home """ index=getfocuser('Calibration') F[index].Action('home')
[docs] def calstage_position(val=None) : """ Get/set calibration stage position """ index=getfocuser('Calibration') if val is not None : F[index].Move(int(val*1000.)) wait_moving(F[index]) return F[index].Position/1000.
[docs] def calstage_in(val=None,calfoc=None) : """ Move calibration stage into beam """ # don't move if already there, to avoid extra focus change if val is None : val = config['calstage_in_pos'] if calfoc is None : calfoc = config['calstage_focus'] foc(calfoc) calstage_position(val)
[docs] def calstage_out(val=None) : """ Move calibration stage out of beam """ # don't move if already there, to avoid extra focus change if val is None : val = config['calstage_out_pos'] calstage_position(val)
[docs] def calstage_find(display=None) : """ Find calibration spot location """ calstage_in() cal.lamps(quartz=True,led=True) im=gexp(0.002,display=display,max=60000).hdu.data y0,x0=np.unravel_index(np.argmax(im),im.shape) mask=np.zeros_like(im) yg,xg=np.mgrid[0:mask.shape[0],0:mask.shape[1]] r2=(xg-config['hole_pos'][1])**2+(yg-config['hole_pos'][0])**2 bd=np.where(r2<7**2) mask[bd]=1 cent=centroid.rasym_centroid(im,x0,y0,rad=35,mask=mask) logger.info('Calibration spot center: {:.2f} {:.2f}'.format(cent.x,cent.y)) if display is not None : display.tvclear() display.tvcirc(cent.x,cent.y,50) config['calstage_in_pos'] -= (cent.y-config['hole_pos'][0])/10*.05 gexp(0.002,display=display,max=60000).hdu.data cal.lamps() calstage_in() return config['calstage_in_pos']
[docs] def fans_on(roles=None): """ Turn on PWI fans Parameters ---------- roles: if None, turn on all fans Otherwise, can be a CSV string of one or more fan roles to turn on: m1rear: Primary mirror fans (rear fans only) m1side: Primary mirror fans (side fans only) m2: Secondary mirror fans m3: M3 mirror fans m1heaters: Primary mirror heat distribution fans """ logger.info("Turning fans on ...") pwi.fans_on(roles)
[docs] def fans_off(roles=None): """ Turn off PWI fans """ logger.info("Turning fans off ...") pwi.fans_off(roles)
[docs] def port(port) : """ Change PWI M3 port """ pwi.m3_goto(port)
[docs] def mirror_covers(open=False) : """ Open/close mirror covers """ current = Covers.CoverState.value if open and current == 3 : return if not open and current == 1 : return altaz(T.Azimuth,85.) logger.info('waiting for telescope to slew to high altitude...') while T.Slewing : time.sleep(1) if open and current != 3 : Covers.OpenCover() elif not open and current != 1 : Covers.CloseCover() logger.info('waiting 20 seconds for mirror covers to close...') time.sleep(20)
[docs] def louvers(open=False) : """ Open louvers """ if open : logger.info('opening louvers...') subprocess.run("ms on 9",shell=True) else : logger.info('closing louvers...') subprocess.run("ms off 9",shell=True) subprocess.run("ms list 9",shell=True)
def coverstate() : print(Covers.CoverState.value)
[docs] def issafe() : """ Query SafetyMonitor for safe to open """ return S.IsSafe
[docs] def override(t,verbose=True) : """ Set override to allow open """ try : if verbose : resp = input("Are you sure you to override 3.5m/2.5m dome opening requirement? CTRL-C to abort: ") else : resp = 'y' if resp != 'n' and resp != 'N' : S.Action('override',int(t)) print('override in place for {:d} seconds'.format(int(t))) except : print('override aborted',resp)
[docs] def domestatus() : """ Return dome azimuth and shutter status """ out=subprocess.run("ms list 3",shell=True,capture_output=True,text=True) if 'On' in out.stdout : light = True else : light = False stat = DomeStatus(D.ShutterStatus.value,D.Azimuth,D.Slewing,light) return stat
[docs] def domehome() : """ Home dome """ D.FindHome()
[docs] def domeopen(dome=True,covers=True,fans=True,louvers=False) : """ Open dome, mirror covers, louvers and start fans, as requested """ if dome and D.ShutterStatus.name != 'shutterOpen' : logger.info('opening shutter...') D.OpenShutter() # Wait for shutter open before opening mirror covers logger.info('waiting for shutter to open...') while D.ShutterStatus.name != 'shutterOpen' : time.sleep(1) if covers and Covers.CoverState.name != 'Open': mirror_covers(True) logger.info('waiting for mirror covers to open...') while Covers.CoverState.value != 3 : time.sleep(1) if fans : logger.info('turning fans on...') fans_on() if louvers : logger.info('opening louvers...') louvers(True)
[docs] def domeclose(dome=True,covers=True,fans=True, closelouvers=True) : """ Close mirror covers and dome """ if closelouvers : logger.info('closing louvers...') louvers(False) if fans : logger.info('turning off fans...') fans_off() if covers : mirror_covers(False) if dome : # don't wait for mirror covers to report closed, in case they don't! park() logger.info('closing shutter...') D.CloseShutter() # put this at end in case guider has died and this won't return logger.info('stopping guider...') try : guide('stop') except : pass
[docs] def domesync(dosync=True,manual=False) : """ Start/stop domesync thread """ global sync_process, run_sync def sync(update=5) : while run_sync : while T.Slewing or D.Slewing : time.sleep(1) # Get telescope az 3x and median to avoid glitches az=[] for i in range(3) : az.append(T.Azimuth) time.sleep(0.2) D.SlewToAzimuth(np.median(az)) time.sleep(update) if manual : print('Rotate dome manually....then continue') pdb.set_trace() return if dosync and sync_process is None : logger.info('starting dome sync') sync_process=threading.Thread(target=sync) run_sync = True sync_process.start() elif dosync == False and sync_process is not None : logger.info('stopping dome sync') run_sync = False sync_process.join() sync_process = None
[docs] def start_status(camera=True) : """ Start status window thread """ global proc # allow for no camera state to avoid command collisions if camera : Cam = C else : Cam = None proc = mp.Process(target=status.status, kwargs={'pwi' : pwi, 'T' : T, 'D' : D, 'F' : F, 'Filt' : Filt, 'C' : Cam, 'Covers': Covers}) proc.daemon = True proc.start()
[docs] def stop_status() : """ Stop status window thread """ proc.terminate()
def commands() : print() print("Observatory commands") print(" domeopen(): open dome, mirror covers") print(" domeclose(): close mirror covers, dome, and park") print(" issafe(): check safety monitor for status") print() print("Dome commands") print(" domehome(): move dome to home position") print(" domesync(True|False): sync dome to telescope position") print(" louvers(True|False): open/close louvers") print() print("Telescope commands") print(" slew(ra,dec): slew to coordinates") print(" offset(ra,dec): offset telescope in sky coordinates (arcsec) ") print(" offsetxy(x,y): offset telescope in detector coordinates (pixels)") print(" usno([ra,dec]) : find/slew to USNO A2.0 star") print(" altaz(az,alt): slew to az/alt coordinates") print(" foc(focus) : set focus to specified value") print(" foc_home() : home focus (port=2 only)") print(" tracking(True|False): turn tracking on/off") print(" mirror_covers(True|False): control mirror covers") print(" fans(True|False): control telescope fans") print(" port(port): move tertiary to requested port") print(" park(): park telescope and dome") print() print("Camera commands") print(" expose(exptime,filt,**kwargs: take an exposure with specified camera") print(" gexp(exptime,filt,**kwargs: take an exposure with guide camera (frontend to expose)") print(" sexp(exptime,filt,**kwargs: take an exposure with spectrograph camera (frontend to expose)") print(" focrun(cent,step,nsteps,exptime,filt,**kwargs): take series of exposures at different focus positions") print(" settemp(temp): set camera temperature set point") print(" cooler(state): set camera cooler state on (True) or off (False)") print(" chiller([temp]): get/set chiller temperature") print(" chiller_fault(): get chiller fault status") print() print("Iodine commands") print(" iodine_in() : move iodine cell into beam, and adjust focus") print(" iodine_out() : move iodine cell out of beam, and adjust focus") print(" iodine_position([val]) : get or set (with val) iodine stage position") print(" iodine_home() : home iodine stage") print(" iodine_tset(val) : set iodine temperature (both channels)") print(" iodine_tget(): get actual iodine temperatures") print(" iodine_get(quantity): get various TC300 values (tset,voltage,current,enable,dark,light)") print() print("Calibration commands") print(" calstage_in() : move calibration stage into beam, and adjust focus (if needed)") print(" calstage_out() : move calibration stage out of beam, and adjust focus (if needed)") print(" calstage_position([val]) : get or set (with val) iodine stage position") print(" calstage_home() : home iodine stage") print(" cal.getlamps() : get eShel lamp status") print(" cal.lamps() : control eShel lamps") print(" cal.cals() : turn lamps on, take sequences of flats and ThAr, turn lamps off") print(" cal.shutter() : open/close shutter in direct calibration feed") print() print("Use help(command) for more details")
[docs] def devices() : """ List connected ASCOM devices """ global D, S, T, F, Filt, C, Covers, SW print('Dome: ') print(D.Name, D.Description) print('Telescope (T): ') print(' {:s}, {:s}'.format(T.Name, T.Description)) print('Mirror Covers (Covers) : ') try : print(' {:s}, {:s}'.format(Covers.Name, Covers.Description)) except : print(' NOT FOUND') print('FilterWheel (Filt) : ') try: print(' {:s}, {:s}'.format(Filt.Name, Filt.Description)) except : print(' NOT FOUND') print('SafetyMonitor (S) : ') print(' {:s} {:s}'.format(S.Name, S.Description)) print('Focusers (F[]): ') for i,dev in enumerate(F) : print(' {:d} {:s}, {:s}'.format(i,dev.Name, dev.Description)) print('Cameras (C[]): ') for i,dev in enumerate(C) : print(' {:d} {:s}, {:s}'.format(i,dev.Name, dev.Description)) print('Switches (SW[]): ') for i,dev in enumerate(SW) : print(' {:d} {:s}, {:s}'.format(i,dev.Name, dev.Description))
[docs] def init() : """ Start ascom and pwi connections and pyvista display """ global config, dataroot, pwi_srv try : with open('aposong.yml','r') as config_file : config = yaml.safe_load(config_file) except: print('no configuration file found') config={} config['devices']={} config['devices']['ascom_search'] = False config['devices']['ascom_srvs'] = [] config['devices']['pwi_srv'] = None config['dataroot'] = './' if config['devices']['ascom_search'] : svrs=discovery.search_ipv4(timeout=30,numquery=3) else : svrs=config['devices']['ascom_srvs'] dataroot=config['dataroot'] try : updatecamera=config['updatecamera'] except : updatecamera=True ascom_init(svrs) print('pwi_init...') pwi_srv = config['devices']['pwi_srv'] pwi_init(pwi_srv) commands()
def pwi_init(pwi_srv) : global pwi print('Starting PWI4 client...') if pwi_srv is not None : pwi=pwi4_client.PWI4(host=pwi_srv) for axis in [0,1] : pwi.mount_enable(axis) else : pwi = None
[docs] def disp_init() : """ Start display with good geometry """ try : disp=tv.TV(figsize=(9.5,6)) disp.fig.canvas.manager.window.wm_geometry('-0+0') except : print("Can't open display") return disp
[docs] def alert(message,loggers=None,recipients=None) : """ Print alert, and log and email as requeste3d """ print('{:s} aposong alert: {:s}'.format(Time.now().fits,message)) if loggers is not None : if isinstance(loggers,str) : loggers=list(logger) for logger in loggers : logger.info(message) if recipients is not None : mail.send(recipients,subject='aposong alert: {:s}'.format(Time.now().fits),message=message)
[docs] def isguideok(ok,loggers=None,recipients=None) : """ Check to see if guider is responding, if not send alert/log as requested (if a state change) Parameters ---------- ok : bool current state of guider going into call loggers : logger or list of loggers, default=None loggers to send message not recipients : list of addresses, default=None addresses to send alerts to, if specified """ try : guide('status',verbose=False) if not ok : # if guider has changed state, log and alert alert('guider OK, resuming operations',loggers=loggers,recipients=recipients) return True except : if ok : # if guider has changed state, log and alert msg='guider failed, suspending operations. Check guider process in guider tab on song1m, quit/CTRL-\\ and restart guider' alert(msg,loggers=loggers,recipients=recipients) return False
def isdomeok(ok,loggers=None,recipients=None) : try : test=D.Azimuth if not ok : # if dome had changed state, log and alert alert('dome OK, resuming operations',loggers=loggers,recipients=recipients) return True except : if ok : # if dome had changed state, log and alert msg='dome failed, suspending operations. Check dome1m desktop, kill dome_app.py process (CTRL-\\) and restart python dome_app.py' alert(msg,loggers=loggers,recipients=recipients) return False def istelescopeok(ok,loggers=None,recipients=None) : try : stat=telescope_status() if stat.RightAscension != 0 or stat.Declination != 0 : if not ok : # if dome had changed state, log and alert alert('telescope OK, resuming operations',loggers=loggers,recipients=recipients) return True else : if ok : # if telescope has changed state, log and alert msg='telescope failed 1, suspending operations. Check pwi1m desktop: is PWI4 running and connected? Is ASCOM remote running?' alert(msg,loggers=loggers,recipients=recipients) return False except : if ok : # if telescope has changed state, log and alert msg='telescope failed 2, suspending operations. Check pwi1m desktop: is PWI4 running and connected? Is ASCOM remote running?' alert(msg,loggers=loggers,recipients=recipients) return False def isccdok(ok,loggers=None,recipients=None) : try : temp=C[2].CCDTemperature if temp != 0 : if not ok : # if CCD has changed state, log and alert alert('CCD temperature OK',loggers=loggers,recipients=recipients) return True else : if ok : # if CCD has changed state, log and alert msg='CCD temperature=0. Check spec1m desktop: Is ASCOM remote running? Should it be restarted?' alert(msg,loggers=loggers,recipients=recipients) return False except : if ok : # if CCD has changed state, log and alert alert('CCD temperature=0',loggers=loggers,recipients=recipients) return False def logtest() : logger.info('logging test') if __name__ == '__main__' : try : with open('logging.yml', 'rt') as f: logconfig = yaml.safe_load(f.read()) logging.config.dictConfig(logconfig) for handler in logging.root.manager.loggerDict['aposong'].handlers : if handler.name == 'daily' : handler.atTime = datetime.time(hour=12) logger=logging.getLogger('aposong') except FileNotFoundError : #trap for readthedocs print('logging.yml not found') except : print('error with logging') from aposong import * import robotic init() from aposong import * disp=disp_init()