diff --git a/public/VERSION b/public/VERSION index 5ba4891..82d222b 100644 --- a/public/VERSION +++ b/public/VERSION @@ -1,4 +1,4 @@ -prev_commit=c3b6918ed24a5dab3fd1fec5fa949caf2cd63b69 -date=2026-01-09 16:09:55 -0500 -branch=36-lente-failing-to-run-on-vm -version=0.8-23-gc3b6918e +prev_commit=714b6725edffd1832edd40bc79cc8738de06fbc9 +date=2026-02-17 19:10:06 -0500 +branch=41-create-counting-example-with-barcode-scanner +version=0.8-32-g714b6725 diff --git a/public/prism/drivers/scanners/Scanners.py b/public/prism/drivers/scanners/Scanners.py new file mode 100644 index 0000000..4b30cd7 --- /dev/null +++ b/public/prism/drivers/scanners/Scanners.py @@ -0,0 +1,55 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +""" +Sistemi Corporation, copyright, all rights reserved, 2026 +Martin Guthrie + +""" +try: + # run locally + from stublogger import StubLogger +except: + # run from prism + from public.prism.drivers.common.stublogger import StubLogger + + +# this is the (example) version of the remote hardware +VERSION = "0.2.0" +DRIVER_TYPE = "SCANNER" + + +class Scanner(object): + """ Scanners HW Driver + - this driver actually does nothing (very little) + + """ + def __init__(self, loggerIn=None): + if loggerIn: self.logger = loggerIn + else: self.logger = StubLogger() + + self._stop_prism_player = False + self._close = False + + # TODO: function to send a command to the scanner to alert the User + # that prism is ready for the user to scan. Scanners + # support this feature. + + def stop_prism_player(self): + self._stop_prism_player = True + self.logger.info("stop_prism_player") + + def jig_closed_always(self): + """ "Auto" Jig Open/Close + - NOTE: because Prism calls here every 1sec and because Prism needs to + see the Jig Open/Close, it can take up to 2sec before Prism + is ready to Scan again + """ + if self._stop_prism_player: + self.logger.info(False) + return False + + # toggle response so that Prism thinks the jig is opening and closing. + temp = self._close + self._close = not temp + + return temp diff --git a/public/prism/drivers/scanners/hwdrv_scanners.py b/public/prism/drivers/scanners/hwdrv_scanners.py new file mode 100644 index 0000000..92b9ba0 --- /dev/null +++ b/public/prism/drivers/scanners/hwdrv_scanners.py @@ -0,0 +1,130 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +""" +Sistemi Corporation, copyright, all rights reserved, 2026 +Martin Guthrie + +""" +import os +import pyudev +import logging +from core.sys_log import pub_notice + +# import your hardware driver class, and details +from public.prism.drivers.scanners.Scanners import Scanner, VERSION, DRIVER_TYPE + +class HWDriver(object): + """ + HWDriver is a class that installs a HW driver into the shared state. + This is not the HW driver itself, just an installer. + + When a script is loaded, there is a config section that lists all the HWDriver + to call, for example, + + "config": { + # drivers: list of code to initialize the test environment, must be specified + "drivers": ["public.prism.drivers.fake.hwdrv_fake"] + }, + + drivers is a list, so multiple HWDriver can be installed. + + This is a fake HW driver to simulate and show what a HWDriver is required to do. + """ + SFN = os.path.basename(__file__) # for logging path + + # add as required + SCANNER_MANUFACTURERS = ["Datalogic", ] + + def __init__(self): + self.logger = logging.getLogger("{}".format(self.SFN)) + self.logger.info("Start") + self._num_chan = 0 + self.scanners = [] + + def discover_channels(self): + """ determine the number of channels, and popultae hw drivers into shared state + + [ {"id": i, # ~slot number of the channel (see Note 1) + "version": , # version of the driver + "hwdrv": , # instance of your hardware driver + + # optional + "close": None, # register a callback on closing the channel, or None + "play": jig_closed_detect # function for detecting jig closed + "show_pass_fail": jig_led # function for indicating pass/fail (like LED) + "show_msg": jig_display # function for indicating test status (like display) + + # not part of the required block + "unique_id": , # unique id of the hardware (for tracking purposes) + ... + }, ... + ] + + Note: + 1) The hw driver objects are expected to have an 'slot' field, the lowest + id is assigned to channel 0, the next highest to channel 1, etc + + :return: <#>, (use Zero for a device shared across all channels) + where #: >0 number of channels, + 0 does not indicate num channels, like a shared hardware driver + <0 error + + list of drivers + """ + sender = "{}.{}".format(self.SFN, __class__.__name__) # for debug purposes + + # ------------------------------------------------------------------ + # Your specific driver discovery code goes here + # + # Scan attached USB devices and look for scanners + context = pyudev.Context() + for device in context.list_devices(): + _manu = str(device.attributes.get("manufacturer")) + if _manu == "None": continue + + self.logger.info(_manu) + for m in self.SCANNER_MANUFACTURERS: + if m in str(device.attributes.get("manufacturer")): + _scanner = {"id": 0} # this will get re-indexed below + _scanner['hwdrv'] = Scanner() + _scanner['version'] = None + _scanner['close'] = None + _scanner['play'] = _scanner['hwdrv'].jig_closed_always + _scanner['show_pass_fail'] = None + _scanner['show_msg'] = None + _scanner['usb_path'] = device.device_path + self.scanners.append(_scanner) + + # sort based on USB path, slot 0, 1, 2, etc + self.scanners = sorted(self.scanners, key=lambda d: d['usb_path']) + + # fix the slot IDs as the slot order set by the USB path + for idx, t in enumerate(self.scanners): + t["id"] = idx + + self.logger.info(self.scanners) + self._num_chan = len(self.scanners) + + pub_notice("HWDriver:{}: Found {}!".format(self.SFN, self._num_chan), sender=sender) + self.logger.info("Done: {} channels".format(self._num_chan)) + return self._num_chan, DRIVER_TYPE, self.scanners + + def num_channels(self): + return self._num_chan + + +# =============================================================================================== +# Debugging code +# - Test your hardware discover here by running this file from PyCharm (be sure to set the working +# directory as ~/git/scripts, else imports will fail) +# - the purpose is to valid discover_channels() is working +if __name__ == '__main__': + logging.basicConfig() + logger = logging.getLogger() + logger.setLevel(logging.INFO) + + d = HWDriver() + num_channels, driver_type, drivers = d.discover_channels() + logger.info("discover_channels: num channels {}, type {}, drivers {}".format(num_channels, + driver_type, + drivers)) diff --git a/public/prism/drivers/scanners/stublogger.py b/public/prism/drivers/scanners/stublogger.py new file mode 100644 index 0000000..2aaffc6 --- /dev/null +++ b/public/prism/drivers/scanners/stublogger.py @@ -0,0 +1,16 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +""" +Sistemi Corporation, copyright, all rights reserved, 2019 +Martin Guthrie + +""" + +class StubLogger(object): + """ stubb out logger if none is provided""" + # TODO: support print to console. + def info(self, *args, **kwargs): pass + def error(self, *args, **kwargs): pass + def debug(self, *args, **kwargs): pass + def warning(self, *args, **kwargs): pass + def critical(self, *args, **kwargs): pass \ No newline at end of file diff --git a/public/prism/scripts/example/barcode_count_v0/barcode_test_01.pdf b/public/prism/scripts/example/barcode_count_v0/barcode_test_01.pdf new file mode 100644 index 0000000..4f67049 Binary files /dev/null and b/public/prism/scripts/example/barcode_count_v0/barcode_test_01.pdf differ diff --git a/public/prism/scripts/example/barcode_count_v0/count_01.scr b/public/prism/scripts/example/barcode_count_v0/count_01.scr new file mode 100644 index 0000000..093afdf --- /dev/null +++ b/public/prism/scripts/example/barcode_count_v0/count_01.scr @@ -0,0 +1,39 @@ +// Example: Count barcoded items +// Barcode string looks like: P###-S##### +// info:product is filled in from the barcode +// dB key0 is used for the serial number +// Multiple Scanners into One Prism PC is supported. +// See example barcode_test_01.pdf for example barcodes used in this test. +// NOTE: To STOP the test, use the STOP Barcode +{ + "subs": { + "Lot": { + "title": "Lot (format #####)", + "type" : "str", "widget": "textinput", "regex": "\\d{5}", "default": "95035" + }, + "Loc": { + "title": "Location", + "type" : "str", "widget": "select", "choices": ["canada/ontario/milton", "us/newyork/bufalo"] + } + }, + "info": { + "product": "N/A", + "bom": "N/A", + "lot": "%%Lot", + "location": "%%Loc" + }, + "config": { + "fail_fast": true, + "drivers": ["public.prism.drivers.scanners.hwdrv_scanners"] + }, + "tests": [ + { + "module": "public.prism.scripts.example.barcode_count_v0.tst00xx", + "options": { "fail_fast": false }, + "items": [ + {"id": "TST000_scan", "enable": true, "regex": "^P\\d{3}-S\\d{5}$", "timeout": 100 }, + {"id": "TST0xxTEARDOWN", "enable": true } + ] + } + ] +} diff --git a/public/prism/scripts/example/barcode_count_v0/tst00xx.py b/public/prism/scripts/example/barcode_count_v0/tst00xx.py new file mode 100644 index 0000000..c7ecf50 --- /dev/null +++ b/public/prism/scripts/example/barcode_count_v0/tst00xx.py @@ -0,0 +1,94 @@ +#! /usr/bin/env python +# -*- coding: utf-8 -*- +""" +Sistemi Corporation, copyright, all rights reserved, 2026 +Martin Guthrie + +""" +import re +import logging +from core.test_item import TestItem +from public.prism.api import ResultAPI + +from public.prism.drivers.scanners.Scanners import DRIVER_TYPE as DRIVER_TYPE_SCANNER + + +# file and class name must match +class tst00xx(TestItem): + + def __init__(self, controller, chan, shared_state): + super().__init__(controller, chan, shared_state) + self.logger = logging.getLogger("tst00xx.{}".format(self.chan)) + + def TST0xxTEARDOWN(self): + """ Always called at the end of testing + - process any cleanup, closing, etc + + {"id": "TST0xxTEARDOWN", "enable": true }, + + """ + ctx = self.item_start() # always first line of test + self.item_end() # always last line of test + + def TST000_scan(self): + """ Scan barcode + - a regex is supplied to confirm barcode format + + {"id": "TST000_scan, "enable": true, "regex": "^P\d{3}-S\d{5}$", "timeout": 10 }, + + """ + ctx = self.item_start() # always first line of test + + self.log_bullet("Scan barcode") + + user_text = self.input_textbox("Scan:", "change") + # Note input_textbox also has a timeout parameter which should be set to + # something just lower than the test item timeout. If input_textbox + # has a timeout, then we should probably STOP Prism player just like below. + + if user_text["success"]: + self.log_bullet(f"SCAN: {user_text['textbox']}") + + # STOPPING - use a scanned "cookie" to indicate stop + if user_text["textbox"] == "P999-STOP": + # Stop Prism Player + self.log_bullet("STOPPING PRISM PLAYER") + drivers = self.shared_state.get_drivers(self.chan, type=DRIVER_TYPE_SCANNER) + drivers[0]["obj"]["hwdrv"].stop_prism_player() + self.item_end(ResultAPI.RECORD_RESULT_DISABLED) + return + + # qualify the text here + if re.match(ctx.item.regex, user_text["textbox"]): + + # update the product from the scanned item + product = user_text["textbox"].split("-")[0] + _info = {"product": product,} + ctx.record.record_info_set(_info) + + sn = user_text["textbox"].split("-")[1] + ctx.record.add_key("sn", sn, slot=0) + _result = ResultAPI.RECORD_RESULT_PASS + + _bullet = f"Barcode: {user_text['textbox']}" + + # Or could make a measurement + # Note: ResultAPI.UNIT_STRING is used to format the measurement correctly in JSON + #_, _result, _bullet = ctx.record.measurement("input", user_text["textbox"], ResultAPI.UNIT_STRING) + + # Here, code could reach out to a dB and do some other kind of work + + else: + _result = ResultAPI.RECORD_RESULT_FAIL + _, _result, _bullet = ctx.record.measurement("input", + user_text["textbox"], + ResultAPI.UNIT_STRING, + force_fail=True) + self.log_bullet(_bullet) + + else: + # operator probably times out... + _result = ResultAPI.RECORD_RESULT_FAIL + self.log_bullet(user_text.get("err", "UNKNOWN ERROR")) + + self.item_end(_result) # always last line of test