Source code for pyelink.events

"""EyeLink event processing.

This module provides the EventProcessor class for managing event retrieval
and processing from the EyeLink tracker.
"""

from __future__ import annotations

import logging
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from .core import EyeLink

import numpy as np

from .utils import RingBuffer

logger = logging.getLogger(__name__)


[docs] class EventProcessor: """Processes blink, fixation, and saccade events from EyeLink tracker. This class manages: - Event retrieval from tracker - Buffering events in ring buffers - Background thread for continuous event monitoring """
[docs] def __init__(self, device: EyeLink, buffer_length: int = 0) -> None: """Initialize event processor. Args: device: Connected EyeLink instance buffer_length: Number of events to store in ring buffers (0 = no buffering) """ self.device = device self.buffer_length = buffer_length # Initialize event buffers if needed if buffer_length != 0: self.fixdur_buf = RingBuffer(maxlen=buffer_length) self.blinkdur_buf = RingBuffer(maxlen=buffer_length) self.pupsize_buf = RingBuffer(maxlen=buffer_length) # Thread pool executor for background event processing self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="EventProcessor") self._event_future: Future | None = None self._event_stop = True
[docs] def get_event(self) -> tuple[str | None, list]: """Get the latest blink or fixation event over the link. The link must have been activated first: tracker.startRecording(0, 0, 1, 1) If both eyes are used, the left one is chosen by default. Eye indices: 0 - left eye 1 - right eye 2 - binocular Data type constants from pylink: 4 - ENDBLINK event 8 - ENDFIX (fixation end) event 200 - Sample data 0x3F/0 - No data available Returns: tuple: (event_type, event_prop) - event_type: 'blink', 'fixation', or None - event_prop: List of event properties """ # Use these values if nothing else is produced event_type = None event_prop = [] if self.device.realconnect: # Keep getting samples until a new sample is found timeout = 0.010 # Don't wait for new samples longer than 10 ms t0 = time.time() while (time.time() - t0) < timeout: data_type = self.device.get_next_data() if data_type == 4: # ENDBLINK event event_type = "blink" blink_event = self.device.get_float_data() if blink_event is not None: event_prop.append(blink_event.getEndTime() - blink_event.getStartTime()) elif data_type == 8: # ENDFIX (fixation end) event event_type = "fixation" fix_event = self.device.get_float_data() if fix_event is not None: event_prop.extend([ fix_event.getEndTime() - fix_event.getStartTime(), fix_event.getAveragePupilSize(), ]) elif data_type in {0x3F, 0}: # No data available break else: continue return event_type, event_prop
[docs] def start_event_thread(self) -> None: """Start event buffer thread.""" if self.buffer_length == 0: logger.warning("Cannot start event thread: buffer_length is 0") return if self._event_future is not None and not self._event_future.done(): logger.warning("Event thread already running") return # First clear buffers from old data self.fixdur_buf.clear() self.pupsize_buf.clear() self.blinkdur_buf.clear() # Start the thread self._event_stop = False self._event_future = self._executor.submit(self._event_loop) logger.info("Event thread started")
def _event_loop(self) -> None: """Continuously read events into the ring buffer (called by event thread).""" k = 0 while True: if self._event_stop: break event_type, event_prop = self.get_event() # If an actual event happened, write to buffer if event_type: # Take different action depending on what event it is if "blink" in event_type: self.blinkdur_buf.append(event_prop[0]) elif "fixation" in event_type: self.fixdur_buf.append(event_prop[0]) self.pupsize_buf.append(event_prop[1]) if np.mod(k, 10) == 0: time.sleep(0.001) k += 1
[docs] def stop_event_thread(self) -> None: """Stop event thread and wait for it to finish.""" if self._event_future is None: return self._event_stop = True # Wait for thread to finish gracefully (up to 5 seconds) try: self._event_future.result(timeout=5.0) except TimeoutError: logger.warning("Event thread did not stop within timeout") except Exception: logger.exception("Exception in event thread")
[docs] def shutdown(self) -> None: """Shutdown the thread pool executor gracefully.""" self.stop_event_thread() self._executor.shutdown(wait=True, cancel_futures=True) logger.info("EventProcessor executor shutdown complete")
[docs] def flush_events(self) -> None: """Clear the event buffers.""" if self.buffer_length != 0: self.fixdur_buf.clear() self.pupsize_buf.clear() self.blinkdur_buf.clear()