299 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			299 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import os

from peewee import *

from search import InventorySearch as ivs
 | |
| 
 | |
# Connection settings for the inventory MySQL database.  Every value can be
# overridden with an INVENTORY_DB_* environment variable; the fallbacks are
# the values that were previously hard-coded on this line.
# NOTE(review): the fallback password is still committed to source control --
# rotate it and drop the default once deployments set INVENTORY_DB_PASSWORD.
db = MySQLDatabase(
    os.environ.get('INVENTORY_DB_NAME', 'inventory'),
    thread_safe=True,
    user=os.environ.get('INVENTORY_DB_USER', 'inventory'),
    password=os.environ.get('INVENTORY_DB_PASSWORD', 'nfrwnfprifbwef'),
    host=os.environ.get('INVENTORY_DB_HOST', 'database'),
    port=int(os.environ.get('INVENTORY_DB_PORT', '3306')),
)

# Search cache wrapper; stays None until init() assigns an instance.
search = None
 | |
| 
 | |
class user(Model):
    """Account row, keyed by ``username``.

    The password column holds whatever string the caller supplies; this
    model does no hashing of its own.
    """

    class Meta:
        database = db
        legacy_table_names = False

    # Display name.
    name = CharField()
    # Login name; doubles as the table's primary key.
    username = CharField(unique=True, primary_key=True)
    # replace with AD or something!!
    password = CharField()
    # Defaults to False.  Presumably marks accounts that must pick a new
    # password -- TODO confirm against the caller.
    changepw = BooleanField(default=False)
 | |
| 
 | |
class office(Model):
    """Office row: a unique name plus an auto-generated integer id."""

    class Meta:
        database = db
        legacy_table_names = False

    # Unique office name.
    name = CharField(unique=True)
    # Auto-incrementing primary key.
    officeid = AutoField()
 | |
| 
 | |
class location(Model):
    """Storage location that can be nested under another location.

    ``parent`` is a nullable self-reference; a location's children are
    reachable through the ``sublocations`` back-reference.
    """

    class Meta:
        database = db
        legacy_table_names = False

    # Location name (not declared unique).
    name = CharField()
    # Auto-incrementing primary key.
    locationid = AutoField()
    # Optional free-text description.
    description = CharField(null=True)

    # Enclosing location, or NULL when there is none.
    parent = ForeignKeyField('self', null=True, backref="sublocations")
 | |
| 
 | |
| 
 | |
| 
 | |
class item(Model):
    """Inventory item row, keyed by ``barcode``.

    Every column except ``barcode``, ``checkout`` and ``manufacturer``
    accepts NULL.
    """

    class Meta:
        database = db
        legacy_table_names = False

    # Where the item is: a location and an office, both optional.
    loc = ForeignKeyField(location, backref="items_here", null=True)
    office = ForeignKeyField(office, backref="items_here", null=True)

    # Descriptive fields.
    fullname = CharField(null=True)
    description = CharField(null=True)
    serial = CharField(null=True)

    # Checkout state.
    checkout = BooleanField(default=False)
    checkout_user = ForeignKeyField(user, backref="items_held", null=True)
    checkout_start = DateTimeField(null=True)
    checkout_end = DateTimeField(null=True)

    # Hardware identifiers; the barcode is the primary key.
    mac = CharField(null=True)
    barcode = CharField(unique=True, primary_key=True)
    fwver = CharField(null=True)
    # Required: this column does not accept NULL.
    manufacturer = CharField()

    # remove null=True once user auth works
    last_user = ForeignKeyField(user, null=True)
 | |
| 
 | |
class component(Model):
    """Component row that belongs to an ``item`` and has its own barcode."""

    class Meta:
        database = db
        legacy_table_names = False

    # Owning item; its components are reachable as ``item.components``.
    owner = ForeignKeyField(item, backref="components")
    name = CharField()
    description = CharField(null=True)
    # Primary key of the component table.
    barcode = CharField(unique=True, primary_key=True)
    serial = CharField(null=True)
 | |
| 
 | |
def init():
    """Connect to the database, create the tables and build the search cache.

    Rebinds the module-level ``search`` to a fresh ``InventorySearch`` and
    feeds it every row of the item table as a dict.
    """
    global search

    print("Connecting to database...")
    db.connect()

    print("Checking & creating tables...")
    models = [location, office, item, component, user]
    db.create_tables(models)
    print("Database initialized.")

    print("Creating cache index...")
    search = ivs()
    for row in item.select().dicts():
        print(row)
        search.add_document(row)
    print("Cache build complete.")
 | |
| 
 | |
def search_item(query, filters: "dict | None" = None):
    """Search the item cache for *query*, optionally narrowed by *filters*.

    Args:
        query: Search text handed to ``search.search``.
        filters: Optional mapping of field name to value.  Each entry becomes
            a ``field = value`` clause and the clauses are joined with
            ``AND``.  The ``office`` key is special: its value is an office
            name that is translated to the office's numeric id, and the
            value ``'all'`` drops the office clause altogether.

    Returns:
        The ``"hits"`` entry of the result returned by ``search.search``.

    Note:
        Values are interpolated into the filter expression as-is (after
        ``str()``), so callers must pass values that are already valid in
        the search backend's filter syntax.
    """
    clauses = []
    for key, val in (filters or {}).items():
        if key != "office":
            # str() so numeric filter values no longer raise TypeError.
            clauses.append(key + " = " + str(val))
            continue

        if val == 'all':
            # "all offices" means no office restriction.
            continue

        # Translate the office name to its integer id with a single query.
        matches = office.select().where(office.name == val).dicts()
        if len(matches) > 0:
            office_id = str(matches[0]['officeid'])
        else:
            # Unknown office: use an id that is not expected to match anything.
            office_id = str(999999)
        clauses.append(key + " = " + office_id)

    # An empty clause list yields "", the same value the unfiltered path used.
    return search.search(query, " AND ".join(clauses))["hits"]
 | |
| 
 | |
def find_item(barcode):
    """Look *barcode* up in the search cache and return whatever it reports."""
    cached = search.get_barcode(barcode)
    return cached
 | |
| 
 | |
def create_item(fullname, serial, officename, barcode, location=None, description=None, manufacturer=None, mac=None, fwver=None):
    """Insert a new item, creating its office first if needed, and cache it.

    Returns the saved ``item`` instance, or ``False`` when the insert raises
    ``IntegrityError`` (a message is printed in that case).
    """
    # Insert the office optimistically; the unique name constraint rejects
    # repeats, which is treated as "already there".
    try:
        new_office = office(name=officename)
        new_office.save(force_insert=True)
    except IntegrityError:
        pass

    try:
        owning_office = office.select().where(office.name == officename)[0]
        columns = dict(
            office=owning_office,
            barcode=barcode,
            fullname=fullname,
            description=description,
            loc=location,
            serial=serial,
            mac=mac,
            fwver=fwver,
            manufacturer=manufacturer,
        )
        created = item(**columns)
        created.save(force_insert=True)

        # Re-read the row as a dict so the cache gets the stored values.
        stored_row = item.select().where(item.barcode == barcode).dicts()[0]
        search.add_document(stored_row)

        print("item: " + created.fullname)
        return created
    except IntegrityError:
        print("Duplicate item " + fullname)
        return False
 | |
| 
 | |
def delete_item(itm):
    """Delete the database row behind the model instance *itm*.

    NOTE(review): nothing here touches the ``search`` cache, so a deleted
    item may linger in search results -- confirm whether that is intended.
    """
    itm.delete_instance()
 | |
|     
 | |
def item_location_str(itm):
    """Return a printable location for *itm*.

    Returns the name of ``itm.loc`` when it can be read.  If reading it fails
    (for example because ``itm.loc`` is ``None``), returns
    ``"Checked out to unknown location"`` for a checked-out item and
    ``"Unknown"`` otherwise.
    """
    try:
        return itm.loc.name
    except Exception:
        # Best-effort fallback.  Catching Exception rather than using a bare
        # except lets KeyboardInterrupt / SystemExit propagate.
        if itm.checkout:
            return "Checked out to unknown location"
        return "Unknown"
 | |
| 
 | |
def create_component(parentitem, name, barcode, serial=None, description=None):
    """Insert a component owned by *parentitem*.

    Returns the saved ``component`` instance, or ``False`` when the insert
    raises ``IntegrityError`` (a message is printed in that case).
    """
    try:
        created = component(
            owner=parentitem,
            name=name,
            barcode=barcode,
            description=description,
            serial=serial,
        )
        created.save(force_insert=True)
    except IntegrityError:
        print("Duplicate component " + name)
        return False
    else:
        print("component: " + created.name)
        return created
 | |
| 
 | |
def get_item(barcode):
    """Return the item with *barcode*, else fall back to ``get_component``.

    The result is therefore an ``item``, a ``component``, or ``False``.
    """
    matches = item.select().where(item.barcode == barcode)
    if len(matches) != 1:
        # check if component exists
        return get_component(barcode)
    return matches[0]
 | |
| 
 | |
| 
 | |
def get_component(barcode):
    """Return the component with *barcode*, or ``False`` when there is none."""
    matches = component.select().where(component.barcode == barcode)
    return matches[0] if len(matches) == 1 else False
 | |
| 
 | |
def create_user(name, username, password, changepw=False):
    """Insert a new user row.

    Returns the saved ``user`` instance, or ``False`` when the insert raises
    ``IntegrityError`` (a message is printed in that case).  The password is
    stored exactly as passed in.
    """
    try:
        account = user(
            username=username,
            name=name,
            password=password,
            changepw=changepw,
        )
        account.save(force_insert=True)
    except IntegrityError:
        print("User " + username + " already exists.")
        return False
    else:
        return account
 | |
| 
 | |
def change_password(username, password):
    """Store a new password for the user found by ``get_user(username)``.

    Also clears that user's ``changepw`` flag.

    Returns:
        ``True`` when the user was found and saved, ``False`` otherwise.
    """
    usr = get_user(username)
    if not usr:
        return False

    usr.password = password
    # Clear the flag on this instance.  Assigning to ``user.changepw`` would
    # replace the field on the model class instead.
    usr.changepw = False
    # Plain save() issues an UPDATE because the primary key is already set;
    # force_insert=True would attempt an INSERT of an existing username.
    usr.save()
    return True
 | |
| 
 | |
def checkout(user, barcode, loc=None):
    """Flag the record found for *barcode* as checked out to *user*.

    The record comes from ``get_item``.  Its ``loc`` is overwritten with
    *loc*, which is ``None`` unless given.  Returns the saved record, or
    ``False`` when nothing matches the barcode.
    """
    record = get_item(barcode)
    if not record:
        return False

    record.checkout = True
    record.checkout_user = user
    record.loc = loc
    record.save()
    return record
 | |
| 
 | |
def checkin(user, barcode, loc=None):
    """Flag the record found for *barcode* as checked in by *user*.

    The record comes from ``get_item``.  *user* is stored as ``last_user``
    and ``loc`` is overwritten with *loc*, which is ``None`` unless given.
    Returns the saved record, or ``False`` when nothing matches the barcode.
    """
    record = get_item(barcode)
    if not record:
        return False

    record.checkout = False
    record.last_user = user
    record.loc = loc
    record.save()
    return record
 | |
| 
 | |
|         
 | |
def create_location(name, parent=None):
    """Create and save a location called *name* underneath *parent*.

    Returns the saved ``location``, or ``False`` when *parent* is ``None``.

    NOTE(review): with the default ``parent=None`` this never creates
    anything, so parentless locations cannot be made through this function
    -- confirm whether that is intended.
    """
    if parent is None:
        return False

    created = location(name=name, parent=parent)
    created.save()
    return created
 | |
| 
 | |
| def _find_parent(loc, parent):
 | |
|     if hasattr(loc, 'parent'):
 | |
|         if loc.parent.locationid == parent.locationid:
 | |
|             return True
 | |
|         else:
 | |
|             return _find_parent(loc.parent, parent)
 | |
|     else:
 | |
|         return False
 | |
| 
 | |
def get_location(name, parent=None):
    """Find a location by *name*, optionally restricted to *parent*'s subtree.

    Args:
        name: Location name to match.
        parent: When given, only a location that ``_find_parent`` reports as
            a descendant of *parent* is accepted; the first such match wins.

    Returns:
        The matching ``location``, or ``False`` when there is no match, when
        *parent* is ``None`` and the name matches anything other than exactly
        one row, or when the lookup raises.
    """
    try:
        candidates = location.select().where(location.name == name)

        if parent is None:
            # Without a parent, an ambiguous name counts as "not found".
            return candidates[0] if len(candidates) == 1 else False

        for candidate in candidates:
            try:
                if _find_parent(candidate, parent):
                    return candidate
            except AttributeError:
                # The ancestor walk failed for this candidate; try the next
                # one instead of abandoning the whole search.
                continue
        return False
    except Exception:
        # Best-effort lookup: report failure as "not found", but let
        # KeyboardInterrupt / SystemExit propagate.
        return False
 | |
| 
 | |
def get_user(name):
    """Find a user whose ``username`` or, failing that, ``name`` is *name*.

    Each column is tried in turn and must match exactly one row.  Returns
    the ``user`` instance, or ``False`` when neither lookup yields one row.
    """
    for column in (user.username, user.name):
        matches = user.select().where(column == name)
        if len(matches) == 1:
            return matches[0]

    return False
 | |
| 
 | |
def user_login(username, password):
    """Return whether *password* equals the stored password for *username*.

    The account is resolved through ``get_user``; an unknown user yields
    ``False``.  The comparison is a plain equality check on the stored value.
    """
    account = get_user(username)
    if not account:
        return False
    return account.password == password
 | |
| 
 | |
def test():
    """Smoke test: seed a user, two locations and two items, then list them.

    Writes to the configured database; intended for manual runs only.
    """
    costa = create_user("Costa Aralis", "caralpwtwfpis", "12345")
    if costa:
        # create_user returns False when the user already exists (e.g. on a
        # re-run), so only re-insert under the new username after a fresh
        # create.
        costa.username = "caralis"
        costa.save(force_insert=True)

    building = location(name="Chicago CIC")
    building.save()
    shelf2a = location(name="Shelf 2A", parent=building)
    shelf2a.save()

    # create_item takes (fullname, serial, officename, barcode, ...).  The
    # manufacturer column does not accept NULL, so a placeholder is supplied.
    create_item(
        "BRS50-00242Q2Q-STCY99HHSESXX.X.XX",
        None,
        "Chicago CIC",
        "12345678",
        location=shelf2a,
        manufacturer="Unknown",
    )
    create_item(
        "BRS50-00242W2W-STCY99HHSESXX.X.XX",
        None,
        "Chicago CIC",
        "123456789",
        location=building,
        manufacturer="Unknown",
    )

    print("Querying...")
    # item has no ``name`` column; match on the fullname prefix instead.
    query = item.select().where(item.fullname.startswith("BRS50"))
    for itm in query:
        print(itm.fullname)
 | |
| 
 | |
| 
 | |
# Script entry point: set up the database and cache, then run the smoke test.
if __name__ == "__main__":
    init()
    test()
 |