You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1000 lines
32 KiB
Python

"""
The module that offers access to the Paxton Net2 server
"""
import clr
import os
import sys
import Net2Scripting.settings
from Net2Scripting.net2xs.conversions import date_time_to_net, flex_date_time_to_net, \
time_zones_to_py, access_levels_to_py, access_level_detail_to_py
from datetime import datetime
from Net2Scripting.net2base import Net2Base
from Net2Scripting.pylog4net import Log4Net
from threading import RLock
from System import Array
from System.Data import DataSet
from System.Reflection import Assembly
# Minimal required net2 version
MIN_NET2_VERSION = 501
# Paxton assembly
PAXTON_ASSEMBLY = 'Paxton.Net2.OEMClientLibrary'
def readable_min_version():
"""Show a user readable minimum version
"""
major = MIN_NET2_VERSION / 100
minor = MIN_NET2_VERSION - major * 100
return "%d.%d" % (major, minor)
class Net2XSException(Exception):
"""Exception class for net2 xs
"""
pass
# The code below is required to determine if the already installed
# Net2 version can be used, or that the packaged version is required.
try:
# Obtain paxton assembly reference
asm = Assembly.LoadWithPartialName(PAXTON_ASSEMBLY)
# Found: check the version
if asm:
ver = asm.GetName().Version
if ver.Major * 100 + ver.Minor < MIN_NET2_VERSION:
raise Net2XSException(
'Only Net2 V%s or higher is supported' %
readable_min_version())
# Not found: enable the packaged paxton libs
else:
# Add path lib path to search path
PAXTON_LIB_DIR = os.path.join(Net2Scripting.settings.LIB_DIR, 'paxton')
if PAXTON_LIB_DIR not in sys.path:
sys.path.append(PAXTON_LIB_DIR)
try:
Assembly.LoadWithPartialName(PAXTON_ASSEMBLY)
clr.AddReference(PAXTON_ASSEMBLY)
from Paxton.Net2.OemClientLibrary import OemClient as OC
from Paxton.Net2.OemClientLibrary import AccessLevelDetailSet
from Paxton.Net2.OemClientLibrary import TimezonesSet
from Paxton.Net2.OemClientLibrary import EventViewEnums
except:
raise Net2XSException('Failed to load the library')
except Exception as e:
Log4Net.get_logger('Net2XS').Fatal('Paxton error: %s' % str(e))
sys.exit(1)
# end of paxton loading
class Net2XS(Net2Base):
"""Net2 Access class
"""
# Class variables
_logger = Log4Net.get_logger('Net2XS')
_lock = RLock()
def __init__(self, host='localhost', port=8025):
"""Class constructor
"""
self._client = None
self._host = host
self._port = port
self._connected = False
self._on_acu_event = None
def __enter__(self):
"""With enter handler
"""
return self
def __exit__(self, type, value, traceback):
"""With exit handler
"""
self.dispose()
def _check_client(self):
"""Check is client connection is valid
"""
if not self._client or not self._connected:
raise Net2XSException('Not connected')
def authenticate(self, user_id, password):
"""Authenticate to Net2
"""
with Net2XS._lock:
# Save for re authentication
self._user_id = user_id
self._password = password
self.dispose()
Net2XS._logger.Debug('Connecting to net2 server on %s:%d' %
(self._host, self._port))
self._client = OC(self._host, self._port)
methods = self._client.AuthenticateUser(user_id, password)
if not methods:
raise Net2XSException('Authentication failed: ' +
self._client.LastErrorMessage)
else:
self._connected = True
# Add disconnect reconnect handlers
self._client.Net2ServerReconnected += (
OC.Net2ServerReconnectedHandler(self._reconnected))
self._client.Net2ServerDisconnected += (
OC.Net2ServerDisconnectedHandler(self._disconnected))
# Add acu event handler
self._client.Net2AccessEvent += (
OC.Net2AcuEventHandler(self._acu_event))
Net2XS._logger.Debug('Authenticated')
@property
def client_version(self):
"""Client version
"""
asm = Assembly.GetAssembly(OC)
ver = asm.GetName().Version
return (ver.Major, ver.Minor)
def query_db(self, query):
"""Perform a db query
Returns a dataset
"""
with Net2XS._lock:
self._check_client()
return self._client.QueryDb(query)
def are_doors_being_synchronised(self):
"""Return if synchronization is taking place
Returns a boolean
"""
with Net2XS._lock:
self._check_client()
return self._client.AreDoorsBeingSynchronised()
def get_doors(self, device_address=-1):
"""Get all doors (ViewDoors)
Device_address is an optional integer
Returns a dataset
"""
with Net2XS._lock:
self._check_client()
if device_address < 0:
return self._client.ViewDoors().DoorsDataSource
else:
return self._client.ViewDoors(device_address).DoorsDataSource
def get_door_name(self, device_address):
"""Obtain door name
Returns a string
"""
with Net2XS._lock:
self._check_client()
dataset = self._client.ViewDoors(device_address).DoorsDataSource
if (not dataset or
dataset.Tables.Count < 1 or
dataset.Tables[0].Rows.Count == 0):
return None
return dataset.Tables[0].Rows[0].Name
def get_departments(self):
"""Get all departments (ViewDepartments)
Returns a dataset
"""
with Net2XS._lock:
self._check_client()
return self._client.ViewDepartments().DepartmentsDataSource
def close_door(self, device_address):
"""Close a door
Returns True on success
"""
with Net2XS._lock:
self._check_client()
return self._client.CloseDoor(device_address)
def hold_door_open(self, device_address):
"""Hold a door open
Returns True on success
"""
with Net2XS._lock:
self._check_client()
return self._client.HoldDoorOpen(device_address)
def control_door(self, device_address, relay, function, door_open_time, led_flash):
"""Control a door
Relay is 0 or 1, for relay 1 or 2.
Function 0 Close, 1 Timed open, 2 Hold Open.
Door_open_time in ms.
Led_flash see Net2 API.
Returns True on success.
"""
with Net2XS._lock:
self._check_client()
return self._client.ControlDoorEx(
device_address, relay, function, door_open_time, led_flash)
def get_department_name(self, department_id):
"""Get department name by id
Returns a string or None if department was not found
"""
dataset = self.query_db(
'select DepartmentName from sdk.Departments'
' where DepartmentID=%d' % department_id)
if (not dataset or
dataset.Tables.Count < 1 or
dataset.Tables[0].Rows.Count == 0):
return None
return dataset.Tables[0].Rows[0][0]
def get_users_ex(self, name=None):
"""Get users, from the UsersEx view.
Name param is optional tuple (first_name, sur_name)
Returns a dataset
"""
query = 'select * from sdk.UsersEx'
if name and len(name) == 2:
first_name, sur_name = name
query = ("%s where FirstName='%s' and Surname='%s'" %
(query, first_name, sur_name))
return self.query_db(query)
def get_user_id_by_name(self, name):
"""Get user id by name
Name is a (first_name, sur_name) tuple
Returns the id or -1 if not found.
"""
dataset = self.get_users(name)
if (not dataset or
dataset.Tables.Count < 1 or
dataset.Tables[0].Rows.Count == 0):
return -1
return dataset.Tables[0].Rows[0].get_Item("UserID")
def get_users(self, name=None):
"""Get users, from the ViewUserRecords call
Name param is optional tuple (first_name, sur_name)
Returns a dataset
"""
with Net2XS._lock:
self._check_client()
wheres = ['Active=1']
if name and len(name) == 2:
first_name, sur_name = name
if first_name:
wheres.append("FirstName='%s'" % first_name)
if sur_name:
wheres.append("Surname='%s'" % sur_name)
return self._client.ViewUserRecords(
' and '.join(wheres)).UsersDataSource
def get_user_record(self, user_id):
"""Get user record, from the ViewUserRecords call
Returns an IUserView user record or None if not found
"""
with Net2XS._lock:
self._check_client()
# Fetch current user info
users = self._client.ViewUserRecords(
'UserID=%d' % user_id).UsersList()
if users.Count != 1 or not users.ContainsKey(user_id):
return None
return users[user_id]
def get_user_name(self, user_id):
"""Get user name
Returns a string or None when user is not found
"""
dataset = self.query_db(
'select Username from sdk.UsersEx where UserID=%d' % user_id)
if (not dataset or
dataset.Tables.Count < 1 or
dataset.Tables[0].Rows.Count == 0):
return None
return dataset.Tables[0].Rows[0][0]
def add_user(
self,
access_level_id=1,
department_id=0,
anti_passback_ind=False,
alarm_user_ind=False,
first_name=None,
middle_name=None,
sur_name=None,
telephone_no=None,
telephone_extension=None,
pin_code=None,
activation_date=None,
active=True,
fax_no=None,
expiry_date=None,
custom_fields=None,
user_picture=None):
"""Add user record
DateTime fields can be either python or dotnet objects.
If activation date is None, the current date will be used.
If expiry date is None, the user entry will not expire.
Custom_fields is a string array (15) of which the first element is
ignored.
Returns True on success.
"""
# If no user name is given at all: create one (required by Net2)
if not first_name and not sur_name and not middle_name:
first_name = "New"
sur_name = "User"
with Net2XS._lock:
self._check_client()
return self._client.AddUserRecord(
access_level_id,
department_id,
anti_passback_ind,
alarm_user_ind,
first_name,
middle_name,
sur_name,
telephone_no,
telephone_extension,
pin_code,
None,
flex_date_time_to_net(activation_date) or self.now_date,
0,
0,
active,
fax_no,
flex_date_time_to_net(expiry_date) or self.no_expiration_date,
custom_fields,
user_picture)
def modify_user(
self,
user_id,
access_level_id=None,
department_id=None,
anti_passback_ind=None,
alarm_user_ind=None,
first_name=None,
middle_name=None,
sur_name=None,
telephone_no=None,
telephone_extension=None,
pin_code=None,
activation_date=None,
active=None,
fax_no=None,
expiry_date=None,
custom_fields=None,
user_picture=None,
delete_image=False):
"""Modify user record
Fields omitted keep their original value.
Custom_fields is a string array (15) of which the first element is
ignored.
Providing None custom values in the array, leaves the original value
unchanged.
Returns True on success.
"""
# Fetch current user info
uview = self.get_user_record(user_id)
if not uview:
return False
with Net2XS._lock:
self._check_client()
return self._client.UpdateUserRecord(
user_id,
uview.AccessLevelId if access_level_id is None else access_level_id,
uview.DepartmentId if department_id is None else department_id,
uview.AntiPassbackUser if anti_passback_ind is None else anti_passback_ind,
uview.AlarmUser if alarm_user_ind is None else alarm_user_ind,
first_name,
middle_name,
sur_name,
telephone_no,
telephone_extension,
pin_code,
None,
flex_date_time_to_net(activation_date) or uview.ActivationDate,
uview.Active if active is None else active,
fax_no,
flex_date_time_to_net(expiry_date) or uview.ExpiryDate,
custom_fields,
user_picture,
delete_image)
def delete_user(self, user_id):
"""Delete user record
Returns True on success
"""
with Net2XS._lock:
self._check_client()
return self._client.PurgeUser(user_id)
def deactivate_user(self, user_id):
"""Deactivate user. Preferred method iso delete.
Returns True on success
"""
return self.modify_user(
user_id=user_id,
access_level_id=0,
department_id=0,
pin_code="",
active=False)
def modify_user_access_level(self, user_id, access_level_id):
"""Alter user access level
Returns True on success
"""
return self.modify_user(
user_id=user_id,
access_level_id=access_level_id)
def modify_user_picture(self, user_id, user_picture):
"""Alter user picture. If user_picture is None, remove the picture.
Returns True on success
"""
return self.modify_user(
user_id=user_id,
user_picture=user_picture,
delete_image=True if not user_picture else False)
def get_area_ids(self, device_address):
"""Get area ids of a device
Returns a tuple with area id's (in, out)
"""
dataset = self.query_db(
'select b.ToAreaID from sdk.PeripheralNames a'
' inner join sdk.AreaGateways b on'
' a.PeripheralID=b.PeripheralID and'
' a.SerialNumber=%d order by a.SubAddress' %
(device_address))
if (not dataset or
dataset.Tables.Count < 1 or
dataset.Tables[0].Rows.Count != 2):
return None
return (dataset.Tables[0].Rows[0][0], dataset.Tables[0].Rows[1][0])
def get_device_addr_info(self):
"""Obtain all relevant device address info
Returns a dataset
"""
return self.query_db(
'select a.SerialNumber, a.SubAddress, a.PeripheralID, b.ToAreaID'
' from sdk.PeripheralNames a'
' inner join AreaGateways b on a.PeripheralID=b.PeripheralID'
' order by a.SerialNumber')
def get_time_slots(self, access_level_id, area_id):
"""Obtain allocated timeslots
Returns a dataset
"""
return self.query_db(
'select * from sdk.Timeslots'
' where TimezoneID='
'(select TimezoneID from AccessLevelMembers'
' where AccessLevelID=%d and AreaID=%d)' %
(access_level_id, area_id or -1))
def get_time_zones(self, time_zone_id=-1):
"""Obtain timezones, including slot data
Time_zone_id is optional. If not provided, all timezones are returned.
Returns a dataset
"""
with Net2XS._lock:
self._check_client()
if time_zone_id < 0:
return self._client.ViewTimezones().TimezonesDataSource
else:
return self._client.ViewTimezones(
time_zone_id, False).TimezonesDataSource
def get_py_time_zones(self, time_zone_id=-1):
"""Like get_time_zones, but returns a python array of TimeZone objects
Returns an array of TimeZone objects
"""
dataset = self.get_time_zones(time_zone_id)
return time_zones_to_py(dataset)
def add_time_zone(self, time_zone):
"""Add new time zone
Time_zone is a python TimeZone object type.
Returns True on success.
"""
with Net2XS._lock:
self._check_client()
tz_set = TimezonesSet()
for slot in time_zone.slots:
tz_set.Tables[1].AddTimeslotsRow(
slot.id,
time_zone.id,
date_time_to_net(slot.start_time),
date_time_to_net(slot.end_time),
slot.day)
# To array
ts_array = Array[TimezonesSet.TimeslotsRow](
len(time_zone.slots) * [None])
for i, row in enumerate(tz_set.Tables[1].Rows):
ts_array[i] = row
# Dispose timezones set
tz_set.Dispose()
return self._client.AddTimezone(time_zone.name, ts_array)
def delete_time_zone(self, time_zone_id):
"""Remove time zone with given id
Returns True on success.
"""
with Net2XS._lock:
self._check_client()
return self._client.DeleteTimezone(time_zone_id)
def update_time_zone(self, time_zone):
"""Update existing time zone
Time_zone is a python TimeZone object type.
Returns True on success.
"""
with Net2XS._lock:
self._check_client()
tz_set = TimezonesSet()
nr_slots = len(time_zone.slots)
if nr_slots:
for slot in time_zone.slots:
tz_set.Tables[1].AddTimeslotsRow(
slot.id,
time_zone.id,
date_time_to_net(slot.start_time),
date_time_to_net(slot.end_time),
slot.day)
# Work around for bug "Update Timezone with empty timeslot" (forum)
# At least 1 row is required for the update to work correctly.
# Should be fixed in V4.21
# TODO: only perform for lower versions?
else:
nr_slots = 1
tz_set.Tables[1].AddTimeslotsRow(
-1,
time_zone.id,
date_time_to_net(datetime(1899, 12, 30, 23, 59, 59)),
date_time_to_net(datetime(1899, 12, 30, 23, 59, 59)),
0)
# To array (bit awkward in Python.Net)
ts_array = Array[TimezonesSet.TimeslotsRow](nr_slots * [None])
for i, row in enumerate(tz_set.Tables[1].Rows):
ts_array[i] = row
# Dispose timezones set
tz_set.Dispose()
return self._client.UpdateTimezone(time_zone.id, ts_array)
def get_time_zone_id_by_name(self, name):
"""Get time zone id of time zone with given name
Returns a string or None when the timezone could not be found.
"""
dataset = self.query_db(
"select TimezoneID from sdk.Timezones"
" where Name='%s'" % name)
if (not dataset or
dataset.Tables.Count < 1 or
dataset.Tables[0].Rows.Count == 0):
return None
return dataset.Tables[0].Rows[0][0]
def get_access_levels(self, prefix=None):
"""Obtain access levels, optinally profiding a name prefix filter
Prefix is an optional parameter to obtain only access levels with the
given prefix.
Details need to be fetched separately.
Returns a dataset.
"""
query = 'select * from sdk.AccessLevels'
if prefix:
query = "%s where AccessLevelName like '%s%%'" % (query, prefix)
return self.query_db(query)
def get_access_level_details(self, access_level_id):
"""Obtain access level details
Returns a dataset.
"""
query = 'select * from sdk.AccessLevelMembers where AccessLevelID=%d' % (access_level_id)
return self.query_db(query)
def get_py_access_levels(self, prefix=None):
"""Like get_access_level, but returning a python array of AccessLevel
objects including details!
Returns an array of AccessLevel objects.
"""
dataset = self.get_access_levels(prefix)
if not dataset:
return []
access_levels = access_levels_to_py(dataset)
for al in access_levels:
dataset = self.get_access_level_details(al.id)
if dataset:
access_level_detail_to_py(al, dataset)
return access_levels
def add_access_level(self, access_level):
"""Add new access level
Access_level is a python AccessLevel object type.
Returns True on success.
"""
with Net2XS._lock:
self._check_client()
detail_set = AccessLevelDetailSet()
if access_level.details:
for detail in access_level.details:
detail_set.Tables[0].AddAccessLevelDetailRow(
None,
detail[0], # time zone id
None,
detail[1], # area id
None,
-1, # address
-1, # sub address
-1) # access level id
return self._client.AddAccessLevel(access_level.name, detail_set)
def delete_access_level(self, access_level_id):
"""Delete access level
Returns True on success.
"""
with Net2XS._lock:
self._check_client()
return self._client.DeleteAccessLevel(access_level_id)
def update_access_level(self, access_level):
"""Update access level
Access_level is a python AccessLevel object type.
Returns True on success.
"""
with Net2XS._lock:
self._check_client()
if access_level.details:
detail_set = AccessLevelDetailSet()
for detail in access_level.details:
detail_set.Tables[0].AddAccessLevelDetailRow(
None,
detail[0], # time zone id
None,
detail[1], # area id
None,
-1, # address
-1, # sub address
-1) # access level id
# Work around for similar bug as with the time zone slots
# When details are empty, existing data will not be cleared
# Therefore set TimezoneID to 0 for any existing entries
else:
ald = self._client.ViewAccessLevelDetail(access_level.id)
detail_set = ald.AccessLevelDetailsDataSource
for row in detail_set.Tables[0].Rows:
row.TimezoneID = 0
return self._client.UpdateAccessLevel(
access_level.id, access_level.name, detail_set)
def get_access_level_id_by_user(self, user_id):
"""Get access level id of given user
Returns an integer or None if the user could not be found
"""
with Net2XS._lock:
self._check_client()
dataset = self._client.ViewUserRecords(
'UserID=%d' % (user_id)).UsersDataSource
if (not dataset or
dataset.Tables.Count < 1 or
dataset.Tables[0].Rows.Count == 0):
return None
return dataset.Tables[0].Rows[0].AccessLevelID
def get_access_level_id_by_name(self, name):
"""Get access level id of level with given name
Returns a string or None if the accesslevel could not be found.
"""
dataset = self.query_db(
"select AccessLevelID from sdk.AccessLevels"
" where AccessLevelName='%s'" % name)
if (not dataset or
dataset.Tables.Count < 1 or
dataset.Tables[0].Rows.Count == 0):
return None
return dataset.Tables[0].Rows[0][0]
def get_operator_level(self, user_id):
"""Get operator level of the given user id
Returns an integer.
"""
with Net2XS._lock:
self._check_client()
return self._client.GetOperatorLevel(user_id)
def add_card(self, card_nr, type_id, user_id):
"""Add a new card to the given user
Returns True on success.
"""
with Net2XS._lock:
self._check_client()
return self._client.AddCard(card_nr, type_id, user_id)
def delete_card(self, card_nr):
"""Delete a card with the given card_nr
Returns True on success (even if card_nr did not exist).
"""
with Net2XS._lock:
self._check_client()
return self._client.DeleteCard(card_nr)
def get_cards(self, user_id=None):
"""Obtain all 'cards' of the given user id
If no user_id is given, all cards are returned.
Returns a dataset.
"""
with Net2XS._lock:
self._check_client()
# Setup dataset to return
dataset = DataSet()
table = dataset.Tables.Add()
table.Columns.Add("UserID")
table.Columns.Add("CardNumber")
table.Columns.Add("LostCard")
table.Columns.Add("CardTypeID")
# Fetch normal cards
query = ("select * from sdk.Cards" +
" where CardTypeID is not null" +
" and CardTypeID <> 7" +
" and CardNumber <> 100000000 + UserID")
if type(user_id) is int:
query += " and UserID=%d" % (user_id)
card_dataset = self.query_db(query)
if card_dataset and card_dataset.Tables.Count > 0:
for row in card_dataset.Tables[0].Rows:
table.Rows.Add(row['UserID'],
row['CardNumber'],
row['LostCard'],
row['CardTypeID'])
# Fetch vehicle registrations
query = "select * from sdk.UserVehicleIndex"
if type(user_id) is int:
query += " where UserID=%d" % (user_id)
vehicle_dataset = self.query_db(query)
if vehicle_dataset and vehicle_dataset.Tables.Count > 0:
for row in vehicle_dataset.Tables[0].Rows:
table.Rows.Add(row['UserID'],
row['VehicleIndex'],
False,
7)
return dataset
def add_event_record(
self,
#CK: check!
#event_type, event_subtype=EventViewEnums.Net2EventSubTypes.None,
event_type, event_subtype=None,
address=0, subaddress=0, user_id=None, card_nr=0,
event_detail=None,
linked_event_id=0, ioboard_id=0, input_id=0, output_id=0):
"""Add an event record
Returns True on success.
"""
with Net2XS._lock:
self._check_client()
return self._client.AddEventRecord(
event_type,
event_subtype,
address, subaddress, user_id, card_nr, event_detail,
linked_event_id, ioboard_id, input_id, output_id)
def add_acu(self, address, acu_type):
"""Add new acu device
Acu_types: Classic = 1, Nano = 2, Plus = 3, EasyProxNano = 4, PaxLoc = 7, PaxLocMifare = 8
Returns True on success. (up till 4.27, there is a bug in the SDK causing a False on success)
"""
with Net2XS._lock:
self._check_client()
return self._client.AddACU(address, acu_type)
def delete_acu(self, address):
"""Delete acu device
Return True on success.
"""
with Net2XS._lock:
self._check_client()
return self._client.DeleteACU(address)
@property
def last_error_message(self):
"""Last error message
"""
with Net2XS._lock:
self._check_client()
return self._client.LastErrorMessage
@property
def no_expiration_date(self):
"""The 'special' no expiration date of Net2 (1753-01-01)
Returns dotnet DateTime object
"""
return date_time_to_net(datetime(year=1753, month=1, day=1))
@property
def now_date(self):
"""The now datetime
Returns dotnet DateTime object
"""
return date_time_to_net(datetime.now())
def dispose(self):
"""Dispose Net2 client
"""
with Net2XS._lock:
if self._client:
try:
self._client.Dispose()
self._client = None
Net2XS._logger.Debug('Disposed')
except:
pass
def _reconnected(self, sender, event_args):
"""Reconnect handler
"""
if event_args.ReAuthenticationRequired:
try:
self.authenticate(self._user_id, self._password)
self._connected = True
Net2XS._logger.Debug('Reconnected')
except Exception as e:
Net2XS._logger.Debug('Reconnect error: %s' % str(e))
else:
self._connected = True
def _disconnected(self, sender, event_args):
"""Disconnect handler
"""
Net2XS._logger.Debug('Disconnected')
self._connected = False
@property
def on_acu_event(self):
return self._on_acu_event
@on_acu_event.setter
def on_acu_event(self, value):
self._on_acu_event = value
def _acu_event(self, sender, event_view):
"""Acu event handler
"""
if self.on_acu_event:
try:
self.on_acu_event(sender, event_view)
except Exception as e:
Net2XS._logger.Debug('Acu even handler error: %s' % str(e))
def monitor_acu(self, address):
"""Monitor events of given acu
Returns True on success
"""
with Net2XS._lock:
self._check_client()
return self._client.MonitorAcu(address)
def stop_monitoring_acu(self, address):
"""Stop monitoring events of given acu
Returns True on success
"""
with Net2XS._lock:
self._check_client()
return self._client.StopMonitoringAcu(address)
def activate_ioboard_relays(self, ioboard_id, settings):
"""Control ioboard relays
Returns True on success
"""
with Net2XS._lock:
self._check_client()
return self._client.ActivateIoBoardRelays(
ioboard_id, settings)
@classmethod
def door_status_str(cls, status):
"""Translate door status to human readable string
Returns a csv string with the combined status bits.
"""
ret = []
if status & OC.DoorStatusFlags.PSUIsOK:
ret.append('psu ok')
if status & OC.DoorStatusFlags.IntruderAlarm:
ret.append('intruder')
if not status & OC.DoorStatusFlags.TamperStatusGood:
ret.append('tampered')
if status & OC.DoorStatusFlags.DoorContactClosed:
ret.append('door contact closed')
if status & OC.DoorStatusFlags.AlarmTripped:
ret.append('alarm')
if status & OC.DoorStatusFlags.DoorOpen:
ret.append('door open')
# When psu not ok, other state bits are irrelevant
else:
ret.append('psu not ok')
return ', '.join(ret)