Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions public/VERSION
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
prev_commit=c3b6918ed24a5dab3fd1fec5fa949caf2cd63b69
date=2026-01-09 16:09:55 -0500
branch=36-lente-failing-to-run-on-vm
version=0.8-23-gc3b6918e
prev_commit=714b6725edffd1832edd40bc79cc8738de06fbc9
date=2026-02-17 19:10:06 -0500
branch=41-create-counting-example-with-barcode-scanner
version=0.8-32-g714b6725
55 changes: 55 additions & 0 deletions public/prism/drivers/scanners/Scanners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
Sistemi Corporation, copyright, all rights reserved, 2026
Martin Guthrie

"""
try:
# run locally
from stublogger import StubLogger
except:
# run from prism
from public.prism.drivers.common.stublogger import StubLogger


# this is the (example) version of the remote hardware
VERSION = "0.2.0"
DRIVER_TYPE = "SCANNER"


class Scanner(object):
""" Scanners HW Driver
- this driver actually does nothing (very little)

"""
def __init__(self, loggerIn=None):
if loggerIn: self.logger = loggerIn
else: self.logger = StubLogger()

self._stop_prism_player = False
self._close = False

# TODO: function to send a command to the scanner to alert the User
# that prism is ready for the user to scan. Scanners
# support this feature.

def stop_prism_player(self):
self._stop_prism_player = True
self.logger.info("stop_prism_player")

def jig_closed_always(self):
""" "Auto" Jig Open/Close
- NOTE: because Prism calls here every 1sec and because Prism needs to
see the Jig Open/Close, it can take up to 2sec before Prism
is ready to Scan again
"""
if self._stop_prism_player:
self.logger.info(False)
return False

# toggle response so that Prism thinks the jig is opening and closing.
temp = self._close
self._close = not temp

return temp
130 changes: 130 additions & 0 deletions public/prism/drivers/scanners/hwdrv_scanners.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
Sistemi Corporation, copyright, all rights reserved, 2026
Martin Guthrie

"""
import os
import pyudev
import logging
from core.sys_log import pub_notice

# import your hardware driver class, and details
from public.prism.drivers.scanners.Scanners import Scanner, VERSION, DRIVER_TYPE

class HWDriver(object):
"""
HWDriver is a class that installs a HW driver into the shared state.
This is not the HW driver itself, just an installer.

When a script is loaded, there is a config section that lists all the HWDriver
to call, for example,

"config": {
# drivers: list of code to initialize the test environment, must be specified
"drivers": ["public.prism.drivers.fake.hwdrv_fake"]
},

drivers is a list, so multiple HWDriver can be installed.

This is a fake HW driver to simulate and show what a HWDriver is required to do.
"""
SFN = os.path.basename(__file__) # for logging path

# add as required
SCANNER_MANUFACTURERS = ["Datalogic", ]

def __init__(self):
self.logger = logging.getLogger("{}".format(self.SFN))
self.logger.info("Start")
self._num_chan = 0
self.scanners = []

def discover_channels(self):
""" determine the number of channels, and popultae hw drivers into shared state

[ {"id": i, # ~slot number of the channel (see Note 1)
"version": <VERSION>, # version of the driver
"hwdrv": <object>, # instance of your hardware driver

# optional
"close": None, # register a callback on closing the channel, or None
"play": jig_closed_detect # function for detecting jig closed
"show_pass_fail": jig_led # function for indicating pass/fail (like LED)
"show_msg": jig_display # function for indicating test status (like display)

# not part of the required block
"unique_id": <unique_id>, # unique id of the hardware (for tracking purposes)
...
}, ...
]

Note:
1) The hw driver objects are expected to have an 'slot' field, the lowest
id is assigned to channel 0, the next highest to channel 1, etc

:return: <#>, <list> (use Zero for a device shared across all channels)
where #: >0 number of channels,
0 does not indicate num channels, like a shared hardware driver
<0 error

list of drivers
"""
sender = "{}.{}".format(self.SFN, __class__.__name__) # for debug purposes

# ------------------------------------------------------------------
# Your specific driver discovery code goes here
#
# Scan attached USB devices and look for scanners
context = pyudev.Context()
for device in context.list_devices():
_manu = str(device.attributes.get("manufacturer"))
if _manu == "None": continue

self.logger.info(_manu)
for m in self.SCANNER_MANUFACTURERS:
if m in str(device.attributes.get("manufacturer")):
_scanner = {"id": 0} # this will get re-indexed below
_scanner['hwdrv'] = Scanner()
_scanner['version'] = None
_scanner['close'] = None
_scanner['play'] = _scanner['hwdrv'].jig_closed_always
_scanner['show_pass_fail'] = None
_scanner['show_msg'] = None
_scanner['usb_path'] = device.device_path
self.scanners.append(_scanner)

# sort based on USB path, slot 0, 1, 2, etc
self.scanners = sorted(self.scanners, key=lambda d: d['usb_path'])

# fix the slot IDs as the slot order set by the USB path
for idx, t in enumerate(self.scanners):
t["id"] = idx

self.logger.info(self.scanners)
self._num_chan = len(self.scanners)

pub_notice("HWDriver:{}: Found {}!".format(self.SFN, self._num_chan), sender=sender)
self.logger.info("Done: {} channels".format(self._num_chan))
return self._num_chan, DRIVER_TYPE, self.scanners

def num_channels(self):
return self._num_chan


# ===============================================================================================
# Debugging code
# - Test your hardware discover here by running this file from PyCharm (be sure to set the working
# directory as ~/git/scripts, else imports will fail)
# - the purpose is to valid discover_channels() is working
if __name__ == '__main__':
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.INFO)

d = HWDriver()
num_channels, driver_type, drivers = d.discover_channels()
logger.info("discover_channels: num channels {}, type {}, drivers {}".format(num_channels,
driver_type,
drivers))
16 changes: 16 additions & 0 deletions public/prism/drivers/scanners/stublogger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
Sistemi Corporation, copyright, all rights reserved, 2019
Martin Guthrie

"""

class StubLogger(object):
""" stubb out logger if none is provided"""
# TODO: support print to console.
def info(self, *args, **kwargs): pass
def error(self, *args, **kwargs): pass
def debug(self, *args, **kwargs): pass
def warning(self, *args, **kwargs): pass
def critical(self, *args, **kwargs): pass
Binary file not shown.
39 changes: 39 additions & 0 deletions public/prism/scripts/example/barcode_count_v0/count_01.scr
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Example: Count barcoded items
// Barcode string looks like: P###-S#####
// info:product is filled in from the barcode
// dB key0 is used for the serial number
// Multiple Scanners into One Prism PC is supported.
// See example barcode_test_01.pdf for example barcodes used in this test.
// NOTE: To STOP the test, use the STOP Barcode
{
"subs": {
"Lot": {
"title": "Lot (format #####)",
"type" : "str", "widget": "textinput", "regex": "\\d{5}", "default": "95035"
},
"Loc": {
"title": "Location",
"type" : "str", "widget": "select", "choices": ["canada/ontario/milton", "us/newyork/bufalo"]
}
},
"info": {
"product": "N/A",
"bom": "N/A",
"lot": "%%Lot",
"location": "%%Loc"
},
"config": {
"fail_fast": true,
"drivers": ["public.prism.drivers.scanners.hwdrv_scanners"]
},
"tests": [
{
"module": "public.prism.scripts.example.barcode_count_v0.tst00xx",
"options": { "fail_fast": false },
"items": [
{"id": "TST000_scan", "enable": true, "regex": "^P\\d{3}-S\\d{5}$", "timeout": 100 },
{"id": "TST0xxTEARDOWN", "enable": true }
]
}
]
}
94 changes: 94 additions & 0 deletions public/prism/scripts/example/barcode_count_v0/tst00xx.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
Sistemi Corporation, copyright, all rights reserved, 2026
Martin Guthrie

"""
import re
import logging
from core.test_item import TestItem
from public.prism.api import ResultAPI

from public.prism.drivers.scanners.Scanners import DRIVER_TYPE as DRIVER_TYPE_SCANNER


# file and class name must match
class tst00xx(TestItem):

def __init__(self, controller, chan, shared_state):
super().__init__(controller, chan, shared_state)
self.logger = logging.getLogger("tst00xx.{}".format(self.chan))

def TST0xxTEARDOWN(self):
""" Always called at the end of testing
- process any cleanup, closing, etc

{"id": "TST0xxTEARDOWN", "enable": true },

"""
ctx = self.item_start() # always first line of test
self.item_end() # always last line of test

def TST000_scan(self):
""" Scan barcode
- a regex is supplied to confirm barcode format

{"id": "TST000_scan, "enable": true, "regex": "^P\d{3}-S\d{5}$", "timeout": 10 },

"""
ctx = self.item_start() # always first line of test

self.log_bullet("Scan barcode")

user_text = self.input_textbox("Scan:", "change")
# Note input_textbox also has a timeout parameter which should be set to
# something just lower than the test item timeout. If input_textbox
# has a timeout, then we should probably STOP Prism player just like below.

if user_text["success"]:
self.log_bullet(f"SCAN: {user_text['textbox']}")

# STOPPING - use a scanned "cookie" to indicate stop
if user_text["textbox"] == "P999-STOP":
# Stop Prism Player
self.log_bullet("STOPPING PRISM PLAYER")
drivers = self.shared_state.get_drivers(self.chan, type=DRIVER_TYPE_SCANNER)
drivers[0]["obj"]["hwdrv"].stop_prism_player()
self.item_end(ResultAPI.RECORD_RESULT_DISABLED)
return

# qualify the text here
if re.match(ctx.item.regex, user_text["textbox"]):

# update the product from the scanned item
product = user_text["textbox"].split("-")[0]
_info = {"product": product,}
ctx.record.record_info_set(_info)

sn = user_text["textbox"].split("-")[1]
ctx.record.add_key("sn", sn, slot=0)
_result = ResultAPI.RECORD_RESULT_PASS

_bullet = f"Barcode: {user_text['textbox']}"

# Or could make a measurement
# Note: ResultAPI.UNIT_STRING is used to format the measurement correctly in JSON
#_, _result, _bullet = ctx.record.measurement("input", user_text["textbox"], ResultAPI.UNIT_STRING)

# Here, code could reach out to a dB and do some other kind of work

else:
_result = ResultAPI.RECORD_RESULT_FAIL
_, _result, _bullet = ctx.record.measurement("input",
user_text["textbox"],
ResultAPI.UNIT_STRING,
force_fail=True)
self.log_bullet(_bullet)

else:
# operator probably times out...
_result = ResultAPI.RECORD_RESULT_FAIL
self.log_bullet(user_text.get("err", "UNKNOWN ERROR"))

self.item_end(_result) # always last line of test