import base64 from syslog import syslog, LOG_DEBUG, LOG_ERR, LOG_INFO import sys import traceback import fsio import os import logConfig import FilePaths #Define constants _enable_syslog = False #Define this env variable when you want to test locally. _DATA_ROOT_PATH = os.environ.get('LOCAL_VAR_ROOT_PATH') if _DATA_ROOT_PATH == None or _DATA_ROOT_PATH == '': #This is the path on target system where status data are stored _DATA_ROOT_PATH = "/var/run/" else: print 'warning using local var path:', _DATA_ROOT_PATH _DATA_ROOT_PATH_DIAG = _DATA_ROOT_PATH + "diag/" CYCLE_FILENAME = _DATA_ROOT_PATH_DIAG + "cycle" ENABLE_FILENAME = _DATA_ROOT_PATH_DIAG + "enable" CODE_FILENAME = _DATA_ROOT_PATH_DIAG + "code" OPERATION_MODE_FILENAME = _DATA_ROOT_PATH_DIAG + "operationmode" RES_CON_STATES_FILENAME = _DATA_ROOT_PATH_DIAG + "ResConStates" TEMPERATURE = _DATA_ROOT_PATH_DIAG + "temperature" TEMPERATURE_MOTOR = _DATA_ROOT_PATH_DIAG + "temperature_motor" TEMPERATURE_PLATINE = _DATA_ROOT_PATH_DIAG + "temperature_platine" TEMPERATURE_HEAT_SINK_LT = _DATA_ROOT_PATH_DIAG + "lt/Kuehlblechtemperatur" TEMPERATURE_MOTOR_LT = _DATA_ROOT_PATH_DIAG + "lt/Motortemperatur" LOAD_CYCLE_COUNTER = _DATA_ROOT_PATH_DIAG + "loadcycle" AVG_LOAD_TORQUE = _DATA_ROOT_PATH_DIAG + "avgload" AVG_START_LOAD_TORQUE = _DATA_ROOT_PATH_DIAG + "avgstart" PARSYSTEM = _DATA_ROOT_PATH_DIAG + 'parsystem' SERVICE = _DATA_ROOT_PATH_DIAG + "service" #Battery data _BAT_DATA_PATH = _DATA_ROOT_PATH_DIAG + "battery/" #Ti wlan data _TI_WLAN_DATA_PATH = _DATA_ROOT_PATH_DIAG + "tiwlan/" BATPOWER_FILENAME = _BAT_DATA_PATH + "power" BATSTATUS_FILENAME = _BAT_DATA_PATH + "status" MAX_SRB_STEPS_FILENAME = _DATA_ROOT_PATH_DIAG + "maxsrbsteps" WLAN_ACTIVE_FILENAME = _TI_WLAN_DATA_PATH + "interface" WLAN_RSSI_LEVEL_FILENAME = _TI_WLAN_DATA_PATH + "rssi" WLAN_CONNECTION_STATUS_FILENAME = _TI_WLAN_DATA_PATH + "constatus" FCC_TEST_MODE_ACTIVE_FILENAME = _TI_WLAN_DATA_PATH + "fccmode" CFG_LIST_PRG_0 = "srbprg/prg0.lua" CFG_LIST_JOB_000 = "jobs/job000.lua" CFG_LIST_QCODE = "QCode.json" # Return an integer value def getOperationMode(): try: res = int(fsio.readNb(OPERATION_MODE_FILENAME)) if logConfig.enable_syslog: syslog(LOG_DEBUG, "FileIface.getOperationMode " + str(res)) return res except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in getOperationMode ' + str(sys.exc_info()[0])) def getResConnState(): try: conn_vals = [] for root, dirs, files in os.walk(RES_CON_STATES_FILENAME): for conn_state in files: conn_vals.append(int(fsio.readNb(os.path.join(root, conn_state)))) disconn = 0 conn = 0 for val in conn_vals: if val is 0: disconn += 1 elif val is 1: conn += 1 state = 0 if disconn > 0: state = 1 elif conn > 0: state = 2 if logConfig.enable_syslog: syslog(LOG_DEBUG, "FileIface.getResConnState " + str(res)) return state except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in getResConnState ' + str(sys.exc_info()[0])) def getCycle(): try: res = int(fsio.readNb(CYCLE_FILENAME)) if logConfig.enable_syslog: syslog(LOG_DEBUG, "FileIface.getCycle " + str(res)) return res except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in getCycle ' + str(sys.exc_info()[0])) def getMaxSrbSteps(): try: res = int(fsio.readNb(MAX_SRB_STEPS_FILENAME)) if logConfig.enable_syslog: syslog(LOG_DEBUG, "FileIface.getMaxSrbSteps " + str(res)) return res except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in getMaxSrbSteps' + str(sys.exc_info()[0])) # Return an integer value def toolEnableGet(): try: res = int(fsio.readNb(ENABLE_FILENAME)) if logConfig.enable_syslog: syslog(LOG_DEBUG, "FileIface.toolEnableGet " + str(res)) return res except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in toolEnableGet ' + str(sys.exc_info()[0])) # Return an integer value def getBatPower(): try: res = int(fsio.readNb(BATPOWER_FILENAME)) if logConfig.enable_syslog: syslog(LOG_DEBUG, "FileIface.getBatPower " + str(res)) return res except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in getBatPower ' + str(sys.exc_info()[0])) # Return an integer value def getBatStatus(): try: res = (int(fsio.readNb(BATSTATUS_FILENAME))) if logConfig.enable_syslog: syslog(LOG_DEBUG, "FileIface.getBatStatus " + str(res)) return res except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in getBatStatus ' + str(sys.exc_info()[0])) def getWlanRssiLevel(): try: res = int(fsio.readNb(WLAN_RSSI_LEVEL_FILENAME)) if logConfig.enable_syslog: syslog(LOG_DEBUG, "FileIface.getWlanRssiLevel " + str(res)) return res except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in getWlanRssiLevel ' + str(sys.exc_info()[0])) def getWlanConnectionStatus(): try: res = int(fsio.readNb(WLAN_CONNECTION_STATUS_FILENAME)) if logConfig.enable_syslog: syslog(LOG_DEBUG, "FileIface.getWlanConnectionStatus " + str(res)) return res except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in getWlanConnectionStatus ' + str(sys.exc_info()[0])) def getWlanActive(): try: res = boolValue(fsio.readNb(WLAN_ACTIVE_FILENAME)) if logConfig.enable_syslog: syslog(LOG_DEBUG, "FileIface.getWlanActive " + str(res)) return res except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in getWlanActive' + str(sys.exc_info()[0])) def getPruefSoftwareActive(): try: res = int(fsio.readNb(SERVICE)) return res except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in getPrueSoftwareActive' + str(sys.exc_info()[0])) def boolValue(arg): if arg is None: return False if type(arg) == str: arg = arg.strip().lower() if arg == "false": return False elif arg == "true": return True return False def getFccTestModeActive(): try: res = boolValue(fsio.readNb(FCC_TEST_MODE_ACTIVE_FILENAME)) if logConfig.enable_syslog: syslog(LOG_DEBUG, "FileIface.getFccTestModeActive " + str(res)) return res except: if logConfig.enable_syslog: syslog(LOG_DEBUG, 'Exception in getFccTestModeActive ' + traceback.format_exc()) #str(sys.exc_info()[0])) def get_auth_expiration(): timeout = 900 try: if os.path.exists(FilePaths.AUTH_EXPIRATION_FILE): timeout = int(fsio.read(FilePaths.AUTH_EXPIRATION_FILE)) except: syslog(LOG_ERR, "setting default session timeout ({0} s). File error '{1}'".format( timeout, FilePaths.AUTH_EXPIRATION_FILE) ) syslog(LOG_INFO, "session timeout is " + str(timeout) + " s") return timeout if __name__ == '__main__': #---------------- Test ------------------ def _writeValue(filename, data, nosync=True): if nosync: with open(filename, 'w') as f: f.write(data) f.flush() else: fsio.write(filename, data) # thread test shutdown = False from threading import Thread import time def writer(id): data = 0 while (not shutdown): time.sleep(id) data +=1 _writeValue(CYCLE_FILENAME, str(data)) _writeValue(ENABLE_FILENAME, str(data)) _writeValue(BATPOWER_FILENAME, str(data)) _writeValue(BATSTATUS_FILENAME, str(data)) if data >= 10: data = 0 thrd1 = Thread(target = writer, args = (1,)) def reader(id): data = 0 while (not shutdown): time.sleep(id) print 'cycle:', getCycle() print 'enable:', toolEnableGet() print 'batPower:', getBatPower() print 'batStatus:', getBatStatus() thrd2 = Thread(target = reader, args = (1,)) #Start threads thrd1.start() thrd2.start() ret = raw_input("press any key to exit:") shutdown = True thrd1.join(10.0) thrd2.join(10.0) if thrd1.is_alive(): print 'thrd1 is running' if thrd2.is_alive(): print 'thrd2 is running' else: print 'exited'