API Reference
ExecutionEngine
- class exengine.kernel.executor.ExecutionEngine
- check_exceptions()
Check if any exceptions have been raised during the execution of events and raise them if so
- publish_notification(notification)
Publish a notification by adding it to the publish queue
- register(id, obj, schema=<class 'exengine.kernel.executor.DeviceBase'>)
Wraps an object for use with the ExecutionEngine
The wrapper exposes the public properties and attributes of the wrapped object, converting all get and set access, as well as method calls, to Events. Private methods and attributes are not exposed.
After wrapping, the original object should not be used directly anymore. All access should be done through the wrapper, which takes care of thread safety, synchronization, etc.
- Args:
- id: Unique id (name) of the device, used by the ExecutionEngine.
- obj: Object to wrap. The object should only be registered once. Use of the original object should be avoided after wrapping, since access to the original object is not thread safe or otherwise managed by the ExecutionEngine.
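The wrapping idea described above can be illustrated with a small standard-library model. This is only a sketch under stated assumptions, not the ExecutionEngine implementation: `SerializingProxy` and `Stage` are hypothetical names, and a single-worker `ThreadPoolExecutor` stands in for the engine's event queue.

```python
from concurrent.futures import ThreadPoolExecutor


class SerializingProxy:
    """Hypothetical stand-in for the wrapper returned by register()."""

    def __init__(self, obj, executor):
        # Store internal state without going through our own __setattr__.
        object.__setattr__(self, "_obj", obj)
        object.__setattr__(self, "_executor", executor)

    def _run(self, fn, *args):
        # Every access becomes a task on the worker thread and we wait for it.
        return self._executor.submit(fn, *args).result()

    def __getattr__(self, name):
        # Only reached for names not found on the proxy itself.
        if name.startswith("_"):
            raise AttributeError(f"private attribute {name!r} is not exposed")
        value = self._run(getattr, self._obj, name)
        if callable(value):
            # Method calls are also executed on the worker thread.
            return lambda *a, **kw: self._run(lambda: value(*a, **kw))
        return value

    def __setattr__(self, name, value):
        if name.startswith("_"):
            raise AttributeError(f"private attribute {name!r} is not exposed")
        self._run(setattr, self._obj, name, value)


class Stage:
    """Hypothetical device used only for this illustration."""

    def __init__(self):
        self.position = 0.0
        self._secret = "hidden"

    def move_by(self, delta):
        self.position += delta
        return self.position


executor = ThreadPoolExecutor(max_workers=1)
stage = SerializingProxy(Stage(), executor)

stage.position = 1.5                # set access runs on the worker thread
new_position = stage.move_by(2.0)   # method call runs on the worker thread
current = stage.position            # get access runs on the worker thread

try:
    stage._secret
    private_blocked = False
except AttributeError:
    private_blocked = True

executor.shutdown()
```

Because all access is funnelled through one worker, the wrapped object is never touched from two threads at once, which is why the original object should not be used directly after wrapping.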
- shutdown(immediately=False, wait=True)
Stop all devices, then stop all threads in the thread pool
- Args:
- immediately: If True, all pending events are discarded. If False, the device finishes all pending events before shutting down.
- wait: If True, waits for the device thread to finish and the device to be destroyed.
- submit(event)
Submit an event for execution in the worker thread pool. Events are executed as soon as the conditions for execution are met.
The execution threads test if an event is ready to execute by calling the can_start method of the event. If the event is not ready to start, the event is skipped and the next event is checked. After all events have been checked, the threads are suspended.
Events that report can_start = False are responsible for waking up the suspended threads as soon as they become ready to be executed. To enable this, can_start takes a notification listener as an argument, which is notified when the event becomes ready to start (or at least needs to be re-evaluated).
todo: prioritize events?
- Returns:
ExecutionFuture.
- See Also:
DeviceBase.submit for submitting events to a specific device.
Notes:
If a callable object with no arguments is submitted, it will be automatically wrapped in an AnonymousCallableEvent.
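The skip-and-recheck behaviour described for submit can be modelled in a few lines. The following is a simplified, single-threaded sketch, not the engine's scheduler: `SketchEvent` and `run_all` are hypothetical, `can_start` here takes no listener argument, and where the real threads would be suspended until notified, this model simply raises.

```python
from collections import deque


class SketchEvent:
    """Hypothetical event that is ready once its dependencies have finished."""

    def __init__(self, name, depends_on=()):
        self.name = name
        self.depends_on = tuple(depends_on)
        self.done = False

    def can_start(self):
        return all(dep.done for dep in self.depends_on)

    def execute(self):
        self.done = True
        return self.name


def run_all(events):
    """Run events in the order in which they become ready."""
    pending = deque(events)
    order = []
    while pending:
        progressed = False
        # One pass over everything that is currently pending.
        for _ in range(len(pending)):
            event = pending.popleft()
            if event.can_start():
                order.append(event.execute())
                progressed = True
            else:
                pending.append(event)  # skipped, checked again on the next pass
        if not progressed:
            # The real engine would suspend its threads here until notified.
            raise RuntimeError("no pending event can start")
    return order


move = SketchEvent("move")
snap = SketchEvent("snap", depends_on=[move])

# "snap" is submitted first but has to wait until "move" has run.
order = run_all([snap, move])
```

In this run `order` ends up as `["move", "snap"]`: the first event is skipped on the first pass and executed on the second.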
- subscribe_to_notifications(subscriber, notification_type=None)
Subscribe an object to receive notifications.
- Return type:
None
- Args:
- subscriber (Callable[[Notification], Any]): A callable that takes a single Notification object as an argument.
- notification_type (Union[NotificationCategory, Type], optional): The type of notification to subscribe to. This can be either a NotificationCategory or a specific subclass of Notification.
- Returns:
None
- Raises:
TypeError: If the subscriber is not a callable taking exactly one argument.
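The subscription rules above (optional filtering by category or by notification subclass, and a TypeError for unsuitable subscribers) can be sketched with the standard library alone. All names here (`NotificationHub`, `Category`, `SketchNotification`, `DataReady`, `DeviceMoved`) are hypothetical stand-ins, and the engine's own dispatch logic may differ.

```python
import inspect
from enum import Enum


class Category(Enum):
    """Hypothetical stand-in for NotificationCategory."""
    Data = "data"
    Device = "device"


class SketchNotification:
    category = None

    def __init__(self, payload=None):
        self.payload = payload


class DataReady(SketchNotification):
    category = Category.Data


class DeviceMoved(SketchNotification):
    category = Category.Device


class NotificationHub:
    def __init__(self):
        self._subscribers = []  # list of (callable, filter) pairs

    def subscribe(self, subscriber, notification_type=None):
        if (not callable(subscriber)
                or len(inspect.signature(subscriber).parameters) != 1):
            raise TypeError("subscriber must be a callable taking exactly one argument")
        self._subscribers.append((subscriber, notification_type))

    def unsubscribe(self, subscriber):
        self._subscribers = [(s, f) for s, f in self._subscribers if s is not subscriber]

    def publish(self, notification):
        for subscriber, wanted in list(self._subscribers):
            matches = (
                wanted is None
                or (isinstance(wanted, Category) and notification.category is wanted)
                or (isinstance(wanted, type) and isinstance(notification, wanted))
            )
            if matches:
                subscriber(notification)


hub = NotificationHub()
all_seen, data_seen = [], []


def record_all(notification):
    all_seen.append(type(notification).__name__)


def record_data(notification):
    data_seen.append(notification.payload)


hub.subscribe(record_all)                   # no filter: receives everything
hub.subscribe(record_data, Category.Data)   # filter by category

hub.publish(DataReady(payload={"time": 0}))
hub.publish(DeviceMoved())
hub.unsubscribe(record_all)
hub.publish(DataReady(payload={"time": 1}))

try:
    hub.subscribe(lambda: None)  # takes no argument, so it is rejected
    rejected = False
except TypeError:
    rejected = True
```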
- unsubscribe_from_notifications(subscriber)
Unsubscribe an object from receiving notifications.
- Return type:
None
- Args:
subscriber (Callable[[Notification], Any]): The callable that was previously subscribed to notifications.
- Returns:
None
- class exengine.kernel.ex_future.ExecutionFuture(event)
- await_data(coordinates, return_data=False, return_metadata=False, processed=False, stored=False)
(Only for AcquisitionEvents that also inherit from DataProducing) Block until the event’s data is acquired/processed/saved, and optionally return the data/metadata. When waiting for the data to be acquired (i.e. before it is processed), since there is no way to guarantee that this function is called before the data is acquired, the data may have already been saved and not readily available in RAM. In this case, the data will be read from disk.
Parameters:
- coordinates: Union[DataCoordinates, Dict[str, Union[int, str]], DataCoordinatesIterator, Sequence[DataCoordinates], Sequence[Dict[str, Union[int, str]]]]
A single DataCoordinates object/dictionary, or Sequence (i.e. list or tuple) of DataCoordinates objects/dictionaries. If None, this function will block until the next data is acquired/processed/saved
- return_data: bool
whether to return the data
- return_metadata: bool
whether to return the metadata
- processed: bool
whether to wait until data has been processed. If no data processor is in use, then this parameter has no effect
- stored: bool
whether to wait for data that has been stored. If the call to await_data occurs before the data gets passed off to the storage_backends class, then it will be held in memory and returned immediately without having to be retrieved from storage
- await_execution(timeout=None)
Block until the event is complete. If event.execute returns a value, it will be returned here. If event.execute raises an exception, it will be raised here as well
- Return type:
Any
- await_notification(notification)
Block until a specific notification is received. If this notification was already received, this function will return immediately.
- is_execution_complete()
Check if the event has completed
- Return type:
bool
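The blocking behaviour of await_execution (return the value of execute, or re-raise its exception) can be illustrated with a minimal future built on `threading.Event`. This is a sketch, not the ExecutionFuture implementation: `SketchFuture` and `submit` are hypothetical, and raising `TimeoutError` when the timeout expires is an assumption made for this example.

```python
import threading


class SketchFuture:
    """Hypothetical future that mirrors the await_execution contract."""

    def __init__(self):
        self._done = threading.Event()
        self._result = None
        self._exception = None

    def _complete(self, result=None, exception=None):
        self._result, self._exception = result, exception
        self._done.set()

    def is_execution_complete(self):
        return self._done.is_set()

    def await_execution(self, timeout=None):
        # Assumption: a timeout raises TimeoutError in this sketch.
        if not self._done.wait(timeout):
            raise TimeoutError("event did not complete in time")
        if self._exception is not None:
            raise self._exception  # re-raise in the waiting thread
        return self._result


def submit(fn):
    """Run fn on a worker thread and return a future for its outcome."""
    future = SketchFuture()

    def worker():
        try:
            future._complete(result=fn())
        except Exception as exc:
            future._complete(exception=exc)

    threading.Thread(target=worker, daemon=True).start()
    return future


def fail():
    raise ValueError("stage out of range")


ok = submit(lambda: 6 * 7)
answer = ok.await_execution(timeout=5)

bad = submit(fail)
try:
    bad.await_execution(timeout=5)
    raised = None
except ValueError as exc:
    raised = str(exc)
```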
- class exengine.device_types.Device(name: str, engine: ExecutionEngine, *args, **kwargs)
Base class that causes the object to be automatically registered on creation.
- Usage:
    class MyDevice(Device):
        def __init__(self, name: str, engine: ExecutionEngine, …):
            …

    engine = ExecutionEngine()
    device = MyDevice("device_name", engine, …)
- Has the same effect as:
    class MyDevice:
        …

    engine = ExecutionEngine()
    device = engine.register("device_name", MyDevice(…))
- class exengine.kernel.ex_event_base.ExecutorEvent(*args, **kwargs)
- can_start(condition)
Check if the event can start execution. This method is called by the executor before starting the event.
- Return type:
bool
- abstract execute()
Execute the event. This method is called by the executor, and should be overridden by subclasses to implement the event’s functionality.
- Return type:
Any
- publish_notification(notification)
Publish a notification that will be accessible through Futures and made available to any notification subscribers.
- class exengine.kernel.notification_base.Notification(payload=None)
Base class for creating notifications. Notifications are dispatched by the execution engine and related components to provide asynchronous status updates. They can support a payload of arbitrary type, which can be used to pass data or other information along with the notification. However, notifications are designed to be numerous and lightweight, so using the payload to pass large amounts or complex data is discouraged.
To create a notification, subclass this class and set the category and description class variables and optionally define a payload type. The Generic type parameter should be the type of the payload, if any. @dataclass can be used to simplify the definition of notifications classes, but is not required.
For example:
>>> @dataclass
>>> class DataAcquired(Notification[DataCoordinates]):
>>>
>>>     # Define the category and description of the notification shared by all instances of this class
>>>     category = NotificationCategory.Data
>>>     description = "Data has been acquired by a camera or other data-producing device and is now available"
>>>
>>>     # payload is the data coordinates of the acquired data
>>>
>>> # Create an instance of the notification
>>> notification = DataAcquired(payload=DataCoordinates(t=1, y=2, channel="DAPI"))
Data
- class exengine.kernel.data_storage_base.DataStorage
- close()
Close the dataset, releasing any resources it holds. No more images will be added or requested
- finish()
No more data will be added to the dataset. This method should be called after the last call to put() and makes the dataset read-only.
- get_data(data_coordinates)
Read a single piece of data corresponding to the given coordinates
- Return type:
ndarray[tuple[int,...],dtype[Any]]
- get_metadata(data_coordinates)
Read metadata corresponding to the given coordinates
- Return type:
TypeAliasType
- put(data_coordinates, data, metadata)
Add data and corresponding metadata to the dataset. Once this method has been called, the data and metadata should be immediately available to be read by get_data and get_metadata. For disk-backed storage_backends, this may require temporarily caching the data in memory until it can be written to disk.
Parameters:
- data_coordinates: DataCoordinates or dict
Coordinates of the data
- data: npt.NDArray
Data to be stored
- metadata: dict
Metadata associated with the data
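The put/get/finish contract described above can be illustrated with a purely in-memory store. This is a sketch and not one of the library's storage backends: `InMemoryStorage` is hypothetical, plain dictionaries and nested lists stand in for DataCoordinates and NumPy arrays, and raising an error on put() after finish() is an assumption about how "read-only" might be enforced.

```python
class InMemoryStorage:
    """Hypothetical storage following the put/get_data/get_metadata contract."""

    def __init__(self):
        self._data = {}
        self._metadata = {}
        self._finished = False

    @staticmethod
    def _key(data_coordinates):
        # Axis order should not matter, so build an order-independent key.
        return frozenset(dict(data_coordinates).items())

    def put(self, data_coordinates, data, metadata):
        if self._finished:
            # Assumption: writes after finish() are rejected.
            raise RuntimeError("dataset is read-only after finish()")
        key = self._key(data_coordinates)
        self._data[key] = data
        self._metadata[key] = metadata

    def get_data(self, data_coordinates):
        return self._data[self._key(data_coordinates)]

    def get_metadata(self, data_coordinates):
        return self._metadata[self._key(data_coordinates)]

    def finish(self):
        self._finished = True

    def close(self):
        self._data.clear()
        self._metadata.clear()


storage = InMemoryStorage()
storage.put({"time": 0, "channel": "DAPI"}, [[1, 2], [3, 4]], {"exposure_ms": 10})

# Data is readable immediately after put(), regardless of axis order.
image = storage.get_data({"channel": "DAPI", "time": 0})
metadata = storage.get_metadata({"time": 0, "channel": "DAPI"})

storage.finish()
try:
    storage.put({"time": 1, "channel": "DAPI"}, [[0, 0], [0, 0]], {})
    write_rejected = False
except RuntimeError:
    write_rejected = True
```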
- class exengine.kernel.data_coords.DataCoordinates(coordinate_dict=None, time=None, channel=None, z=None, **kwargs)
Represents the coordinates of a piece of data (conventionally, a single 2D image). This is a convenience wrapper around a dictionary of axis name to axis value where the axis value can be either an integer or a string.
- class exengine.kernel.data_coords.DataCoordinatesIterator(*args, **kwargs)
- classmethod create(image_coordinate_iterable)
Autoconvert DataCoordinates, dictionaries, or Iterables thereof to a DataCoordinatesIterator
- Parameters:
image_coordinate_iterable (Union[Iterable[DataCoordinates],Iterable[Dict[str,Union[int,str]]],DataCoordinates,Dict[str,Union[int,str,DataCoordinatesIterator]]]) – a DataCoordinates object, a dictionary, an iterable of DataCoordinates or dictionaries, or a DataCoordinatesIterator. Valid options include a list of DataCoordinates, a list of dictionaries, a generator of DataCoordinates, a generator of dictionaries, etc.
- is_finite()
Check if this iterator is finite (i.e. will eventually run out of elements)
- Return type:
bool
- might_produce_coordinates(coordinates)
Check if this iterator might produce the given coordinates. If this iterator is backed by a finite list of DataCoordinates, this can be checked definitively. If it is backed by something infinite (like a generator), it will only be possible if more information about the generator is known (e.g. it produces {time: 0}, {time: 1}, and continues incrementing).
If this cannot be determined definitively, returns None
- Return type:
Optional[bool]
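The three possible answers of might_produce_coordinates (True, False, or None when it cannot be known) can be sketched as follows. `SketchCoordinatesIterator` is a hypothetical simplification that uses plain dictionaries as coordinates and treats only lists and tuples as finite; the library's own rules may be more refined.

```python
import itertools
from typing import Dict, Iterable, Optional, Union

Coords = Dict[str, Union[int, str]]


class SketchCoordinatesIterator:
    """Hypothetical iterator with a tri-state membership check."""

    def __init__(self, source: Iterable[Coords]):
        # Assumption: only lists and tuples are known to be finite.
        self._finite = isinstance(source, (list, tuple))
        self._known = list(source) if self._finite else None
        self._source = iter(source)

    def __iter__(self):
        return self

    def __next__(self) -> Coords:
        return next(self._source)

    def is_finite(self) -> bool:
        return self._finite

    def might_produce_coordinates(self, coordinates: Coords) -> Optional[bool]:
        if self._known is None:
            return None  # backed by a generator: cannot be determined
        return dict(coordinates) in self._known


finite = SketchCoordinatesIterator([{"time": 0}, {"time": 1}])
in_list = finite.might_produce_coordinates({"time": 1})
not_in_list = finite.might_produce_coordinates({"time": 5})

endless = SketchCoordinatesIterator({"time": t} for t in itertools.count())
unknown = endless.might_produce_coordinates({"time": 5})
first = next(endless)
```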
- class exengine.kernel.data_handler.DataHandler(engine, storage, process_function=None)
Object that handles acquired data while it is waiting to be saved. This object is thread safe and manages the handoff of images while they are waiting to be saved, providing temporary access to them along the way.
This class manages one or two queues/threads, depending on whether a processing function is provided. If a processing function is provided, the data will be processed in a separate thread before being passed to the data storage_backends object. If no processing function is provided, the data will be passed directly to the data storage_backends object.
- await_completion()
Wait for the threads to finish
- finish()
Signal to the data handler that no more data will be added. This will cause all threads to initiate shutdown and call the finish() method of the storage_backends object.
- get(coordinates, return_data=True, return_metadata=True, processed=None)
Get an image and associated metadata. If they are present, either in the intake queue or the storage_backends queue (if it exists), return them. If not present, get them from the storage_backends object. If not present there, return None
- Return type:
Optional[Tuple[ndarray[tuple[int,...],dtype[Any]],TypeAliasType]]
- put(coordinates, image, metadata, execution_future)
Hand off this image to the data handler. It will handle handoff to the storage_backends object and image processing if requested, as well as providing temporary access to the image and metadata as it passes through this pipeline. If an acquisition future is provided, it will be notified when the image arrives, is processed, and is stored.
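The one-or-two queue arrangement described for DataHandler can be modelled with `queue.Queue` and worker threads. This sketch is not the library's implementation: `SketchDataHandler` is hypothetical, a plain dictionary stands in for the storage object, the processing function is assumed to take and return an `(image, metadata)` pair, and the temporary access offered by get() and the future notifications are left out.

```python
import queue
import threading

_STOP = object()  # sentinel that tells a worker thread to shut down


class SketchDataHandler:
    """Hypothetical intake -> (optional processing) -> storage pipeline."""

    def __init__(self, storage, process_function=None):
        self._storage = storage  # any dict-like object in this sketch
        self._process_function = process_function
        self._intake = queue.Queue()
        # Without a processing function, one queue feeds storage directly.
        self._to_store = queue.Queue() if process_function else self._intake
        self._threads = []
        if process_function:
            self._threads.append(threading.Thread(target=self._process_loop, daemon=True))
        self._threads.append(threading.Thread(target=self._store_loop, daemon=True))
        for thread in self._threads:
            thread.start()

    def _process_loop(self):
        while True:
            item = self._intake.get()
            if item is _STOP:
                self._to_store.put(_STOP)  # pass the shutdown signal along
                return
            key, image, metadata = item
            self._to_store.put((key, *self._process_function(image, metadata)))

    def _store_loop(self):
        while True:
            item = self._to_store.get()
            if item is _STOP:
                return
            key, image, metadata = item
            self._storage[key] = (image, metadata)

    def put(self, coordinates, image, metadata):
        self._intake.put((frozenset(coordinates.items()), image, metadata))

    def finish(self):
        self._intake.put(_STOP)

    def await_completion(self):
        for thread in self._threads:
            thread.join()


def double_pixels(image, metadata):
    return [2 * pixel for pixel in image], metadata


storage = {}
handler = SketchDataHandler(storage, process_function=double_pixels)
handler.put({"time": 0}, [1, 2, 3], {"exposure_ms": 10})
handler.finish()
handler.await_completion()

stored = storage[frozenset({"time": 0}.items())]
```

Passing the sentinel from the processing queue to the storage queue ensures that every image put before finish() is stored before the threads exit.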
Micro-Manager Devices
Standard Events
Detector Events
- class exengine.events.detector_events.ReadoutData(data_coordinates_iterator, detector=None, data_handler=None, num_blocks=None, stop_on_empty=False)
Read out one or more blocks of data (e.g. images) and associated metadata from a Detector device (e.g. a camera)
Parameters:
- data_coordinates_iterator: Iterable[DataCoordinates]
An iterator or list of DataCoordinates objects that specify the coordinates of the data to be read out. It should be able to provide at least num_blocks elements (or an unlimited number if num_blocks is None)
- detector: Union[Detector, str]
The Detector object to read data from. Can be the object itself, or the name of the object in the ExecutionEngine’s device registry.
- num_blocks: int
The number of pieces of data (e.g. images) to read out. If None, the readout will continue until the data_coordinates_iterator is exhausted or the Detector is stopped and no more images are available.
- stop_on_empty: bool
(Experimental) If True, the readout will stop once the detector has been stopped and there is no data available to read
- data_handler: DataHandler
The DataHandler object that will handle the data read out by this event
- class exengine.events.detector_events.DataAcquiredNotification(payload=None)
- class exengine.events.detector_events.StartCapture(num_blocks, detector=None)
Special device instruction that captures images from a Detector device (e.g. a camera)
- class exengine.events.detector_events.StartContinuousCapture(detector=None)
Tell Detector device to start capturing images continuously, until a stop signal is received
- class exengine.events.detector_events.StopCapture(detector=None)
Tell Detector device to stop capturing data
Positioner Events
- class exengine.events.positioner_events.SetPosition2DEvent(device, position)
Set the position of a movable device
- class exengine.events.positioner_events.SetTriggerable2DPositionsEvent(device, positions)
Send a sequence of positions to a 2D positioner that will be triggered by TTL pulses
- class exengine.events.positioner_events.SetPosition1DEvent(device, position)
Set the position of a movable device
- class exengine.events.positioner_events.SetTriggerable1DPositionsEvent(device, positions)
Send a sequence of positions to a 1D positioner that will be triggered by TTL pulses
- class exengine.events.positioner_events.StopTriggerablePositionSequenceEvent(device)
Stop the current triggerable sequence
Property Events
- class exengine.events.property_events.SetPropertiesEvent(devices_prop_names_values)
Set one or more properties (i.e. attributes) of one or more devices
- class exengine.events.property_events.SetTriggerablePropertySequencesEvent(property_sequences)
Set sequences of values for properties of different devices, to be cycled through by hardware triggers. The properties should be triggerable.
property_sequences should be a list of tuples, each containing:
- The name of the device or the device object itself
- The name of the property to set
- The sequence of values (e.g. a list) to set the property to
- class exengine.events.property_events.StopTriggerablePropertySequencesEvent(property_sequences)
Stop the current triggerable sequence for one or more properties of different devices
property_sequences should be a list of tuples, each containing:
- The name of the device or the device object itself
- The name of the property whose sequence should be stopped
Miscellaneous Events
- class exengine.events.misc_events.Sleep(time_s)
Sleep for a specified amount of time
- class exengine.events.misc_events.SetTimeEvent(time_index, min_start_time=None)
Set the time point
Multi-Dimensional Acquisition Events
- exengine.events.multi_d_events.multi_d_acquisition_events(num_time_points=None, time_interval_s=0, z_start=None, z_end=None, z_step=None, channel_group=None, channels=None, channel_exposures_ms=None, xy_positions=None, xyz_positions=None, position_labels=None, order='tpcz', sequence=None, camera=None, xy_device=None, z_device=None)