Data Threads
Data Threads are special types of Processor Plugins that communicate with a data acquisition device whose clock is not synchronized with the GUI’s internal processing callbacks. Instead of implementing a process() method as a “Source” processor would, a Data Thread must implement an updateBuffer() method that is called inside a separate thread. Any available data in this buffer will be automatically copied into the GUI’s signal chain for further processing.
Each Data Thread can generate an arbitrary number of data streams, each with an independent sample clock. The only limitations are that each data stream can have only one event “channel” (with up to 64 TTL lines), and the incoming data cannot contain spike events. If the incoming data fits within these limitations, it is strongly recommended to derive your plugin from a Data Thread rather than building a custom Source Processor.
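Because each stream carries a single event channel with up to 64 TTL lines, and the event codes in the updateBuffer() example on this page are stored as 64-bit unsigned integers, one natural encoding is one bit per line. The sketch below assumes that bit n of an event code holds the state of TTL line n. The helper names setLine and lineIsHigh are hypothetical and are not part of the GUI’s API.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helpers (not part of the GUI's API).
// Assumption: bit n of the 64-bit event code holds the state of TTL line n.

// Returns a copy of `code` with TTL line `line` (0-63) set high or low.
inline std::uint64_t setLine(std::uint64_t code, int line, bool high)
{
    assert(line >= 0 && line < 64);
    const std::uint64_t mask = std::uint64_t{1} << line;
    return high ? (code | mask) : (code & ~mask);
}

// Returns true if TTL line `line` (0-63) is high in `code`.
inline bool lineIsHigh(std::uint64_t code, int line)
{
    assert(line >= 0 && line < 64);
    return ((code >> line) & std::uint64_t{1}) != 0;
}
```

Under this assumption, a device that reports its digital inputs as separate booleans could build one event code per sample by calling setLine once per line.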
To create a new Data Thread, start with the Data Thread Template and add code that implements the following method:
bool updateBuffer()
Adds any available data to a buffer (using DataBuffer::addToBuffer()). Any data in this buffer will be automatically copied into the GUI’s signal chain during the next process() callback. The size of the data should be the number of input channels multiplied by the total number of samples read per callback.
Important: As of GUI version 1.0, DataBuffer::addToBuffer() requires samples for each channel to be consecutively ordered in memory. For example, if you have 16 channels and read 1024 samples per channel in a single callback, all 1024 samples for channel 1 should come first, followed by all 1024 samples for channel 2, and so on. This is done to improve the efficiency with which the data can be copied into the buffers used for downstream processing.
- Returns:
true if communication with the data acquisition device is intact. If the connection to the device is lost, return false to terminate acquisition.
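The memory-ordering requirement can be illustrated on its own. Many devices deliver data interleaved by sample (one value per channel, then the next sample), so it has to be regrouped by channel before it is handed over. The following is a minimal sketch of that reordering, assuming interleaved input. The function name toChannelMajor is hypothetical, and std::vector is used only to keep the example self-contained.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of the GUI's API).
// Input:  sample-major data,  [s0c0, s0c1, ..., s1c0, s1c1, ...]
// Output: channel-major data, [c0s0, c0s1, ..., c1s0, c1s1, ...]
std::vector<float> toChannelMajor(const std::vector<float>& interleaved,
                                  std::size_t numChannels,
                                  std::size_t numSamples)
{
    assert(interleaved.size() == numChannels * numSamples);

    std::vector<float> out(interleaved.size());

    for (std::size_t s = 0; s < numSamples; ++s)
        for (std::size_t c = 0; c < numChannels; ++c)
            out[c * numSamples + s] = interleaved[s * numChannels + c];

    return out;
}
```

With 2 channels and 3 samples per channel, the interleaved input {10, 20, 11, 21, 12, 22} becomes {10, 11, 12, 20, 21, 22}: all samples for the first channel, then all samples for the second.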
The following is an example of a minimal implementation of DataThread::updateBuffer():
const int NUM_CHANNELS = 16;
const int MAX_SAMPLES_PER_CHANNEL = 1024;

int raw_samples[NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL];      // interleaved by sample
float scaled_samples[NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL]; // grouped by channel
int64 sample_numbers[MAX_SAMPLES_PER_CHANNEL];
uint64 event_codes[MAX_SAMPLES_PER_CHANNEL];
double timestamps[MAX_SAMPLES_PER_CHANNEL];
int64 totalSamples = 0;

AcquisitionDevice* device;
DataBuffer* dataBuffer;

float scale_factor = 1e-6; // multiply by this to convert raw samples to microvolts

bool CustomDataThread::updateBuffer()
{
    // read one block of samples and event codes from the device
    device->readData(raw_samples, event_codes, NUM_CHANNELS * MAX_SAMPLES_PER_CHANNEL);

    for (int i = 0; i < MAX_SAMPLES_PER_CHANNEL; i++)
    {
        for (int j = 0; j < NUM_CHANNELS; j++)
        {
            // regroup so that all samples for channel j are consecutive in memory
            scaled_samples[i + j * MAX_SAMPLES_PER_CHANNEL] = raw_samples[j + i * NUM_CHANNELS] * scale_factor;
        }

        sample_numbers[i] = totalSamples++;
    }

    dataBuffer->addToBuffer(scaled_samples,
                            sample_numbers,
                            timestamps,
                            event_codes,
                            MAX_SAMPLES_PER_CHANNEL);

    return true;
}
In order to inform downstream plugins about the type of data generated by a Data Thread, the following method must be implemented:
void updateSettings(OwnedArray<ContinuousChannel> *continuousChannels, OwnedArray<EventChannel> *eventChannels, OwnedArray<SpikeChannel> *spikeChannels, OwnedArray<DataStream> *sourceStreams, OwnedArray<DeviceInfo> *devices, OwnedArray<ConfigurationObject> *configurationObjects)
Passes pointers to the Source processor’s info objects to the Data Thread, to allow them to be configured as needed. Note that only channels that have been added to a DataStream object will be registered by downstream processors, and each DataStream can only have one EventChannel associated with it. Also, for each DataStream created, a new DataBuffer needs to be created and added to the sourceBuffers array, owned by the DataThread.
The following is an example of a minimal implementation of DataThread::updateSettings():
void CustomDataThread::updateSettings(OwnedArray<ContinuousChannel>* continuousChannels,
                                      OwnedArray<EventChannel>* eventChannels,
                                      OwnedArray<SpikeChannel>* spikeChannels,
                                      OwnedArray<DataStream>* sourceStreams,
                                      OwnedArray<DeviceInfo>* devices,
                                      OwnedArray<ConfigurationObject>* configurationObjects)
{
    // Clear previous values
    sourceStreams->clear();
    sourceBuffers.clear(); // DataThread class member
    continuousChannels->clear();
    eventChannels->clear();

    DataStream::Settings streamSettings
    {
        "device_stream", // stream name
        "description",   // stream description
        "identifier",    // stream identifier
        30000.0          // stream sample rate
    };

    DataStream* stream = new DataStream(streamSettings);
    sourceStreams->add(stream); // add pointer to owned array

    // create a data buffer and add it to the sourceBuffers array
    sourceBuffers.add(new DataBuffer(NUM_CHANNELS, 48000));
    dataBuffer = sourceBuffers.getLast();

    for (int i = 0; i < NUM_CHANNELS; i++)
    {
        ContinuousChannel::Settings channelSettings
        {
            ContinuousChannel::Type::ELECTRODE, // channel type
            "CH" + String(i + 1),               // channel name
            "description",                      // channel description
            "identifier",                       // channel identifier
            0.195,                              // channel bitvolts scaling
            stream                              // associated data stream
        };

        continuousChannels->add(new ContinuousChannel(channelSettings));
    }

    // each settings object needs its own name, since this one shares a scope with streamSettings
    EventChannel::Settings eventSettings
    {
        EventChannel::Type::TTL, // channel type (must be TTL)
        "Device Event Channel",  // channel name
        "description",           // channel description
        "identifier",            // channel identifier
        stream,                  // associated data stream
        8                        // maximum number of TTL lines
    };

    eventChannels->add(new EventChannel(eventSettings));
}
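The example above registers a single stream. When a device produces several streams, the rule that every stream needs its own buffer is easiest to keep if the two are always added together. The sketch below shows that bookkeeping with hypothetical stand-in types (StreamInfo, Buffer, StreamRegistry) in place of the GUI’s DataStream, DataBuffer and OwnedArray classes. It makes no claim about how those classes are implemented.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-ins for the GUI's DataStream and DataBuffer classes.
struct StreamInfo { std::string name; double sampleRate; int numChannels; };
struct Buffer     { int numChannels; int capacityInSamples; };

struct StreamRegistry
{
    std::vector<std::unique_ptr<StreamInfo>> streams;
    std::vector<std::unique_ptr<Buffer>> buffers;

    // Adds a stream and its matching buffer in one step, so the two
    // arrays always have the same length and the same ordering.
    Buffer* addStream(const std::string& name, double sampleRate,
                      int numChannels, int capacityInSamples)
    {
        assert(numChannels > 0 && capacityInSamples > 0);
        streams.push_back(std::make_unique<StreamInfo>(StreamInfo{name, sampleRate, numChannels}));
        buffers.push_back(std::make_unique<Buffer>(Buffer{numChannels, capacityInSamples}));
        return buffers.back().get();
    }

    // Mirrors the "clear previous values" step at the top of updateSettings().
    void clear()
    {
        streams.clear();
        buffers.clear();
    }
};
```

Keeping the two containers index-aligned means the buffer for stream i can always be found at position i, which is the property the pairing rule is meant to guarantee.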
A Data Thread must also implement the following three pure virtual methods in order to be complete:
bool foundInputSource()
Called after the plugin has initialized, to determine whether a connection to the data acquisition device has been established.
- Returns bool:
true if the data source is connected, false otherwise.
bool startAcquisition()
Called just before acquisition begins, to signal that the Data Thread should start streaming data from its device. It is important to start the Data Thread here.
- Returns bool:
false if there is an error in starting data transfer, which will cancel the request to start data acquisition.
Minimal implementation of DataThread::startAcquisition():
bool CustomDataThread::startAcquisition()
{
    startThread();
    return true;
}
bool stopAcquisition()
Called just before acquisition ends, to signal that the Data Thread should stop streaming data from its device. It is important to stop the Data Thread here.
- Returns bool:
false if there is an error in stopping data transfer (this return value is not currently handled).
Minimal implementation of DataThread::stopAcquisition():
bool CustomDataThread::stopAcquisition()
{
    if (isThreadRunning())
    {
        signalThreadShouldExit(); // stop thread
    }

    waitForThreadToExit(500);
    dataBuffer->clear();

    return true;
}
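To make the lifecycle concrete, the following is a sketch of the pattern these two methods rely on: a worker thread calls updateBuffer() repeatedly, and stops either when it is asked to exit or when updateBuffer() returns false. It is written with std::thread and a hypothetical class named SketchThread. The real base class supplies startThread(), signalThreadShouldExit() and waitForThreadToExit(), and its internals may differ from what is shown here. Unlike waitForThreadToExit(500), the join() used below has no timeout.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Hypothetical stand-in for a Data Thread, used only to show the loop.
class SketchThread
{
public:
    // failAfter: number of calls after which updateBuffer() reports a
    // lost connection (a negative value means "never fails").
    explicit SketchThread(int failAfter) : callsUntilFailure(failAfter) {}
    ~SketchThread() { stop(); }

    void start()
    {
        assert(!worker.joinable());
        shouldExit = false;
        worker = std::thread([this] { run(); });
    }

    void stop()
    {
        shouldExit = true; // ask the loop to finish
        if (worker.joinable())
            worker.join(); // wait until it has finished
    }

    int calls() const { return numCalls.load(); }

private:
    // Stand-in for updateBuffer(): counts calls instead of reading a device.
    bool updateBuffer()
    {
        ++numCalls;
        return callsUntilFailure < 0 || numCalls < callsUntilFailure;
    }

    void run()
    {
        while (!shouldExit)
        {
            if (!updateBuffer())
                break; // connection lost: stop acquiring
            std::this_thread::yield();
        }
    }

    std::atomic<bool> shouldExit{false};
    std::atomic<int> numCalls{0};
    const int callsUntilFailure;
    std::thread worker;
};
```

The exit flag is atomic because it is written by the thread that stops acquisition and read by the worker. Once stop() returns, the worker has been joined and no further calls to updateBuffer() can occur, which is why it is safe to clear the buffer afterwards.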
In addition, the following virtual methods can be overridden to extend the functionality of a Data Thread:
std::unique_ptr<GenericEditor> createEditor(SourceNode *sourceNode)
Creates a custom editor for a Data Thread. If this method is not implemented, then a default editor will be created. See the Processor Plugins documentation page for more information about editors. Note that the GUI’s built-in Parameter class does not currently work with Data Threads.
- Parameters:
sourceNode – A pointer to the SourceNode object (derived from the GenericProcessor class) associated with this Data Thread.
void initialize(bool signalChainIsLoading)
Allows the Data Thread plugin to set its initial state without blocking the rest of the GUI.
- Parameters:
signalChainIsLoading – true if the signal chain is loading, false if the plugin is being created from scratch.
bool isReady()
Allows the Data Thread plugin to block the start of acquisition if it is not ready to acquire data.
- Returns bool:
true if it is safe to start acquisition (default), false if the Data Thread needs to prevent acquisition for some reason.
void handleBroadcastMessage(String msg)
Allows the Data Thread plugin to respond to messages sent by other processors during acquisition.
- Parameters:
msg – The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way.
String handleConfigMessage(String msg)
Allows the Data Thread plugin to handle a configuration message (usually sent via the OpenEphysHTTPServer) while acquisition is not active.
- Parameters:
msg – The message that was sent. There are no restrictions on how this string will be formatted; each plugin is responsible for parsing this message in the appropriate way.
- Returns String:
The response to the sender (e.g., an acknowledgement that the configuration message was handled properly)
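Since the message format is left to each plugin, it can help to see one possible convention. The sketch below assumes a made-up format of the form "SCALE <value>" and uses std::string in place of the GUI’s String class. The struct SketchSettings and its scale member are hypothetical. The handler validates the message, updates the setting, and returns a response that tells the sender whether the request was applied.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical settings holder; std::string stands in for the GUI's String class.
struct SketchSettings
{
    float scale = 1.0f;

    // Handles messages of the (made-up) form "SCALE <value>" and returns
    // a response for the sender. The setting is left unchanged on error.
    std::string handleConfigMessage(const std::string& msg)
    {
        std::istringstream in(msg);
        std::string command;
        float value = 0.0f;

        if ((in >> command >> value) && command == "SCALE" && value > 0.0f)
        {
            scale = value;
            return "OK";
        }

        return "ERROR: expected \"SCALE <positive number>\"";
    }
};
```

Returning a descriptive error string, rather than silently ignoring a malformed message, gives remote callers a way to detect that a configuration request did not take effect.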
The following method can be called by a Data Thread to send a message to all other processors in the signal chain while acquisition is active:
void broadcastMessage(String msg)
Allows the Data Thread to broadcast a custom string to all other processors.
- Parameters:
msg – The message to send. There are no restrictions on how this string is formatted; each receiving plugin is responsible for parsing the message in the appropriate way.
If a Data Thread uses the GUI’s built-in Parameter class to store its settings, the following methods must be implemented:
void registerParameters()
Registers the parameters that will be used by the Data Thread. This method is called when the Data Thread is created, and should be used to set up any parameters that will be used by the plugin. The Data Thread should call addBooleanParameter, addIntParameter, addStringParameter, etc. only inside this method.
void parameterValueChanged(Parameter *parameter)
Called when a parameter value is changed. This method should be used to update the Data Thread’s settings based on the new parameter value.
- Parameters:
parameter – The parameter that was changed. The Data Thread should check which parameter was changed and update its settings accordingly.