You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

299 lines
8.4 KiB
Python

1 month ago
from peewee import *
from search import InventorySearch as ivs
db = MySQLDatabase('inventory', thread_safe=True, user='inventory', password='nfrwnfprifbwef', host='database', port=3306)
search = None
class user(Model):
name = CharField()
username = CharField(unique=True, primary_key=True)
password = CharField() # replace with AD or something!!
changepw = BooleanField(default=False)
class Meta:
database = db
legacy_table_names = False
class office(Model):
name = CharField(unique=True)
officeid = AutoField()
class Meta:
database = db
legacy_table_names = False
class location(Model):
name = CharField()
locationid = AutoField()
description = CharField(null=True)
parent = ForeignKeyField('self', null=True, backref="sublocations")
class Meta:
database = db
legacy_table_names = False
class item(Model):
loc = ForeignKeyField(location, backref="items_here", null=True)
office = ForeignKeyField(office, backref="items_here", null=True)
fullname = CharField(null=True)
description = CharField(null=True)
serial = CharField(null=True)
checkout = BooleanField(default=False)
checkout_user = ForeignKeyField(user, backref="items_held", null=True)
checkout_start = DateTimeField(null=True)
checkout_end = DateTimeField(null=True)
mac = CharField(null=True)
barcode = CharField(unique=True, primary_key=True)
fwver = CharField(null=True)
manufacturer = CharField()
last_user = ForeignKeyField(user, null=True) # remove null=True once user auth works
class Meta:
database = db
legacy_table_names = False
class component(Model):
owner = ForeignKeyField(item, backref="components")
name = CharField()
description = CharField(null=True)
barcode = CharField(unique=True, primary_key=True)
serial = CharField(null=True)
class Meta:
database = db
legacy_table_names = False
def init():
print("Connecting to database...")
db.connect()
print("Checking & creating tables...")
db.create_tables([location, office, item, component, user])
print("Database initialized.")
global search
print("Creating cache index...")
search = ivs()
add = item.select().dicts()
#print(add)
#print(type(add))
for itm in add:
print(itm)
#print(type(itm))
search.add_document(itm)
print("Cache build complete.")
def search_item(query, filters: dict={}):
#print(filters)
if len(filters) > 0:
filt = ""
for key, val in filters.items():
if key == "office":
# convert to integer!
if val != 'all':
if len(office.select().where(office.name == val).dicts()) > 0:
val2 = str(office.select().where(office.name == val).dicts()[0]['officeid'])
#print(val2)
else:
# office does not have any parts
val2 = str(999999)
filt += key + " = " + val2 + " AND "
else:
continue
else:
filt += key + " = " + val + " AND "
filt = filt[0:-4] # remove extra and
#print(filt)
return search.search(query, filt)["hits"]
else:
return search.search(query, "")["hits"]
def find_item(barcode):
return search.get_barcode(barcode)
def create_item(fullname, serial, officename, barcode, location=None, description=None, manufacturer=None, mac=None, fwver=None):
try:
off = office(name=officename)
off.save(force_insert=True)
except IntegrityError:
pass
try:
off = office.select().where(office.name == officename)[0]
itm = item(office=off, barcode=barcode, fullname=fullname, description=description, loc=location, serial=serial, mac=mac, fwver=fwver, manufacturer=manufacturer)
itm.save(force_insert=True)
search.add_document(item.select().where(item.barcode==barcode).dicts()[0])
print("item: " + itm.fullname)
return itm
except IntegrityError:
print("Duplicate item " + fullname)
return False
def delete_item(itm):
#item.delete(itm)
itm.delete_instance()
def item_location_str(itm):
try:
return itm.loc.name
except:
if itm.checkout:
return "Checked out to unknown location"
else:
return "Unknown"
def create_component(parentitem, name, barcode, serial=None, description=None):
itm = parentitem
try:
cmp = component(owner=itm, name=name, barcode=barcode, description=description, serial=serial)
cmp.save(force_insert=True)
print("component: " + cmp.name)
return cmp
except IntegrityError:
print("Duplicate component " + name)
return False
def get_item(barcode):
query = item.select().where(item.barcode == barcode)
if len(query) == 1:
return query[0]
# check if component exists
return get_component(barcode)
def get_component(barcode):
query = component.select().where(component.barcode == barcode)
if len(query) == 1:
return query[0]
return False
def create_user(name, username, password, changepw=False):
try:
usr = user(username=username, name=name, password=password, changepw=changepw)
usr.save(force_insert=True)
return usr
except IntegrityError:
print("User " + username + " already exists.")
return False
def change_password(username, password):
usr = get_user(username)
if usr:
usr.password = password
user.changepw = False
usr.save(force_insert=True)
return True
else:
return False
def checkout(user, barcode, loc=None):
itm = get_item(barcode)
if itm:
itm.checkout = True
itm.checkout_user = user
itm.loc = loc
itm.save()
return itm
else:
return False
def checkin(user, barcode, loc=None):
itm = get_item(barcode)
if itm:
itm.checkout = False
itm.last_user = user
itm.loc = loc
itm.save()
return itm
else:
return False
def create_location(name, parent=None):
if parent is not None:
loc = location(name=name, parent=parent)
loc.save()
return loc
else:
return False
def _find_parent(loc, parent):
if hasattr(loc, 'parent'):
if loc.parent.locationid == parent.locationid:
return True
else:
return _find_parent(loc.parent, parent)
else:
return False
def get_location(name, parent=None):
try:
query = location.select().where(location.name == name)
if parent is not None:
for loc in query:
if _find_parent(loc, parent):
return loc
return False
else:
if len(query) == 1:
return query[0]
else:
return False
except:
return False
def get_user(name):
query = user.select().where(user.username == name)
if len(query) == 1:
return query[0]
query = user.select().where(user.name == name)
if len(query) == 1:
return query[0]
return False
def user_login(username, password):
user = get_user(username)
if user:
return user.password == password
else:
return False
def test():
costa = create_user("Costa Aralis", "caralpwtwfpis", "12345")
costa.username = "caralis"
costa.save(force_insert=True)
office = location(name="Chicago CIC")
office.save()
shelf2a = location(name="Shelf 2A", parent=office)
shelf2a.save()
create_item("BRS50", "BRS50-00242Q2Q-STCY99HHSESXX.X.XX", "12345678", location=shelf2a)
create_item("BRS50", "BRS50-00242W2W-STCY99HHSESXX.X.XX", "123456789", location=office)
#brs50 = part(name="BRS50", description="it's a frickin BRS dude", quantity=1)
#brs50.save()
#mybrs = item(owner=brs50, fullname="BRS50-00242Q2Q-STCY99HHSESXX.X.XX", description="This one has 6 dead ports. RIP", loc=shelf2a, barcode="tlg4276p4dj85697")
#mybrs.save()
print("Querying...")
#query = part.select()
query = item.select().where(item.name == "BRS50")
for brs in query:
for itm in brs.items:
print(itm.fullname)
if __name__ == "__main__":
init()
test()