.. _datathreads: .. default-domain:: cpp Data Threads ===================== .. csv-table:: Data Threads bring data into the signal chain from external hardware. :widths: 18, 80 "*Type*", ":code:`Plugin::Type::DATA_THREAD`" "*Base Classes*", ":code:`DataThread`, :code:`DataBuffer`" "*Template*", "https://github.com/open-ephys-plugins/data-thread-template" **Data Threads** are special types of :ref:`processorplugins` that communicate with a data acquisition device whose clock is not synchronized with the GUI's internal processing callbacks. Instead of implementing a :code:`process()` method as a "Source" processor would, a Data Thread must implement an :code:`updateBuffer()` method that is called inside a separate thread. Any available data in this buffer will be automatically copied into the GUI's signal chain for further processing. Each Data Thread can generate an arbitrary number of data streams, each with an independent sample clock. The only limitations are that each data stream can only have one event "channel" (with up to 64 TTL lines), and the incoming data cannot contain spike events. Provided that the incoming data fits within these limitations, then it is strongly recommended to derive your plugin from a Data Thread, rather than building a custom Source Processor. To create a new Data Thread, start with the `Data Thread Template `__ and add code that implements the following method: .. function:: bool updateBuffer() Adds any available data to a buffer (using :code:`DataBuffer::addToBuffer()`). Any data in this buffer will be automatically copied into the GUI's signal chain during the next :code:`process()` callback. The size of the data should be the number input of channels multiplied by the total number of samples read per callback. **Important:** As of GUI version 1.0, :code:`DataBuffer::addToBuffer()` requires samples for each channel to be consecutively ordered in memory. For example, if you have 16 channels and read 1024 samples per channel in a single callback, all 1024 samples for channel 1 should come first. This is done to improve the efficiency with which the data can be copied into the buffers used for downstream processing. :returns: :code:`true` if communication with the data acquisition device is intact. If the connection to the device is lost, return :code:`false` to terminate acquisition. The following is an example of a minimal implementation of :code:`DataThread::updateBuffer()`: .. code-block:: c++ const int NUM_CHANNELS = 16; const int MAX_SAMPLES_PER_CHANNEL = 1024; int raw_samples[NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL]; float scaled_samples[NUM_CHANNELS * MAX_SAMPLES_PER_BUFFER]; int64 sample_numbers[MAX_SAMPLES_PER_CHANNEL]; uint64 event_codes[MAX_SAMPLES_PER_CHANNEL]; double timestamps[MAX_SAMPLES_PER_CHANNEL]; int64 totalSamples = 0; AcquisitionDevice* device; DataBuffer* dataBuffer; float scale_factor = 1e-6; // multiply by this to convert raw samples to microvolts bool CustomDataThread::updateBuffer() { device->readData(&raw_samples, event_codes, NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL); for (int i = 0; i < MAX_SAMPLES_PER_CHANNEL; i++) { for (int j = 0; j < NUM_CHANNELS; j++) { scaled_samples[i + j * MAX_SAMPLES_PER_CHANNEL] = raw_samples[j + i * NUM_CHANNELS] * scale_factor; } sample_numbers[i] = totalSamples++; } dataBuffer->addToBuffer(scaled_samples, sample_numbers, timestamps, event_codes, MAX_SAMPLES_PER_CHANNEL); return true; } In order to inform downstream plugins about the type of data generated by a Data Thread, the following method must be implemented: .. function:: void updateSettings(OwnedArray* continuousChannels, OwnedArray* eventChannels, OwnedArray* spikeChannels, OwnedArray* sourceStreams, OwnedArray* devices, OwnedArray* configurationObjects) Passes pointers to the Source processor's info objects to the Data Thread, to allow them to be configured as needed. Note that only channels that have been added to a :code:`DataStream` object will be registered by downstream processors, and each :code:`DataStream` can only have one :code:`EventChannel` associated with it. Also, for each :code:`DataStream` created, a new :code:`DataBuffer` also needs to be created and added to the :code:`sourceBuffer` array, owned by the DataThread. The following is an example of a minimal implementation of :code:`DataThread::updateSettings()`: .. code-block:: c++ bool CustomDataThread::updateSettings(OwnedArray* continuousChannels, OwnedArray* eventChannels, OwnedArray* spikeChannels, OwnedArray* sourceStreams, OwnedArray* devices, OwnedArray* configurationObjects) { // Clear previous values sourceStreams->clear(); sourceBuffers.clear(); // DataThread class member continuousChannels->clear(); eventChannels->clear(); DataStream::Settings settings { "device_stream", // stream name "description", // stream description "identifier", // stream identifier 30000.0 // stream sample rate }; DataStream* stream = new DataStream(settings); sourceStreams->add(stream); // add pointer to owned array // create a data buffer and add it to the sourceBuffer array sourceBuffers.add(new DataBuffer(NUM_CHANNELS, 48000)); dataBuffer = sourceBuffers.getLast(); for (int i = 0; i < NUM_CHANNELS; i++) { ContinuousChannel::Settings settings{ ContinuousChannel::Type::ELECTRODE, // channel type "CH" + String(i+1), // channel name "description", // channel description "identifier", // channel identifier 0.195, // channel bitvolts scaling stream // associated data stream }; continuousChannels->add(new ContinuousChannel(settings)); } EventChannel::Settings settings{ EventChannel::Type::TTL, // channel type (must be TTL) "Device Event Channel", // channel name "description", // channel description "identifier", // channel identifier stream, // associated data stream 8 // maximum number of TTL lines }; eventChannels->add(new EventChannel(settings)); } A Data Thread **must** also implement the following three pure virtual methods in order to be complete: .. function:: bool foundInputSource() Called after the plugin has initialized, to determine whether a connection to the data acquisition device has been established. :returns bool: :code:`true` if the data source is connected, :code:`false` otherwise. .. function:: bool startAcquisition() Called just before acquisition begins, to signal that Data Thread should start streaming data from its device. It is important to start the Data Thread here. :returns bool: :code:`false` if there is an error in starting data transfer, which will cancel the request to start data acquisition. Minimal implementation of :code:`DataThread::startAcquisition()`: .. code-block:: c++ bool CustomDataThread::startAcquisition() { startThread(); return true; } .. function:: bool stopAcquisition() Called just before acquisition ends, to signal that Data Thread should stop streaming data from its device. It is important to stop the Data Thread here. :returns bool: :code:`false` if there is an error in stopping data transfer (this return value is not currently handled). Minimal implementation of :code:`DataThread::stopAcquisition()`: .. code-block:: c++ bool CustomDataThread::stopAcquisition() { if (isThreadRunning()) { signalThreadShouldExit(); //stop thread } waitForThreadToExit(500); dataBuffer->clear(); return true; } In addition, the following virtual methods can be overridden to extend the functionality of a Data Thread: .. function:: std::unique_ptr createEditor(SourceNode* sourceNode) Creates a custom editor for a Data Thread. If this method is not implemented, then a default editor will be created. See the :ref:`processorplugins` documentation page for more information about editors. Note that the GUI's built-in :code:`Parameter` class does not currently work with Data Threads. :param sourceNode: A pointer to the :code:`SourceNode` object (derived from the :code:`GenericProcessor` class) associated with this Data Thread. .. function:: void initialize(bool signalChainIsLoading) Allows the Data Thread plugin to set its initial state without blocking the rest of the GUI. :param signalChainIsLoading: :code:`true` if the signal chain is loading, :code:`false` if the plugin is being created from scratch. .. function:: bool isReady() Allows the Data Thread plugin to block the start of acquisition if it is not ready to acquire data. :param bool: :code:`true` if is is safe to start acquisition (default), :code:`false` if the Data Thread needs to prevent acquisition for some reason. .. function:: void handleBroadcastMessage(String msg) Allows the Data Thread plugin to respond to messages sent by other processors during acquisition. :param msg: The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way. .. function:: String handleConfigMessage(String msg) Allows the Data Thread plugin to handle a configuration message (usually sent via the OpenEphysHTTPServer) while acquisition is not active. :param msg: The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way. :returns String: The response to the sender (e.g., an acknowledgement that the configuration message was handled properly) The following method can be called by a Data Thread to send a message to all other processors in the signal chain while acquisition is active: .. function:: void broadcastMessage(String msg) Allows the Data Thread to broadcast a custom string to all other processors. :param msg: The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way. If a Data Thread uses the GUI's built-in :code:`Parameter` class to store its settings, the following methods must be implemented: .. function:: void registerParameters() Registers the parameters that will be used by the Data Thread. This method is called when the Data Thread is created, and should be used to set up any parameters that will be used by the plugin. The Data Thread should call :code:`addBooleanParameter`, :code:`addIntParameter`, :code:`addStringParameter`, etc. only inside this method. .. function:: void parameterValueChanged(Parameter* parameter) Called when a parameter value is changed. This method should be used to update the Data Thread's settings based on the new parameter value. :param parameter: The parameter that was changed. The Data Thread should check which parameter was changed and update its settings accordingly. |