Data Threads

Data Threads bring data into the signal chain from external hardware.

Type: Plugin::Type::DATA_THREAD

Base Classes: DataThread, DataBuffer

Template: open-ephys-plugins/data-thread-template

Data Threads are special types of Processor Plugins that communicate with a data acquisition device whose clock is not synchronized with the GUI’s internal processing callbacks. Instead of implementing a process() method as a “Source” processor would, a Data Thread must implement an updateBuffer() method that is called inside a separate thread. Any available data in this buffer will be automatically copied into the GUI’s signal chain for further processing.
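The decoupling described above can be pictured as a queue with one writer and one reader. The following is only a conceptual sketch: `SampleFifo` is a hypothetical stand-in written for this illustration, not the GUI's DataBuffer class, and it omits the thread safety that a real buffer shared between two threads needs.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical stand-in for the role DataBuffer plays; NOT the GUI's class.
// A real buffer must also be safe to use from two threads at once.
class SampleFifo
{
public:
    // Acquisition side: called whenever the device delivers samples.
    void write(const float* samples, std::size_t count)
    {
        fifo.insert(fifo.end(), samples, samples + count);
    }

    // Processing side: removes and returns up to maxCount pending samples.
    std::vector<float> read(std::size_t maxCount)
    {
        const std::size_t n = maxCount < fifo.size() ? maxCount : fifo.size();
        const auto last = fifo.begin() + static_cast<std::ptrdiff_t>(n);
        std::vector<float> out(fifo.begin(), last);
        fifo.erase(fifo.begin(), last);
        return out;
    }

    std::size_t pending() const { return fifo.size(); }

private:
    std::deque<float> fifo;
};
```

The writer can deliver samples in blocks of any size and at any rate; the reader simply takes whatever is pending when its own callback runs.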

Each Data Thread can generate an arbitrary number of data streams, each with an independent sample clock. The only limitations are that each data stream can only have one event “channel” (with up to 64 TTL lines), and the incoming data cannot contain spike events. Provided that the incoming data fits within these limitations, then it is strongly recommended to derive your plugin from a Data Thread, rather than building a custom Source Processor.
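The 64-line limit matches the width of a 64-bit event word. The sketch below shows one way to pack TTL line states into such a word. It assumes that bit N holds the state of TTL line N (0-based); that convention, and the helper names `setTtlLine` and `getTtlLine`, are assumptions made for illustration and should be checked against the DataBuffer documentation.

```cpp
#include <cstdint>

// Assumption: bit `line` of the event word holds the state of TTL line `line`.
// Valid for line values 0..63.
inline std::uint64_t setTtlLine(std::uint64_t word, unsigned line, bool high)
{
    const std::uint64_t mask = std::uint64_t{1} << line;
    return high ? (word | mask) : (word & ~mask);
}

// Returns true if the given TTL line is high in the event word.
inline bool getTtlLine(std::uint64_t word, unsigned line)
{
    return ((word >> line) & std::uint64_t{1}) != 0;
}
```

For example, `setTtlLine(0, 3, true)` yields a word with only bit 3 set.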

To create a new Data Thread, start with the Data Thread Template and add code that implements the following method:

bool updateBuffer()

Adds any available data to a buffer (using DataBuffer::addToBuffer()). Any data in this buffer will be automatically copied into the GUI’s signal chain during the next process() callback. The size of the data should be the number of input channels multiplied by the total number of samples read per callback.

Returns:

true if communication with the data acquisition device is intact. If the connection to the device is lost, return false to terminate acquisition.
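To make the sizing rule concrete, the sketch below scales a block of interleaved samples, using the same indexing as the minimal example on this page (`channel + sample * numChannels`). The function name `scaleInterleaved` and the `scale` parameter are hypothetical, and `std::vector` is used only to keep the sketch self-contained.

```cpp
#include <cstddef>
#include <vector>

// Converts raw integer samples to floats, keeping the interleaved layout
// [s0c0, s0c1, ..., s0cN-1, s1c0, ...]. `scale` is a hypothetical
// device-specific conversion factor. Any trailing partial sample
// (raw.size() not a multiple of numChannels) is left at zero.
std::vector<float> scaleInterleaved(const std::vector<int>& raw,
                                    std::size_t numChannels,
                                    float scale)
{
    std::vector<float> scaled(raw.size(), 0.0f);
    if (numChannels == 0)
        return scaled;

    const std::size_t numSamples = raw.size() / numChannels;
    for (std::size_t sample = 0; sample < numSamples; ++sample)
    {
        for (std::size_t channel = 0; channel < numChannels; ++channel)
        {
            const std::size_t index = channel + sample * numChannels;
            scaled[index] = static_cast<float>(raw[index]) * scale;
        }
    }
    return scaled;
}
```

With 2 channels and 3 samples per channel, the block holds 2 × 3 = 6 values, and channel 0 of sample 1 sits at index 2.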

The following is an example of a minimal implementation of DataThread::updateBuffer():

const int NUM_CHANNELS = 16;
const int MAX_SAMPLES_PER_CHANNEL = 1024;
const float scale_factor = 0.195f; // example value; depends on the device
int raw_samples[NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL];
float scaled_samples[NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL];
int64 sample_numbers[MAX_SAMPLES_PER_CHANNEL];
uint64 event_codes[MAX_SAMPLES_PER_CHANNEL];
double timestamps[MAX_SAMPLES_PER_CHANNEL]; // not filled in by this minimal example
int64 totalSamples = 0;
AcquisitionDevice* device; // placeholder for a device-specific class
DataBuffer* dataBuffer;    // assigned in updateSettings()

bool CustomDataThread::updateBuffer()
{
   // read one block of interleaved samples and the matching event codes
   device->readData(raw_samples, event_codes, NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL);

   for (int i = 0; i < MAX_SAMPLES_PER_CHANNEL; i++)
   {
      for (int j = 0; j < NUM_CHANNELS; j++)
      {
         // sample i of channel j is stored at index j + i * NUM_CHANNELS
         scaled_samples[j + i * NUM_CHANNELS] = raw_samples[j + i * NUM_CHANNELS] * scale_factor;
      }
      sample_numbers[i] = totalSamples++;
   }

   // hand the block to the GUI; the last argument is the number of samples per channel
   dataBuffer->addToBuffer(scaled_samples,
                           sample_numbers,
                           timestamps,
                           event_codes,
                           MAX_SAMPLES_PER_CHANNEL);

   return true;
}
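The example above always returns true. The sketch below shows only the control flow for reporting a lost connection, using hypothetical stand-in types (`FakeDevice`, `SketchThread`) rather than the GUI's classes; the convention that a read returns -1 on failure is an assumption made for illustration.

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical device interface; a real plugin would wrap its vendor SDK here.
struct FakeDevice
{
    bool connected = true;
    int samplesPerRead = 4;

    // Returns the number of samples per channel read, or -1 if the
    // connection has been lost (assumed convention).
    int readData(std::vector<float>& dest, int numChannels)
    {
        if (!connected)
            return -1;
        dest.assign(static_cast<std::size_t>(numChannels * samplesPerRead), 0.0f);
        return samplesPerRead;
    }
};

// Stand-in for a Data Thread; only the control flow of updateBuffer() is shown.
struct SketchThread
{
    FakeDevice device;
    std::vector<float> scratch;
    std::int64_t totalSamples = 0;
    int numChannels = 16;

    bool updateBuffer()
    {
        const int samplesRead = device.readData(scratch, numChannels);
        if (samplesRead < 0)
            return false; // connection lost: acquisition should be terminated

        // A real plugin would call DataBuffer::addToBuffer() here,
        // passing the number of samples actually read.
        totalSamples += samplesRead;
        return true;
    }
};
```

Passing the number of samples actually read, rather than a fixed maximum, also covers devices that deliver blocks of varying size.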

To inform downstream plugins about the data streams, channels, and data buffers generated by a Data Thread, the following method must be implemented:

void updateSettings(OwnedArray<ContinuousChannel> *continuousChannels, OwnedArray<EventChannel> *eventChannels, OwnedArray<SpikeChannel> *spikeChannels, OwnedArray<DataStream> *sourceStreams, OwnedArray<DeviceInfo> *devices, OwnedArray<ConfigurationObject> *configurationObjects)

Passes pointers to the Source processor’s info objects to the Data Thread, to allow them to be configured as needed. Note that only channels that have been added to a DataStream object will be registered by downstream processors, and each DataStream can only have one EventChannel associated with it. Also, for each DataStream created, a new DataBuffer needs to be created and added to the sourceBuffers array, which is owned by the DataThread.

The following is an example of a minimal implementation of DataThread::updateSettings():

void CustomDataThread::updateSettings(OwnedArray<ContinuousChannel>* continuousChannels,
     OwnedArray<EventChannel>* eventChannels,
     OwnedArray<SpikeChannel>* spikeChannels,
     OwnedArray<DataStream>* sourceStreams,
     OwnedArray<DeviceInfo>* devices,
     OwnedArray<ConfigurationObject>* configurationObjects)
{
   // Clear previous values
   sourceStreams->clear();
   sourceBuffers.clear(); // DataThread class member
   continuousChannels->clear();
   eventChannels->clear();

   DataStream::Settings streamSettings
   {
      "device_stream", // stream name
      "description",   // stream description
      "identifier",    // stream identifier
      30000.0          // stream sample rate
   };

   DataStream* stream = new DataStream(streamSettings);

   sourceStreams->add(stream); // add pointer to owned array

   // create a data buffer and add it to the sourceBuffers array
   sourceBuffers.add(new DataBuffer(NUM_CHANNELS, 48000));
   dataBuffer = sourceBuffers.getLast();

   for (int i = 0; i < NUM_CHANNELS; i++)
   {
      ContinuousChannel::Settings channelSettings{
                             ContinuousChannel::Type::ELECTRODE, // channel type
                             "CH" + String(i+1), // channel name
                             "description",      // channel description
                             "identifier",       // channel identifier
                             0.195,              // channel bitvolts scaling
                             stream              // associated data stream
                     };

      continuousChannels->add(new ContinuousChannel(channelSettings));
   }

   // each data stream can have only one event channel
   EventChannel::Settings eventSettings{
                     EventChannel::Type::TTL, // channel type (must be TTL)
                     "Device Event Channel",  // channel name
                     "description",           // channel description
                     "identifier",            // channel identifier
                     stream,                  // associated data stream
                     8                        // maximum number of TTL lines
             };

   eventChannels->add(new EventChannel(eventSettings));
}

A Data Thread must also implement the following three methods in order to be complete:

bool foundInputSource()

Called after the plugin has initialized, to determine whether a connection to the data acquisition device has been established.

Returns bool:

true if the data source is connected, false otherwise.
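One common pattern is to probe the device once during construction and have foundInputSource() report the cached result. The sketch below illustrates this with hypothetical stand-in types (`ProbeDevice`, `SketchSource`); the `open()` call and the rule that an empty port means "no device" are invented for the illustration and do not correspond to any real SDK.

```cpp
#include <string>

// Hypothetical device handle; open() stands in for a vendor SDK call.
struct ProbeDevice
{
    std::string port;

    // Pretend that an empty port name means no device is attached.
    bool open() { return !port.empty(); }
};

// Stand-in for a Data Thread; only foundInputSource() is shown.
struct SketchSource
{
    explicit SketchSource(const std::string& port)
        : device{port}, deviceFound(device.open())
    {
    }

    // Reports the result of the connection attempt made in the constructor.
    bool foundInputSource() const { return deviceFound; }

private:
    ProbeDevice device;
    bool deviceFound;
};
```

Caching the result keeps foundInputSource() cheap, at the cost of not noticing a device that is plugged in after the plugin has loaded.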

bool startAcquisition()

Called just before acquisition begins, to signal that the Data Thread should start streaming data from its device. The Data Thread's acquisition thread must be started here (for example, by calling startThread()).

Returns bool:

false if there is an error in starting data transfer, which will cancel the request to start data acquisition.

Minimal implementation of DataThread::startAcquisition():

bool CustomDataThread::startAcquisition()
{
   startThread();
   return true;
}

bool stopAcquisition()

Called just before acquisition ends, to signal that the Data Thread should stop streaming data from its device. The Data Thread's acquisition thread must be stopped here.

Returns bool:

false if there is an error in stopping data transfer (this return value is not currently handled).

Minimal implementation of DataThread::stopAcquisition():

bool CustomDataThread::stopAcquisition()
{
   if (isThreadRunning())
   {
      signalThreadShouldExit(); //stop thread
   }

   waitForThreadToExit(500);
   dataBuffer->clear();

   return true;
}

In addition, the following virtual methods can be overridden to extend the functionality of a Data Thread:

std::unique_ptr<GenericEditor> createEditor(SourceNode *sourceNode)

Creates a custom editor for a Data Thread. If this method is not implemented, then a default editor will be created. See the Processor Plugins documentation page for more information about editors. Note that the GUI’s built-in Parameter class does not currently work with Data Threads.

Parameters:

sourceNode – A pointer to the SourceNode object (derived from the GenericProcessor class) associated with this Data Thread.

void handleBroadcastMessage(String msg)

Allows the Data Thread plugin to respond to messages sent by other processors during acquisition.

Parameters:

msg – The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way.
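Because the message format is left to each plugin, a plugin has to define and parse its own convention. The sketch below assumes a simple "COMMAND argument" layout and uses `std::string` in place of the GUI's String class to stay self-contained; `ParsedMessage` and `parseMessage` are hypothetical names.

```cpp
#include <sstream>
#include <string>

// Result of splitting a message into a command and the rest of the line.
struct ParsedMessage
{
    std::string command;
    std::string argument;
};

// Splits "COMMAND argument..." at the first run of whitespace.
// The argument is empty if the message contains only a command.
ParsedMessage parseMessage(const std::string& msg)
{
    ParsedMessage parsed;
    std::istringstream stream(msg);
    stream >> parsed.command;                         // first token
    std::getline(stream >> std::ws, parsed.argument); // remainder, if any
    return parsed;
}
```

A handler could then compare `command` against the commands it understands and ignore everything else, since broadcast messages may be intended for other plugins.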

String handleConfigMessage(String msg)

Allows the Data Thread plugin to handle a configuration message (usually sent via the OpenEphysHTTPServer) while acquisition is not active.

Parameters:

msg – The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way.

Returns String:

The response to the sender (e.g., an acknowledgement that the configuration message was handled properly)
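The sketch below shows the request and response shape of such a handler. The "GAIN value" message format, the response strings, and the `SketchConfig` type are all hypothetical, and `std::string` again stands in for the GUI's String class.

```cpp
#include <sstream>
#include <string>

// Stand-in for a Data Thread with a single configurable setting.
struct SketchConfig
{
    float gain = 1.0f;

    // Mirrors the shape of handleConfigMessage(): message in, response out.
    std::string handleConfigMessage(const std::string& msg)
    {
        std::istringstream stream(msg);
        std::string key;
        float value = 0.0f;

        // Accept only "GAIN <positive number>"; leave settings untouched otherwise.
        if ((stream >> key >> value) && key == "GAIN" && value > 0.0f)
        {
            gain = value;
            return "OK GAIN";
        }
        return "ERROR unrecognized message: " + msg;
    }
};
```

Returning an explicit error string for anything that fails to parse lets the sender distinguish a rejected request from one that was applied.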

The following method can be called by a Data Thread to send a message to all other processors in the signal chain while acquisition is active:

void broadcastMessage(String msg)

Allows the Data Thread to broadcast a custom string to all other processors.

Parameters:

msg – The message to send. There are no restrictions on how this string is formatted; each receiving plugin is responsible for parsing the message in the appropriate way.
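Since every processor in the signal chain receives the broadcast, it can help to prefix messages with the sender's name and keep the layout easy to parse. The helper below is a hypothetical sketch of one such convention, using `std::string` in place of the GUI's String class; its output would then be passed to broadcastMessage().

```cpp
#include <string>

// Builds a message such as "MyDevice TTL line=3 state=1".
// The layout is an invented convention, not one defined by the GUI.
std::string makeTtlMessage(const std::string& sender, int line, bool state)
{
    return sender + " TTL line=" + std::to_string(line)
         + " state=" + (state ? "1" : "0");
}
```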