.. _datathreads:
.. default-domain:: cpp

Data Threads
=====================

.. csv-table:: Data Threads bring data into the signal chain from external hardware.
   :widths: 18, 80

   "*Type*", ":code:`Plugin::Type::DATA_THREAD`"
   "*Base Classes*", ":code:`DataThread`, :code:`DataBuffer`"
   "*Template*", "https://github.com/open-ephys-plugins/data-thread-template"

**Data Threads** are special types of :ref:`processorplugins` that communicate with a data acquisition device whose clock is not synchronized with the GUI's internal processing callbacks. Instead of implementing a :code:`process()` method as a "Source" processor would, a Data Thread must implement an :code:`updateBuffer()` method that is called inside a separate thread. This method writes incoming samples to a :code:`DataBuffer`, and any data available in that buffer is automatically copied into the GUI's signal chain for further processing.

Each Data Thread can generate an arbitrary number of data streams, each with an independent sample clock. The only limitations are that each data stream can have only one event "channel" (with up to 64 TTL lines), and that the incoming data cannot contain spike events. If the incoming data fits within these limitations, it is strongly recommended to implement your plugin as a Data Thread rather than building a custom Source Processor.

To create a new Data Thread, start with the `Data Thread Template <https://github.com/open-ephys-plugins/data-thread-template>`__ and add code that implements the following method:

.. function:: bool updateBuffer()

   Adds any available data to a buffer (using :code:`DataBuffer::addToBuffer()`). Any data in this buffer will be automatically copied into the GUI's signal chain during the next :code:`process()` callback. The size of the data should be the number of input channels multiplied by the total number of samples read per callback.

   :returns: :code:`true` if communication with the data acquisition device is intact. If the connection to the device is lost, return :code:`false` to terminate acquisition.

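
The buffer-size rule can be checked without any plugin code. The following standalone sketch assumes the interleaved layout used by the example implementation on this page, where the value for channel :code:`j` at sample index :code:`i` is stored at :code:`j + i * numChannels`. The helper names (:code:`bufferSize`, :code:`interleavedIndex`, :code:`scaleSamples`) are hypothetical and are not part of the Open Ephys API.

.. code-block:: c++

   #include <cstddef>
   #include <vector>

   // Hypothetical helper: number of values passed to the buffer per callback.
   std::size_t bufferSize(std::size_t numChannels, std::size_t samplesPerChannel)
   {
       return numChannels * samplesPerChannel;
   }

   // Hypothetical helper: position of (sample, channel) in an interleaved buffer.
   std::size_t interleavedIndex(std::size_t sample, std::size_t channel, std::size_t numChannels)
   {
       return channel + sample * numChannels;
   }

   // Hypothetical helper: convert raw integer samples to scaled floats,
   // keeping the same interleaved ordering.
   std::vector<float> scaleSamples(const std::vector<int>& raw, float scaleFactor)
   {
       std::vector<float> scaled(raw.size());

       for (std::size_t k = 0; k < raw.size(); ++k)
           scaled[k] = static_cast<float>(raw[k]) * scaleFactor;

       return scaled;
   }

With 16 channels and 1024 samples per channel, :code:`bufferSize(16, 1024)` is 16384, which matches the array sizes a 16-channel device would need when reading 1024 samples per callback.
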

The following is an example of a minimal implementation of :code:`DataThread::updateBuffer()`:

.. code-block:: c++

   const int NUM_CHANNELS = 16;
   const int MAX_SAMPLES_PER_CHANNEL = 1024;

   // Placeholder scaling factor applied to the raw samples (device-specific)
   const float scale_factor = 0.195f;

   int raw_samples[NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL];
   float scaled_samples[NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL];
   int64 sample_numbers[MAX_SAMPLES_PER_CHANNEL];
   uint64 event_codes[MAX_SAMPLES_PER_CHANNEL];
   double timestamps[MAX_SAMPLES_PER_CHANNEL]; // not filled in by this minimal example

   int64 totalSamples = 0;

   AcquisitionDevice* device; // stands in for your device's own API
   DataBuffer* dataBuffer;

   bool CustomDataThread::updateBuffer()
   {
       // Read one block of interleaved samples and event codes from the device
       device->readData(raw_samples, event_codes, NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL);

       for (int i = 0; i < MAX_SAMPLES_PER_CHANNEL; i++)
       {
           for (int j = 0; j < NUM_CHANNELS; j++)
           {
               scaled_samples[j + i * NUM_CHANNELS] = raw_samples[j + i * NUM_CHANNELS] * scale_factor;
           }

           sample_numbers[i] = totalSamples++;
       }

       dataBuffer->addToBuffer(scaled_samples,
                               sample_numbers,
                               timestamps,
                               event_codes,
                               MAX_SAMPLES_PER_CHANNEL);

       return true;
   }

To inform downstream plugins about the data streams, channels, and data buffers generated by a Data Thread, the following method must be implemented:

.. function:: void updateSettings(OwnedArray<ContinuousChannel>* continuousChannels, OwnedArray<EventChannel>* eventChannels, OwnedArray<SpikeChannel>* spikeChannels, OwnedArray<DataStream>* sourceStreams, OwnedArray<DeviceInfo>* devices, OwnedArray<ConfigurationObject>* configurationObjects)

   Passes pointers to the Source processor's info objects to the Data Thread, to allow them to be configured as needed. Note that only channels that have been added to a :code:`DataStream` object will be registered by downstream processors, and each :code:`DataStream` can only have one :code:`EventChannel` associated with it. In addition, for each :code:`DataStream` created, a new :code:`DataBuffer` must be created and added to the :code:`sourceBuffers` array, which is owned by the Data Thread.

The following is an example of a minimal implementation of :code:`DataThread::updateSettings()`:

.. code-block:: c++

   void CustomDataThread::updateSettings(OwnedArray<ContinuousChannel>* continuousChannels,
                                         OwnedArray<EventChannel>* eventChannels,
                                         OwnedArray<SpikeChannel>* spikeChannels,
                                         OwnedArray<DataStream>* sourceStreams,
                                         OwnedArray<DeviceInfo>* devices,
                                         OwnedArray<ConfigurationObject>* configurationObjects)
   {
       // Clear previous values
       sourceStreams->clear();
       sourceBuffers.clear(); // DataThread class member
       continuousChannels->clear();
       eventChannels->clear();

       DataStream::Settings streamSettings
       {
           "device_stream", // stream name
           "description",   // stream description
           "identifier",    // stream identifier
           30000.0          // stream sample rate
       };

       DataStream* stream = new DataStream(streamSettings);

       sourceStreams->add(stream); // add pointer to owned array

       // create a data buffer and add it to the sourceBuffers array
       sourceBuffers.add(new DataBuffer(NUM_CHANNELS, 48000));
       dataBuffer = sourceBuffers.getLast();

       for (int i = 0; i < NUM_CHANNELS; i++)
       {
           ContinuousChannel::Settings channelSettings
           {
               ContinuousChannel::Type::ELECTRODE, // channel type
               "CH" + String(i + 1),               // channel name
               "description",                      // channel description
               "identifier",                       // channel identifier
               0.195,                              // channel bitvolts scaling
               stream                              // associated data stream
           };

           continuousChannels->add(new ContinuousChannel(channelSettings));
       }

       EventChannel::Settings eventSettings
       {
           EventChannel::Type::TTL, // channel type (must be TTL)
           "Device Event Channel",  // channel name
           "description",           // channel description
           "identifier",            // channel identifier
           stream,                  // associated data stream
           8                        // maximum number of TTL lines
       };

       eventChannels->add(new EventChannel(eventSettings));
   }

A Data Thread **must** also implement the following three methods in order to be complete:

.. function:: bool foundInputSource()

   Called after the plugin has initialized, to determine whether a connection to the data acquisition device has been established.

   :returns: :code:`true` if the data source is connected, :code:`false` otherwise.

.. function:: bool startAcquisition()

   Called just before acquisition begins, to signal that the Data Thread should start streaming data from its device.
   It is important to start the Data Thread here.

   :returns: :code:`false` if there is an error in starting data transfer, which will cancel the request to start data acquisition.

Minimal implementation of :code:`DataThread::startAcquisition()`:

.. code-block:: c++

   bool CustomDataThread::startAcquisition()
   {
       startThread();

       return true;
   }

.. function:: bool stopAcquisition()

   Called just before acquisition ends, to signal that the Data Thread should stop streaming data from its device. It is important to stop the Data Thread here.

   :returns: :code:`false` if there is an error in stopping data transfer (this return value is not currently handled).

Minimal implementation of :code:`DataThread::stopAcquisition()`:

.. code-block:: c++

   bool CustomDataThread::stopAcquisition()
   {
       if (isThreadRunning())
       {
           signalThreadShouldExit(); // ask the thread to stop
       }

       waitForThreadToExit(500); // wait up to 500 ms for the thread to finish
       dataBuffer->clear();

       return true;
   }

In addition, the following virtual methods can be overridden to extend the functionality of a Data Thread:

.. function:: std::unique_ptr<GenericEditor> createEditor(SourceNode* sourceNode)

   Creates a custom editor for a Data Thread. If this method is not implemented, a default editor will be created. See the :ref:`processorplugins` documentation page for more information about editors. Note that the GUI's built-in :code:`Parameter` class does not currently work with Data Threads.

   :param sourceNode: A pointer to the :code:`SourceNode` object (derived from the :code:`GenericProcessor` class) associated with this Data Thread.

.. function:: void handleBroadcastMessage(String msg)

   Allows the Data Thread plugin to respond to messages sent by other processors during acquisition.

   :param msg: The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way.

.. function:: String handleConfigMessage(String msg)

   Allows the Data Thread plugin to handle a configuration message (usually sent via the OpenEphysHTTPServer) while acquisition is not active.

   :param msg: The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way.

   :returns: The response to the sender (e.g., an acknowledgement that the configuration message was handled properly).

The following method can be called by a Data Thread to send a message to all other processors in the signal chain while acquisition is active:

.. function:: void broadcastMessage(String msg)

   Allows the Data Thread to broadcast a custom string to all other processors.

   :param msg: The message to send. There are no restrictions on how this string is formatted; each receiving plugin is responsible for parsing it in the appropriate way.
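
Because the message format is left to each plugin, a Data Thread that handles messages needs its own parsing logic. The following is a minimal sketch that assumes a hypothetical :code:`KEY VALUE` format (for example, :code:`GAIN 2.5`). It uses :code:`std::string` rather than the GUI's :code:`String` class so that it compiles on its own, and :code:`parseMessage` is an illustrative name, not part of the plugin API.

.. code-block:: c++

   #include <sstream>
   #include <string>

   // Hypothetical parser for messages of the form "KEY VALUE".
   // Returns true and fills in key/value only if the whole message matches;
   // on failure, key and value are left unchanged.
   bool parseMessage(const std::string& msg, std::string& key, double& value)
   {
       std::istringstream in(msg);
       std::string parsedKey;
       double parsedValue = 0.0;

       // Require a key followed by a numeric value
       if (!(in >> parsedKey >> parsedValue))
           return false;

       // Reject any trailing tokens after the value
       std::string extra;
       if (in >> extra)
           return false;

       key = parsedKey;
       value = parsedValue;
       return true;
   }

A handler such as :code:`handleConfigMessage()` could call a parser like this and return either an acknowledgement or an error string to the sender, depending on the result.
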