Data Threads#
Type |
|
Base Classes |
|
Template |
Data Threads are special types of Processor Plugins that communicate with a data acquisition device whose clock is not synchronized with the GUI’s internal processing callbacks. Instead of implementing a process()
method as a “Source” processor would, a Data Thread must implement an updateBuffer()
method that is called inside a separate thread. Any available data in this buffer will be automatically copied into the GUI’s signal chain for further processing.
Each Data Thread can generate an arbitrary number of data streams, each with an independent sample clock. The only limitations are that each data stream can only have one event “channel” (with up to 64 TTL lines), and the incoming data cannot contain spike events. Provided that the incoming data fits within these limitations, then it is strongly recommended to derive your plugin from a Data Thread, rather than building a custom Source Processor.
To create a new Data Thread, start with the Data Thread Template and add code that implements the following method:
-
bool updateBuffer()#
Adds any available data to a buffer (using
DataBuffer::addToBuffer()
). Any data in this buffer will be automatically copied into the GUI’s signal chain during the nextprocess()
callback. The size of the data should be the number input of channels multiplied by the total number of samples read per callback.- Returns:
true
if communication with the data acquisition device is intact. If the connection to the device is lost, returnfalse
to terminate acquisition.
The following is an example of a minimal implementation of DataThread::updateBuffer()
:
const int NUM_CHANNELS = 16;
const int MAX_SAMPLES_PER_CHANNEL = 1024;
int raw_samples[NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL];
float scaled_samples[NUM_CHANNELS * MAX_SAMPLES_PER_BUFFER];
int64 sample_numbers[MAX_SAMPLES_PER_CHANNEL];
uint64 event_codes[MAX_SAMPLES_PER_CHANNEL];
double timestamps[MAX_SAMPLES_PER_CHANNEL];
int64 totalSamples = 0;
AcquisitionDevice* device;
DataBuffer* dataBuffer;
bool CustomDataThread::updateBuffer()
{
device->readData(&raw_samples, event_codes, NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL);
for (int i = 0; i < MAX_SAMPLES_PER_CHANNEL; i++)
{
for (int j = 0; j < NUM_CHANNELS; j++)
{
scaled_samples[j + i * NUM_CHANNELS] = raw_samples[j + i * NUM_CHANNELS] * scale_factor;
}
sample_numbers[i] = totalSamples++;
}
dataBuffer->addToBuffer(scaled_samples,
sample_numbers,
timestamps,
event_codes,
MAX_SAMPLES_PER_CHANNEL);
return true;
}
In order to inform downstream plugins about the datastreams, channels, and data buffers generated by a Data Thread, the following method must be implemented:
-
void updateSettings(OwnedArray<ContinuousChannel> *continuousChannels, OwnedArray<EventChannel> *eventChannels, OwnedArray<SpikeChannel> *spikeChannels, OwnedArray<DataStream> *sourceStreams, OwnedArray<DeviceInfo> *devices, OwnedArray<ConfigurationObject> *configurationObjects)#
Passes pointers to the Source processor’s info objects to the Data Thread, to allow them to be configured as needed. Note that only channels that have been added to a
DataStream
object will be registered by downstream processors, and eachDataStream
can only have oneEventChannel
associated with it. Also, for eachDataStream
created, a newDataBuffer
also needs to be created and added to thesourceBuffer
array, owned by the DataThread.
The following is an example of a minimal implementation of DataThread::updateSettings()
:
bool CustomDataThread::updateSettings(OwnedArray<ContinuousChannel>* continuousChannels,
OwnedArray<EventChannel>* eventChannels,
OwnedArray<SpikeChannel>* spikeChannels,
OwnedArray<DataStream>* sourceStreams,
OwnedArray<DeviceInfo>* devices,
OwnedArray<ConfigurationObject>* configurationObjects)
{
// Clear previous values
sourceStreams->clear();
sourceBuffers.clear(); // DataThread class memeber
continuousChannels->clear();
eventChannels->clear();
DataStream::Settings settings
{
"device_stream", // stream name
"description", // stream description
"identifier", // stream identifier
30000.0 // stream sample rate
};
DataStream* stream = new DataStream(settings);
sourceStreams->add(stream); // add pointer to owned array
// create a data buffer and add it to the sourceBuffer array
sourceBuffers.add(new DataBuffer(NUM_CHANNELS, 48000));
dataBuffer = sourceBuffers.getLast();
for (int i = 0; i < NUM_CHANNELS; i++)
{
ContinuousChannel::Settings settings{
ContinuousChannel::Type::ELECTRODE, // channel type
"CH" + String(i+1), // channel name
"description", // channel description
"identifier", // channel identifier
0.195, // channel bitvolts scaling
stream // associated data stream
};
continuousChannels->add(new ContinuousChannel(settings));
}
EventChannel::Settings settings{
EventChannel::Type::TTL, // channel type (must be TTL)
"Device Event Channel", // channel name
"description", // channel description
"identifier", // channel identifier
stream, // associated data stream
8 // maximum number of TTL lines
};
eventChannels->add(new EventChannel(settings));
}
A Data Thread must also implement the following three methods in order to be complete:
-
bool foundInputSource()#
Called after the plugin has initialized, to determine whether a connection to the data acquisition device has been established.
- Returns bool:
true
if the data source is connected,false
otherwise.
-
bool startAcquisition()#
Called just before acquisition begins, to signal that Data Thread should start streaming data from its device. It is important to start the Data Thread here.
- Returns bool:
false
if there is an error in starting data transfer, which will cancel the request to start data acquisition.
Minimal implementation of DataThread::startAcquisition()
:
bool CustomDataThread::startAcquisition()
{
startThread();
return true;
}
-
bool stopAcquisition()#
Called just before acquisition ends, to signal that Data Thread should stop streaming data from its device. It is important to stop the Data Thread here.
- Returns bool:
false
if there is an error in stopping data transfer (this return value is not currently handled).
Minimal implementation of DataThread::stopAcquisition()
:
bool CustomDataThread::stopAcquisition()
{
if (isThreadRunning())
{
signalThreadShouldExit(); //stop thread
}
waitForThreadToExit(500);
dataBuffer->clear();
return true;
}
In addition, the following virtual methods can be overriden to extend the functionality of a Data Thread:
-
std::unique_ptr<GenericEditor> createEditor(SourceNode *sourceNode)#
Creates a custom editor for a Data Thread. If this method is not implemented, then a default editor will be created. See the Processor Plugins documentation page for more information about editors. Note that the GUI’s built-in
Parameter
class does not currently work with Data Threads.- Parameters:
sourceNode – A pointer to the
SourceNode
object (derived from theGenericProcessor
class) associated with this Data Thread.
-
void handleBroadcastMessage(String msg)#
Allows the Data Thread plugin to respond to messages sent by other processors during acquisition.
- Parameters:
msg – The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way.
-
String handleConfigMessage(String msg)#
Allows the Data Thread plugin to handle a configuration message (usually sent via the OpenEphysHTTPServer) while acquisition is not active.
- Parameters:
msg – The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way.
- Returns String:
The response to the sender (e.g., an acknowledgement that the configuration message was handled properly)
The following method can be called by a Data Thread to send a message to all other processors in the signal chain while acquisition is active:
-
void broadcastMessage(String msg)#
Allows the Data Thread to broadcast a custom string to all other processors.
- Parameters:
msg – The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way.