You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

117 lines
5.0 KiB
Python

#!/usr/bin/env python3
"""Interactions with the Meilisearch API for adding and searching cables."""
import sys
from meilisearch import Client
from meilisearch.task import TaskInfo
from meilisearch.errors import MeilisearchApiError
import time
DEFAULT_URL = "http://meilisearch:7700"
DEFAULT_APIKEY = "fluffybunnyrabbit" # I WOULD RECOMMEND SOMETHING MORE SECURE
DEFAULT_INDEX = "items"
DEFAULT_FILTERABLE_ATTRS = ["office", "loc", "fullname", "serial", "checkout_user", "checkout", "barcode", "manufacturer"] # default filterable attributes
class InventorySearch:
"""Class for interacting with the Meilisearch API."""
def __init__(self,
url: str = None,
api_key: str = None,
index: str = None,
filterable_attrs: list = None):
"""Connect to Meilisearch and perform first-run tasks as necessary.
:param url: Address of the Meilisearch server. Defaults to ``http://localhost:7700`` if unspecified.
:param api_key: API key used to authenticate with Meilisearch. It is highly recommended to set this as something
secure if you can access this endpoint publicly, but you can ignore this and set Meilisearch's default API key
to ``fluffybunnyrabbit``.
:param index: The name of the index to configure. Defaults to ``cables`` if unspecified.
:param filterable_attrs: List of all the attributes we want to filter by."""
# connect to Meilisearch
url = url or DEFAULT_URL
api_key = api_key or DEFAULT_APIKEY
filterable_attrs = filterable_attrs or DEFAULT_FILTERABLE_ATTRS
self.index = index or DEFAULT_INDEX
self.client = Client(url, api_key)
# create the index if it does not exist already
try:
self.client.get_index(self.index)
self.client.delete_index(self.index)
time.sleep(0.05)
self.client.create_index(self.index, {'primaryKey': 'barcode'})
time.sleep(0.05)
except MeilisearchApiError as _:
self.client.create_index(self.index, {'primaryKey': 'barcode'})
# make a variable to easily reference the index
self.idxref = self.client.index(self.index)
time.sleep(0.05)
# disable typos, we have serial numbers and such that should be exact match
self.idxref.update_typo_tolerance({'enabled': False})
# update filterable attributes if needed
self.idxref.update_distinct_attribute('barcode')
self.update_filterables(filterable_attrs)
time.sleep(0.05)
def add_document(self, document: dict) -> TaskInfo:
"""Add a cable to the Meilisearch index.
:param document: Dictionary containing all the cable data.
:returns: A TaskInfo object for the addition of the new document."""
return self.idxref.add_documents(document)
def add_documents(self, documents: list):
"""Add a list of cables to the Meilisearch index.
:param documents: List of dictionaries containing all the cable data.
:returns: A TaskInfo object for the last new document."""
taskinfo = None
for i in documents:
taskinfo = self.add_document(i)
return taskinfo
def update_filterables(self, filterables: list):
"""Update filterable attributes and wait for database to fully index. If the filterable attributes matches the
current attributes in the database, don't update (saves reindexing).
:param filterables: List of all filterable attributes"""
#existing_filterables = self.idxref.get_filterable_attributes()
#if len(set(existing_filterables).difference(set(filterables))) > 0:
taskref = self.idxref.update_filterable_attributes(filterables)
#self.client.wait_for_task(taskref.index_uid)
def search(self, query: str, filters: str = None):
"""Execute a search query on the Meilisearch index.
:param query: Seach query
:param filters: A meilisearch compatible filter statement.
:returns: The search results dict. Actual results are in a list under "hits", but there are other nice values that are useful in the root element."""
if filters:
q = self.idxref.search(query, {"filter": filters})
else:
q = self.idxref.search(query)
return q
def _filter_one(self, filter: str):
"""Get the first item to match a filter.
:param filter: A meilisearch compatible filter statement.
:returns: A dict containing the results; If no results found, an empty dict."""
q = self.search("", filter)
if q["estimatedTotalHits"] != 0:
return q["hits"][0]
else:
return dict()
def get_barcode(self, barcode: str):
"""Get a specific barcode.
:param uuid: The barcode to search for."""
return self._filter_one(f"barcode = {barcode}")
# entrypoint
if __name__ == "__main__":
ivs = InventorySearch()