API Reference
ExecutionEngine
- class exengine.kernel.executor.ExecutionEngine(*args, **kwargs)
- check_exceptions()
Check if any exceptions have been raised during the execution of events and raise them if so
- classmethod get_device(device_name)
Get a device by name
- classmethod on_main_executor_thread()
Check if the current thread is an executor thread
- publish_notification(notification)
Publish a notification by adding it to the publish queue
- shutdown()
Stop all threads managed by this executor and wait for them to finish
- submit(event_or_events, thread_name=None, prioritize=False, use_free_thread=False)
Submit one or more acquisition events or callable objects for execution.
This method handles the submission of acquisition events or callable objects to be executed on active threads. It provides options for event prioritization, thread allocation, and performance optimization.
- Return type:
Union[ExecutionFuture,Iterable[ExecutionFuture]]
Parameters:
- event_or_events: Union[ExecutorEvent, Iterable[ExecutorEvent], Callable[[], Any], Iterable[Callable[[], Any]]]
A single ExecutorEvent, an iterable of ExecutorEvents, a callable object with no arguments, or an iterable of such callables.
- thread_name: str, optional (default=None)
Name of the thread to submit the event to. If None, the thread is determined by the ‘use_free_thread’ parameter.
- prioritize: bool, optional (default=False)
If True, execute the event(s) before any others in the queue on its assigned thread. Useful for system-wide changes affecting other events, like hardware adjustments.
- use_free_thread: bool, optional (default=False)
If True, execute the event(s) on an available thread with an empty queue, creating a new thread if needed. Useful for operations like cancelling or stopping events awaiting signals. If False, execute on the primary thread.
Returns:
- Union[ExecutionFuture, Iterable[ExecutionFuture]]
For a single event or callable: returns a single ExecutionFuture. For multiple events: returns an Iterable of ExecutionFutures.
Notes:
Use ‘prioritize’ for critical system changes that should occur before other queued events.
‘use_free_thread’ is essential for operations that need to run independently, like cancellation events.
If a callable object with no arguments is submitted, it will be automatically wrapped in an AnonymousCallableEvent.
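The effect of 'prioritize' on queue order can be pictured with a toy single-thread queue. This is a standard-library sketch under stated assumptions, not ExEngine's implementation: `ToyQueue`, `run_all`, and the use of `concurrent.futures.Future` are hypothetical stand-ins chosen for illustration.

```python
from collections import deque
from concurrent.futures import Future


class ToyQueue:
    """Hypothetical stand-in for one executor thread's event queue."""

    def __init__(self):
        self._pending = deque()

    def submit(self, fn, prioritize=False):
        # One future per submitted callable, mirroring the documented return value.
        future = Future()
        item = (fn, future)
        if prioritize:
            self._pending.appendleft(item)  # jump ahead of already queued work
        else:
            self._pending.append(item)
        return future

    def run_all(self):
        # Drain the queue in order; a real executor would do this on a worker thread.
        while self._pending:
            fn, future = self._pending.popleft()
            try:
                future.set_result(fn())
            except Exception as exc:
                future.set_exception(exc)


order = []
q = ToyQueue()
q.submit(lambda: order.append("acquire"))
q.submit(lambda: order.append("move stage"))
urgent = q.submit(lambda: order.append("adjust hardware"), prioritize=True)
q.run_all()
print(order)  # ['adjust hardware', 'acquire', 'move stage']
```

The prioritized callable runs first even though it was submitted last, which is the behaviour the note on critical system changes relies on.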
- subscribe_to_notifications(subscriber, notification_type=None)
Subscribe an object to receive notifications.
- Return type:
None
- Args:
- subscriber (Callable[[Notification], Any]): A callable that takes a single
Notification object as an argument.
- notification_type (Union[NotificationCategory, Type], optional): The type of notification to subscribe to.
This can be either a NotificationCategory or a specific subclass of Notification.
- Returns:
None
- Raises:
TypeError: If the subscriber is not a callable taking exactly one argument.
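The subscription contract above (one-argument callables, optional filtering by notification type, a TypeError otherwise) might be modelled as follows. Everything here is a hypothetical stand-in built on the standard library: `ToyPublisher`, `DataStored`, and the local `Notification` class are not ExEngine classes, and filtering by NotificationCategory is left out.

```python
import inspect


class Notification:
    """Hypothetical minimal notification carrying a payload."""

    def __init__(self, payload=None):
        self.payload = payload


class DataStored(Notification):
    pass


class ToyPublisher:
    def __init__(self):
        self._subscribers = []  # list of (callable, type filter or None)

    def subscribe(self, subscriber, notification_type=None):
        # Reject anything that is not a callable taking exactly one argument.
        if not callable(subscriber) or len(inspect.signature(subscriber).parameters) != 1:
            raise TypeError("subscriber must be a callable taking exactly one argument")
        self._subscribers.append((subscriber, notification_type))

    def unsubscribe(self, subscriber):
        self._subscribers = [(s, t) for s, t in self._subscribers if s != subscriber]

    def publish(self, notification):
        for subscriber, wanted in self._subscribers:
            if wanted is None or isinstance(notification, wanted):
                subscriber(notification)


everything, stored_only = [], []


def on_any(notification):
    everything.append(notification.payload)


def on_stored(notification):
    stored_only.append(notification.payload)


pub = ToyPublisher()
pub.subscribe(on_any)                  # receives every notification
pub.subscribe(on_stored, DataStored)   # receives only DataStored instances
pub.publish(Notification("a"))
pub.publish(DataStored("b"))
pub.unsubscribe(on_any)
pub.publish(DataStored("c"))
```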
- unsubscribe_from_notifications(subscriber)
Unsubscribe an object from receiving notifications.
- Return type:
None
- Args:
subscriber (Callable[[Notification], Any]): The callable that was previously subscribed to notifications.
- Returns:
None
- class exengine.kernel.ex_future.ExecutionFuture(event)
- await_data(coordinates, return_data=False, return_metadata=False, processed=False, stored=False)
(Only for AcquisitionEvents that also inherit from DataProducing) Block until the event’s data is acquired/processed/saved, and optionally return the data/metadata. When waiting only for acquisition (i.e. before the data is processed), there is no guarantee that this function is called before the data is acquired, so the data may already have been saved and no longer be readily available in RAM. In this case, the data will be read from disk.
Parameters:
- coordinates: Union[DataCoordinates, Dict[str, Union[int, str]], DataCoordinatesIterator, Sequence[DataCoordinates], Sequence[Dict[str, Union[int, str]]]]
A single DataCoordinates object/dictionary, or Sequence (i.e. list or tuple) of DataCoordinates objects/dictionaries. If None, this function will block until the next data is acquired/processed/saved.
- return_data: bool
whether to return the data
- return_metadata: bool
whether to return the metadata
- processed: bool
whether to wait until data has been processed. If no data processor is in use, then this parameter has no effect
- stored: bool
whether to wait for data that has been stored. If the call to await data occurs before the data gets passed off to the storage class, then it will still be in memory and is returned immediately, without having to retrieve it from storage
- await_execution(timeout=None)
Block until the event is complete. If event.execute returns a value, it will be returned here. If event.execute raises an exception, it will be raised here as well
- Return type:
Any
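The return-or-raise behaviour of await_execution resembles `Future.result` in Python's standard `concurrent.futures` module. The sketch below uses that module as an analogy only; it does not call ExEngine, and `read_position` and `faulty_move` are hypothetical functions.

```python
from concurrent.futures import ThreadPoolExecutor


def read_position():
    return 12.5


def faulty_move():
    raise ValueError("stage limit exceeded")


with ThreadPoolExecutor(max_workers=1) as pool:
    ok = pool.submit(read_position)
    bad = pool.submit(faulty_move)

    # Blocking on the future hands back whatever the callable returned...
    value = ok.result(timeout=5)

    # ...and re-raises, in the waiting thread, whatever it raised.
    try:
        bad.result(timeout=5)
        caught = None
    except ValueError as exc:
        caught = str(exc)
```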
- await_notification(notification)
Block until a specific notification is received. If this notification was already received, this function will return immediately.
- is_execution_complete()
Check if the event has completed
- Return type:
bool
- class exengine.device_types.Device(name, no_executor=False, no_executor_attrs=('_name',))
Required base class for all devices usable with the execution engine
Device classes should inherit from this class and implement the abstract methods. The DeviceMetaclass will wrap all methods and attributes in the class to make them thread-safe and to optionally record all method calls and attribute accesses.
Attributes with a trailing _noexec will not be wrapped and will be executed directly on the calling thread. This is useful for attributes that are not hardware related and can bypass the complexity of the executor.
Device implementations can also implement functionality through properties (i.e. attributes that are actually methods) by defining a getter and setter method for the property.
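As a rough illustration of the wrapping idea, a metaclass can reroute public methods onto a worker thread while leaving names that end in _noexec untouched. This is a simplified sketch, not the actual DeviceMetaclass: `ToyDeviceMeta`, `ToyStage`, and the single shared `ThreadPoolExecutor` are assumptions, and attribute access and call recording are not modelled.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import wraps

# Hypothetical single worker standing in for the executor's thread.
_worker = ThreadPoolExecutor(max_workers=1, thread_name_prefix="toy-executor")


class ToyDeviceMeta(type):
    """Hypothetical metaclass: reroute public methods onto one worker thread."""

    def __new__(mcls, name, bases, namespace):
        for attr, value in list(namespace.items()):
            if callable(value) and not attr.startswith("_") and not attr.endswith("_noexec"):
                namespace[attr] = mcls._rerouted(value)
        return super().__new__(mcls, name, bases, namespace)

    @staticmethod
    def _rerouted(method):
        @wraps(method)
        def wrapper(self, *args, **kwargs):
            # Run on the worker and block until done, so calls are serialized.
            return _worker.submit(method, self, *args, **kwargs).result()
        return wrapper


class ToyStage(metaclass=ToyDeviceMeta):
    def move(self):
        return threading.current_thread().name   # name of the thread that ran this

    def label_noexec(self):
        return threading.current_thread().name   # runs directly on the caller


stage = ToyStage()
moved_on = stage.move()
labelled_on = stage.label_noexec()
_worker.shutdown()
```

In this toy version a rerouted method that calls another rerouted method would deadlock on the single worker; a real implementation has to detect that it is already on the executor thread.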
- class exengine.kernel.ex_event_base.ExecutorEvent(*args, **kwargs)
- abstract execute()
Execute the event. This method is called by the executor, and should be overridden by subclasses to implement the event’s functionality.
- Return type:
Any
- publish_notification(notification)
Publish a notification that will be accessible through Futures and made available to any notification subscribers.
- class exengine.kernel.notification_base.Notification(payload=None)
Base class for creating notifications. Notifications are dispatched by the execution engine and related components to provide asynchronous status updates. They can support a payload of arbitrary type, which can be used to pass data or other information along with the notification. However, notifications are designed to be numerous and lightweight, so using the payload to pass large amounts or complex data is discouraged.
To create a notification, subclass this class and set the category and description class variables and optionally define a payload type. The Generic type parameter should be the type of the payload, if any. @dataclass can be used to simplify the definition of notifications classes, but is not required.
For example:
>>> @dataclass
>>> class DataAcquired(Notification[DataCoordinates]):
>>>
>>>     # Define the category and description of the notification shared by all instances of this class
>>>     category = NotificationCategory.Data
>>>     description = "Data has been acquired by a camera or other data-producing device and is now available"
>>>
>>>     # payload is the data coordinates of the acquired
>>>
>>> # Create an instance of the notification
>>> notification = DataAcquired(payload=DataCoordinates(t=1, y=2, channel="DAPI"))
Data
- class exengine.kernel.data_storage_base.DataStorage
- close()
Close the dataset, releasing any resources it holds. No more images will be added or requested
- finish()
No more data will be added to the dataset. This method should be called after the last call to put() and makes the dataset read-only.
- get_data(data_coordinates)
Read a single piece of data corresponding to the given coordinates
- Return type:
ndarray[Any,dtype[Any]]
- get_metadata(data_coordinates)
Read metadata corresponding to the given coordinates
- Return type:
TypeAliasType
- put(data_coordinates, data, metadata)
Add data and corresponding metadata to the dataset. Once this method has been called, the data and metadata should be immediately available to be read by get_data and get_metadata. For disk-backed storage backends, this may require temporarily caching the data in memory until it can be written to disk.
Parameters:
- data_coordinates: DataCoordinates or dict
Coordinates of the data
- data: npt.NDArray
Data to be stored
- metadata: dict
Metadata associated with the data
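A minimal in-memory store that follows the put/get_data/get_metadata/finish contract could look like the sketch below. It is hypothetical: `ToyMemoryStorage` does not subclass the real DataStorage, accepts plain dictionaries as coordinates, and uses nested lists in place of NumPy arrays to stay dependency-free.

```python
class ToyMemoryStorage:
    """Hypothetical in-memory store following the put/get/finish contract."""

    def __init__(self):
        self._data = {}
        self._metadata = {}
        self._finished = False

    @staticmethod
    def _key(coords):
        # Dicts are unhashable, so freeze the axis/value pairs into a key.
        return frozenset(coords.items())

    def put(self, coords, data, metadata):
        if self._finished:
            raise RuntimeError("dataset is read-only after finish()")
        key = self._key(coords)
        # Data is readable as soon as put() returns.
        self._data[key] = data
        self._metadata[key] = metadata

    def get_data(self, coords):
        return self._data[self._key(coords)]

    def get_metadata(self, coords):
        return self._metadata[self._key(coords)]

    def finish(self):
        self._finished = True


store = ToyMemoryStorage()
store.put({"time": 0, "channel": "DAPI"}, [[1, 2], [3, 4]], {"exposure_ms": 10})
pixels = store.get_data({"channel": "DAPI", "time": 0})  # axis order does not matter
meta = store.get_metadata({"time": 0, "channel": "DAPI"})
store.finish()
```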
- class exengine.kernel.data_coords.DataCoordinates(coordinate_dict=None, time=None, channel=None, z=None, **kwargs)
Represents the coordinates of a piece of data (conventionally, a single 2D image). This is a convenience wrapper around a dictionary of axis name to axis value where the axis value can be either an integer or a string.
- class exengine.kernel.data_coords.DataCoordinatesIterator(*args, **kwargs)
- classmethod create(image_coordinate_iterable)
Autoconvert DataCoordinates, dictionaries, or Iterables thereof to DataCoordinatesIterator
- Parameters:
image_coordinate_iterable (Union[Iterable[DataCoordinates],Iterable[Dict[str,Union[int,str]]],DataCoordinates,Dict[str,Union[int,str,DataCoordinatesIterator]]]) – a DataCoordinates object, a dictionary, an iterable of DataCoordinates or dictionaries, or a DataCoordinatesIterator. Valid options include a list of DataCoordinates, a list of dictionaries, a generator of DataCoordinates, a generator of dictionaries, etc.
- is_finite()
Check if this iterator is finite (i.e. will eventually run out of elements)
- Return type:
bool
- might_produce_coordinates(coordinates)
Check if this iterator might produce the given coordinates. If this iterator is backed by a finite list of DataCoordinates, this can be checked definitively. If it is backed by something infinite (like a generator), it will only be possible if more information about the generator is known (e.g. it produces {time: 0}, {time: 1}, and continues incrementing).
If this cannot be determined definitively, returns None
- Return type:
Optional[bool]
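The three possible answers (True, False, or None for "cannot tell") can be sketched as below. This is an illustrative model only: `might_produce` is a hypothetical free function operating on plain dictionaries, and it treats every non-sequence source as undecidable rather than inspecting generators.

```python
from typing import Dict, Iterable, Optional, Union

Coords = Dict[str, Union[int, str]]


def might_produce(source: Iterable[Coords], target: Coords) -> Optional[bool]:
    """Return True/False for finite sequences, None when it cannot be decided."""
    if isinstance(source, (list, tuple)):
        return target in source  # finite: a definitive answer
    return None                  # e.g. a generator: contents are unknown


def endless_time_points():
    t = 0
    while True:
        yield {"time": t}
        t += 1


finite = [{"time": 0}, {"time": 1}]
in_list = might_produce(finite, {"time": 1})                    # True
not_in_list = might_produce(finite, {"time": 5})                # False
unknown = might_produce(endless_time_points(), {"time": 5})     # None
```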
- class exengine.kernel.data_handler.DataHandler(storage, process_function=None, _executor=None)
Object that handles acquired data while it is waiting to be saved. This object is thread safe and manages the handoff of images while they are waiting to be saved, providing temporary access to them along the way.
This class manages one or two queues/threads, depending on whether a processing function is provided. If a processing function is provided, the data will be processed in a separate thread before being passed to the data storage object. If no processing function is provided, the data will be passed directly to the data storage object.
- await_completion()
Wait for the threads to finish
- finish()
Signal to the data handler that no more data will be added. This will cause all threads to initiate shutdown and call the finish() method of the storage object.
- get(coordinates, return_data=True, return_metadata=True, processed=None)
Get an image and associated metadata. If they are present, either in the intake queue or the storage queue (if it exists), return them. If not present, get them from the storage object. If not present there, return None
- Return type:
Optional[Tuple[ndarray[Any,dtype[Any]],TypeAliasType]]
- put(coordinates, image, metadata, execution_future)
Hand off this image to the data handler. It will handle handoff to the storage object and image processing if requested, as well as providing temporary access to the image and metadata as it passes through this pipeline. If an execution future is provided, it will be notified when the image arrives, is processed, and is stored.
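The queue-and-thread handoff described for this class can be approximated with the standard library. The sketch below is a simplification under stated assumptions: `run_pipeline` is a hypothetical function, it collapses processing and saving onto one worker thread instead of two, a plain dict stands in for the storage object, and futures are not notified.

```python
import queue
import threading


def run_pipeline(items, store, process_function=None):
    """Move (coords, data) pairs through an optional processing step into `store`."""
    intake = queue.Queue()
    done = object()  # sentinel marking the end of the stream

    def worker():
        while True:
            item = intake.get()
            if item is done:
                break
            coords, data = item
            if process_function is not None:
                data = process_function(data)
            store[coords] = data

    thread = threading.Thread(target=worker)
    thread.start()
    for item in items:
        intake.put(item)   # analogous to put()
    intake.put(done)       # analogous to finish()
    thread.join()          # analogous to await_completion()
    return store


# Coordinates are tuples here only because dict keys must be hashable.
saved = run_pipeline(
    [(("time", 0), [1, 2]), (("time", 1), [3, 4])],
    {},
    process_function=sum,
)
```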
Micro-Manager Devices
Standard Events
Detector Events
- class exengine.events.detector_events.ReadoutData(data_coordinates_iterator, detector=None, data_handler=None, num_blocks=None, stop_on_empty=False)
Readout one or more blocks of data (e.g. images) and associated metadata from a Detector device (e.g. a camera)
Parameters:
- data_coordinates_iterator: Iterable[DataCoordinates]
An iterator or list of DataCoordinates objects, which specify the coordinates of the data that will be read out. It should be able to provide at least num_blocks elements (or indefinitely if num_blocks is None)
- detector: Union[Detector, str]
The Detector object to read data from. Can be the object itself, or the name of the object in the ExecutionEngine’s device registry.
- num_blocks: int
The number of pieces of data (e.g. images) to read out. If None, the readout will continue until the data_coordinates_iterator is exhausted or the Detector is stopped and no more images are available.
- stop_on_empty: bool
(Experimental) If True, the readout will stop when the detector is stopped and there is no data available to read
- data_handler: DataHandler
The DataHandler object that will handle the data read out by this event
- class exengine.events.detector_events.DataAcquiredNotification(payload=None)
- class exengine.events.detector_events.StartCapture(num_blocks, detector=None)
Special device instruction that captures images from a Detector device (e.g. a camera)
- class exengine.events.detector_events.StartContinuousCapture(detector=None)
Tell Detector device to start capturing images continuously, until a stop signal is received
- class exengine.events.detector_events.StopCapture(detector=None)
Tell Detector device to stop capturing data
Positioner Events
- class exengine.events.positioner_events.SetPosition2DEvent(device, position)
Set the position of a movable device
- class exengine.events.positioner_events.SetTriggerable2DPositionsEvent(device, positions)
Send a sequence of positions to a 2D positioner that will be triggered by TTL pulses
- class exengine.events.positioner_events.SetPosition1DEvent(device, position)
Set the position of a movable device
- class exengine.events.positioner_events.SetTriggerable1DPositionsEvent(device, positions)
Send a sequence of positions to a 1D positioner that will be triggered by TTL pulses
- class exengine.events.positioner_events.StopTriggerablePositionSequenceEvent(device)
Stop the current triggerable sequence
Property Events
- class exengine.events.property_events.SetPropertiesEvent(devices_prop_names_values)
Set one or more properties (i.e. attributes) of one or more devices
- class exengine.events.property_events.SetTriggerablePropertySequencesEvent(property_sequences)
Set sequences of values for properties of different devices, to be cycled through by hardware triggers. The properties should be triggerable.
The property_sequences argument should be a list of tuples, each containing:
- The name of the device or the device object itself
- The name of the property to set
- The sequence of values (e.g. a list) to set the property to
- class exengine.events.property_events.StopTriggerablePropertySequencesEvent(property_sequences)
Stop the current triggerable sequence for one or more properties of different devices
The property_sequences argument should be a list of tuples, each containing:
- The name of the device or the device object itself
- The name of the property with a property sequence to stop
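The two tuple layouts described above can be written out as plain Python data. The device and property names below are hypothetical, and the snippet only builds the argument lists; it does not construct or submit any events.

```python
# Hypothetical device and property names, for illustration only.
# Layout for setting sequences: (device name or object, property name, values).
property_sequences = [
    ("laser_488", "power_mw", [1.0, 2.0, 5.0]),
    ("filter_wheel", "position", [0, 1, 2]),
]

# Layout for stopping sequences: (device name or object, property name).
stop_sequences = [(device, prop) for device, prop, _values in property_sequences]

for device, prop in stop_sequences:
    print(f"stop sequence on {device}.{prop}")
```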
Miscellaneous Events
- class exengine.events.misc_events.Sleep(time_s)
Sleep for a specified amount of time
- class exengine.events.misc_events.SetTimeEvent(time_index, min_start_time=None)
Set the time point
Multi-Dimensional Acquisition Events
- exengine.events.multi_d_events.multi_d_acquisition_events(num_time_points=None, time_interval_s=0, z_start=None, z_end=None, z_step=None, channel_group=None, channels=None, channel_exposures_ms=None, xy_positions=None, xyz_positions=None, position_labels=None, order='tpcz', sequence=None, camera=None, xy_device=None, z_device=None)