09-23-2024 09:28 AM
I have a Python script that reads data from channel ai0, filters the samples, and then writes the filtered data to channel ao0 (sampling rate = 1 kHz, buffer size = 100).
The user can scale the filtered samples and apply a delay by adjusting the size of a deque (defined as d in the code below) using the arrow keys on the keyboard.
The code has been running continuously for about 3 days now, but I've seen 16 warnings in the terminal with the following error:
"While writing to the buffer during a regeneration, the actual data generated might have alternated between old data and new data. That is, while the driver was replacing the old pattern in the buffer with the new pattern, the device might have generated a portion of new data, then a portion of old data, and then a portion of new data again. Reduce the sample rate, use a larger buffer, or refer to documentation about DAQmx Write for information about other ways to avoid this warning."
I'm quite new to DSP and I don't know what's going on here. Why does this happen only sporadically? I haven't tried increasing the buffer size yet, and I would rather not adjust the sampling rate. However, I would like to understand what's going on first.
import numpy as np
from nidaqmx.stream_readers import AnalogMultiChannelReader
from nidaqmx.stream_writers import AnalogMultiChannelWriter
from nidaqmx import constants
import nidaqmx
from datetime import datetime
from scipy import signal
import iir_filter
import keyboard
import collections

# Parameters
sampling_freq_in = 1000  # in Hz
buffer_in_size = 100
bufsize_callback = buffer_in_size
buffer_in_size_cfg = round(buffer_in_size * 1)

# Third-order Butterworth bandpass around 1 Hz
lowcut = 0.8
highcut = 1.2
order = 3
sos = signal.iirfilter(order, [lowcut, highcut], btype='band', analog=False,
                       ftype='butter', fs=sampling_freq_in, output='sos')
f1 = iir_filter.IIR_filter(sos)

buffer_in = np.zeros((1, buffer_in_size))
y = np.zeros((1, buffer_in_size))
scale = 1.0
delay_step = 0
d = collections.deque(maxlen=buffer_in_size)  # extra maxlen beyond buffer_in_size = delay
def cfg_read_task(acquisition):
    acquisition.ai_channels.add_ai_voltage_chan("Dev1/ai0")
    acquisition.timing.cfg_samp_clk_timing(rate=sampling_freq_in,
                                           sample_mode=constants.AcquisitionType.CONTINUOUS,
                                           samps_per_chan=buffer_in_size_cfg)

def cfg_write_task(acquisition):
    acquisition.ao_channels.add_ao_voltage_chan("Dev1/ao0")
    acquisition.timing.cfg_samp_clk_timing(rate=sampling_freq_in,
                                           sample_mode=constants.AcquisitionType.CONTINUOUS,
                                           samps_per_chan=buffer_in_size_cfg)
def reading_task_callback(task_idx, event_type, num_samples, callback_data):
    global d
    if running:
        stream_in.read_many_sample(buffer_in, num_samples, timeout=constants.WAIT_INFINITELY)
        # Filter each incoming sample and push it through the delay deque
        for i in np.arange(0, len(buffer_in[0])):
            d.append(f1.filter(buffer_in[0][i]))
        # Take the oldest buffer_in_size samples, then scale
        y[0] = np.array(d)[0:buffer_in_size]
        y[0] = scale * y[0]
        stream_out.write_many_sample(y)
    return 0
# Configure and setup the tasks
task_in = nidaqmx.Task()
task_out = nidaqmx.Task()
cfg_read_task(task_in)
cfg_write_task(task_out)
stream_in = AnalogMultiChannelReader(task_in.in_stream)
stream_out = AnalogMultiChannelWriter(task_out.out_stream)
stream_out.write_many_sample(buffer_in)
task_in.register_every_n_samples_acquired_into_buffer_event(bufsize_callback, reading_task_callback)
running = True
time_start = datetime.now()
task_in.start()
task_out.start()
while True:
    # Wait for the next keyboard event.
    event = keyboard.read_event()
    if event.event_type == keyboard.KEY_DOWN and event.name == 'left':
        delay_step += 5
        if delay_step >= 1000:
            d = collections.deque(maxlen=buffer_in_size)
            delay_step = 0
        else:
            d = collections.deque(maxlen=buffer_in_size + delay_step)
        print('phase delayed by ', np.round(delay_step * 360 * .001, 3), ' degrees')
    elif event.event_type == keyboard.KEY_DOWN and event.name == 'right':
        delay_step -= 5
        if delay_step <= 0:
            d = collections.deque(maxlen=buffer_in_size)
            delay_step = 0
        else:
            d = collections.deque(maxlen=buffer_in_size + delay_step)
        print('phase delayed by ', np.round(delay_step * 360 * .001, 3), ' degrees')
    elif event.event_type == keyboard.KEY_DOWN and event.name == 'i':
        scale = -1 * scale
        print('amplitude inverted!')
    elif event.event_type == keyboard.KEY_DOWN and event.name == 'n':
        scale = 1.0
        print('amplitude set to 1.0!')
    elif event.event_type == keyboard.KEY_DOWN and event.name == 'up':
        scale += .01
        print('amplitude increased to', np.round(scale, 3))
    elif event.event_type == keyboard.KEY_DOWN and event.name == 'down':
        scale -= .01
        print('amplitude decreased to', np.round(scale, 3))
    elif event.event_type == keyboard.KEY_DOWN and event.name == 'esc':
        break

running = False
task_in.close()
task_out.close()
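For context, here is a minimal sketch (illustrative only, not part of the running script) of the arithmetic behind the delay_step*360*.001 factor in the prints above: growing the deque's maxlen by delay_step holds delay_step extra samples before they reach the output, i.e. delay_step milliseconds at 1 kHz, which the script reports as phase assuming a 1 Hz signal at the passband center.

```python
# Map the deque-based sample delay to phase at the 1 Hz passband center.
sampling_freq = 1000  # Hz, same as sampling_freq_in in the script
f_center = 1.0        # Hz, center of the 0.8-1.2 Hz passband
for delay_step in (5, 50, 500):
    delay_s = delay_step / sampling_freq          # extra deque slots -> seconds
    phase_deg = delay_s * f_center * 360          # seconds -> degrees at f_center
    print(f"{delay_step} samples -> {delay_s*1000:.0f} ms -> {phase_deg:.1f} deg at 1 Hz")
# 500 samples -> 500 ms -> 180.0 deg at 1 Hz
```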
09-24-2024 10:37 AM
I'm LabVIEW-only and can't speak to any of the Python syntax. But it appears that the # samples you write to your output task may be equal to its full buffer size.
This puts a pretty difficult demand on the DAQmx driver. It tries to manage the process cleanly, but doesn't quite succeed on a few rare occasions. Keep in mind that at a seeming call rate of 10/sec and only a handful of warnings per day, you're getting a success rate > 99.999%!
Long ago before the driver was made much smarter about managing such things, it was necessary to do a lot more of the management ourselves in our own code. My "rule of thumb" was to write about 1/3 of a buffer per function call while managing the timing & logic for when the next 1/3 would be neither too early nor too late.
I kinda suspect that the new & smarter DAQmx algorithm may still benefit from breaking things up this way. Overwriting the entire buffer all at once is a special edge case that puts extra demands on the driver to deal with bus timing and logic perfectly. 2 or 3 consecutive write calls of 1/2 or 1/3 buffer each might help avoid this.
-Kevin P
09-24-2024 03:24 PM
Thank you very much for your response Kevin. I really appreciate the insight here!
I know you can't speak to any of the Python syntax, but would it make sense if I split the array in three and call the write_many_sample routine three consecutive times, as below? Or do the three writes need to happen in separate callback invocations?
def reading_task_callback(task_idx, event_type, num_samples, callback_data):
    global d
    if running:
        stream_in.read_many_sample(buffer_in, num_samples, timeout=constants.WAIT_INFINITELY)
        for i in np.arange(0, len(buffer_in[0])):
            d.append(f1.filter(buffer_in[0][i]))
        y[0] = np.array(d)[0:buffer_in_size]
        y[0] = scale * y[0]
        # Split the output into thirds and write each third separately
        a, b, c = np.array_split(y[0], 3)
        stream_out.write_many_sample(np.array([a]))
        stream_out.write_many_sample(np.array([b]))
        stream_out.write_many_sample(np.array([c]))
        # stream_out.write_many_sample(y)
    return 0
09-25-2024 06:25 AM
Your code looks to me like what I was suggesting to try. Instead of 1 function call to write the entire buffer full all at once, break it up into 3 consecutive calls that each write 1/3 of a buffer.
Here's my thought process, based partly on knowledge and partly on educated guesses about how DAQmx handles stuff:
- in a continuous output task, the size of a DAQmx output buffer gets set based on the # of samples written to the task prior to starting it.
- DAQmx will then treat that buffer as though it were circular. It'll maintain "pointers" and other info to keep track of things:
1. a write pointer to track the position for placing new data when you call a DAQmx write() function
2. a transfer pointer to track the position for delivering the next chunk of data to the device via DMA
3. some means (pointer? flags?) for determining which buffer data has already been transferred and is therefore eligible to be overwritten during a subsequent write() call
- DAQmx *tries* to avoid writing new data into the buffer at positions that would overwrite data that hasn't yet been transferred. It can usually succeed, but not quite always as you've found
- I *think* that trying to write the whole buffer all at once makes this job especially difficult, because the transfer pointer may be a moving target during the time that the write() is executing. Also, it's *guaranteed* that the data you're trying to write will overlap with the transfer pointer, making "collision" avoidance necessary.
- by writing only, say, 1/3 of a buffer at a time, you provide DAQmx with some wiggle room that I *hope* it's able to make good use of. Roughly 2/3 of the calls won't require careful choreography to avoid a "collision". For the other 1/3, there'd be an option to simply wait for the transfer pointer to advance far enough to be out of the way before coming in behind it to replace 1/3 of the contents.
(When you write the entire buffer all at once, it's not possible for the transfer pointer to get out of the way because every single sample position is meant to be overwritten. The transfer pointer is *always* pointing to a vulnerable position.)
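(Purely to illustrate that last point, here's a toy model of the pointer picture — hypothetical, not how the DAQmx driver is actually implemented. A write "collides" when its span of buffer positions contains the transfer pointer:)

```python
# Toy circular-buffer model: a write of n samples starting at write_ptr
# collides if the positions it touches include the transfer pointer,
# i.e. the position the device is currently pulling data from.
BUF = 100

def spans_collide(write_ptr, n, transfer_ptr, buf=BUF):
    # set of buffer positions the write will overwrite, modulo buffer size
    touched = {(write_ptr + i) % buf for i in range(n)}
    return transfer_ptr in touched

# Writing the full buffer touches every position, so the transfer pointer
# is *always* inside the write span, no matter where it sits:
assert all(spans_collide(0, BUF, t) for t in range(BUF))

# Writing 1/3 of the buffer leaves most transfer-pointer positions safe:
n = BUF // 3
safe = sum(not spans_collide(0, n, t) for t in range(BUF))
print(f"{safe}/{BUF} transfer positions avoid a collision")  # 67/100
```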
-Kevin P
10-02-2024 09:16 AM
Thank you so much Kevin. This resolved my problem 🙂