Multifunction DAQ

cancel
Showing results for 
Search instead for 
Did you mean: 

nidaqmx stream_readers and stream_writers versus read and write

I'm using the NI PCIe 6353 and was testing the difference between streaming the reads and writes to/from ai and ao versus calling read and write explicitly on a task.

 

Below I've posted the code used for testing the streaming:

 

 

 

 

import time
import nidaqmx
import numpy as np
import nidaqmx as dq
from nidaqmx.stream_readers import (
    AnalogSingleChannelReader, AnalogMultiChannelReader)
from nidaqmx.stream_writers import (
    AnalogSingleChannelWriter, AnalogMultiChannelWriter)

class AnalogInStream(nidaqmx.Task):
    """Analog-input task that reads through an ``AnalogMultiChannelReader``.

    One AI voltage channel is added per entry of ``channels``; each
    acquisition reads ``nr_samples`` samples per channel into the
    preallocated ``acq_data`` array.
    """

    def __init__(self, device, channels, nr_samples):
        """Create the task, its AI voltage channels and the stream reader.

        Args:
            device: Object with a ``name`` attribute; the name is used as the
                prefix of each physical channel (``<name>/ai<index>``).
            channels: Sized iterable of analog-input channel indices.
            nr_samples: Number of samples per channel per acquisition.
        """
        nidaqmx.Task.__init__(self)
        for ch in channels:
            self.ai_channels.add_ai_voltage_chan(f"{device.name}/ai{ch}")

        self.nr_channels = len(channels)
        self.nr_samples = nr_samples

        # Destination buffer handed to the reader: one row per channel.
        self.acq_data = np.zeros(
            (self.nr_channels, self.nr_samples), dtype=np.float64)
        self.reader = AnalogMultiChannelReader(self.in_stream)

    def configure_clock(self, sample_rate, device):
        """Configure sample-clock timing sourced from the AO sample clock.

        Args:
            sample_rate: Sample rate passed to ``cfg_samp_clk_timing``.
            device: Object with a ``name`` attribute; its
                ``/<name>/ao/SampleClock`` terminal is used as clock source.

        Any exception raised while configuring the clock propagates to the
        caller. Swallowing it would let the benchmark continue silently
        without the requested clock configuration.
        """
        self.timing.cfg_samp_clk_timing(
            sample_rate,
            source=f"/{device.name}/ao/SampleClock",
            samps_per_chan=self.nr_samples,
        )

    def acquire_data(self):
        """Read ``nr_samples`` samples per channel into ``acq_data``."""
        self.reader.read_many_sample(
            self.acq_data, number_of_samples_per_channel=self.nr_samples)

class AnalogOutStream(nidaqmx.Task):
    """Single-channel analog-output task driven by a stream writer.

    The waveform to output is kept in ``write_data``. ``change_flag`` records
    whether that waveform still has to be handed to the writer, so
    ``perform_write`` only writes when the data changed since the last write.
    """

    def __init__(self, device, channel, nr_samples):
        """Create the task with one AO voltage channel.

        Args:
            device: Object with a ``name`` attribute used as channel prefix.
            channel: Analog-output channel index (``<name>/ao<channel>``).
            nr_samples: Number of samples in one output waveform.
        """
        super().__init__()
        physical_channel = f"{device.name}/ao{channel}"
        self.ao_channels.add_ao_voltage_chan(physical_channel)

        self.nr_samples = nr_samples
        self.nr_channels = 1

        # Waveform placeholder until update_data() supplies real data.
        self.write_data = np.zeros(nr_samples)
        self.writer = AnalogSingleChannelWriter(self.out_stream)

        # Start "dirty" so the first perform_write() always writes.
        self.change_flag = True

    def configure_clock(self, sample_rate):
        """Configure sample-clock timing for ``nr_samples`` samples."""
        samples = self.nr_samples
        self.timing.cfg_samp_clk_timing(sample_rate, samps_per_chan=samples)

    def update_data(self, data):
        """Store a new waveform and mark it as not yet written."""
        self.change_flag = True
        self.write_data = data

    def perform_write(self):
        """Write the waveform if it changed, then start the task."""
        needs_write = self.change_flag
        if needs_write:
            self.writer.write_many_sample(self.write_data)
            self.change_flag = False
        self.start()

def gaussian(x, mu, sig):
    """Return the normalised Gaussian with mean ``mu`` and width ``sig`` at ``x``.

    Evaluates ``exp(-((x - mu) / sig)**2 / 2) / (sqrt(2*pi) * sig)``; ``x``
    may be a scalar or a NumPy array.
    """
    scaled = (x - mu) / sig
    normalisation = np.sqrt(2. * np.pi) * sig
    return 1. / normalisation * np.exp(-np.power(scaled, 2.) / 2)

if __name__ == "__main__":
    # Test waveform: two narrow Gaussian peaks, scaled so the maximum is 5.
    x = np.linspace(0, 1, 2000)
    data = gaussian(x, 0.2, 0.01) + gaussian(x, 0.8, 0.01)
    data /= data.max()
    data *= 5

    # One scan outputs/acquires len(data) samples in scan_time seconds.
    scan_time = 4e-3
    sample_rate = int(len(data)/scan_time)
    print(f'sample rate = {sample_rate/1e6:.1f} MHz')

    dev = dq.system.Device(name='Dev1')

    nr_loops = 1000
    timing = np.zeros(nr_loops)

    # The tasks are used as context managers so they are closed even if a
    # DAQ call raises part-way through the benchmark.
    with AnalogOutStream(dev, 0, len(data)) as ao, \
            AnalogInStream(dev, [0, 1], len(data)) as ai:
        ai.configure_clock(sample_rate, ao.devices[0])
        ao.configure_clock(sample_rate)
        ao.update_data(data)

        for idx in range(nr_loops):
            # perf_counter() is the high-resolution clock meant for timing
            # short intervals; time.time() can have coarse resolution on
            # some platforms, which shows up as quantised timings.
            start = time.perf_counter()
            # AI is started first: it is clocked from the AO sample clock,
            # so it only acquires once the AO task runs.
            ai.start()
            ao.perform_write()
            ai.acquire_data()
            ai.stop()
            ao.stop()
            timing[idx] = time.perf_counter() - start

    print(f'executed in {timing.mean():.2e} +/- {timing.std():.2e} s')

 

 

 

 

 

And here is the code for calling read and write on the task directly:

 

 

 

 

import time
import nidaqmx
import numpy as np
import nidaqmx as dq

class AnalogIn(nidaqmx.Task):
    """Analog-input task that acquires with ``Task.read`` directly.

    One AI voltage channel is added per entry of ``channels`` and the input
    buffer size is set to ``nr_samples``.
    """

    def __init__(self, device, channels, nr_samples):
        """Create the task and its AI voltage channels.

        Args:
            device: Object with a ``name`` attribute; the name is used as the
                prefix of each physical channel (``<name>/ai<index>``).
            channels: Sized iterable of analog-input channel indices.
            nr_samples: Number of samples per channel per acquisition.
        """
        nidaqmx.Task.__init__(self)
        for ch in channels:
            self.ai_channels.add_ai_voltage_chan(f"{device.name}/ai{ch}")

        self.nr_channels = len(channels)
        self.nr_samples = nr_samples

        self.in_stream.input_buf_size = nr_samples

        # Placeholder with one row per channel; acquire_data() rebinds this
        # attribute to the value returned by Task.read.
        self.acq_data = np.zeros(
            (self.nr_channels, self.nr_samples), dtype=np.float64)

    def configure_clock(self, sample_rate, device):
        """Configure sample-clock timing sourced from the AO sample clock.

        Args:
            sample_rate: Sample rate passed to ``cfg_samp_clk_timing``.
            device: Object with a ``name`` attribute; its
                ``/<name>/ao/SampleClock`` terminal is used as clock source.

        Any exception raised while configuring the clock propagates to the
        caller. Swallowing it would let the benchmark continue silently
        without the requested clock configuration.
        """
        self.timing.cfg_samp_clk_timing(
            sample_rate,
            source=f"/{device.name}/ao/SampleClock",
            samps_per_chan=self.nr_samples,
        )

    def acquire_data(self):
        """Read ``nr_samples`` samples per channel and store them in ``acq_data``.

        NOTE(review): ``acq_data`` is rebound to the return value of
        ``Task.read``, which is presumably nested Python lists rather than
        the float64 array allocated in ``__init__`` -- confirm before
        relying on its type.
        """
        self.acq_data = self.read(
            number_of_samples_per_channel=self.nr_samples)

class AnalogOut(nidaqmx.Task):
    """Single-channel analog-output task that writes with ``Task.write``.

    The waveform to output is kept in ``write_data`` and the output buffer
    size is set to ``nr_samples``.
    """

    def __init__(self, device, channel, nr_samples):
        """Create the task with one AO voltage channel.

        Args:
            device: Object with a ``name`` attribute used as channel prefix.
            channel: Analog-output channel index (``<name>/ao<channel>``).
            nr_samples: Number of samples in one output waveform.
        """
        nidaqmx.Task.__init__(self)
        self.ao_channels.add_ao_voltage_chan(f"{device.name}/ao{channel}")

        self.nr_channels = 1
        self.nr_samples = nr_samples
        self.out_stream.output_buf_size = self.nr_samples

        # Waveform placeholder until update_data() supplies real data.
        self.write_data = np.zeros(nr_samples)

        # Declared here (update_data() sets it too) so the attribute exists
        # on every instance, matching AnalogOutStream.
        self.change_flag = True

    def configure_clock(self, sample_rate):
        """Configure sample-clock timing for ``nr_samples`` samples."""
        self.timing.cfg_samp_clk_timing(
            sample_rate, samps_per_chan=self.nr_samples)

    def update_data(self, data):
        """Store a new waveform and flag it as changed."""
        self.write_data = data
        self.change_flag = True

    def perform_write(self):
        """Write the current waveform and start the task.

        The waveform is written on every call; ``change_flag`` is not
        consulted here.
        """
        self.write(self.write_data)
        self.start()

def gaussian(x, mu, sig):
    """Evaluate a unit-area Gaussian centred at ``mu`` with width ``sig``.

    ``x`` may be a scalar or a NumPy array; the result has the same shape.
    """
    prefactor = 1. / (np.sqrt(2. * np.pi) * sig)
    exponent = -np.power((x - mu) / sig, 2.) / 2
    return prefactor * np.exp(exponent)

if __name__ == "__main__":
    # Test waveform: two narrow Gaussian peaks, scaled so the maximum is 5.
    x = np.linspace(0, 1, 2000)
    data = gaussian(x, 0.2, 0.01) + gaussian(x, 0.8, 0.01)
    data /= data.max()
    data *= 5

    # One scan outputs/acquires len(data) samples in scan_time seconds.
    scan_time = 4e-3
    sample_rate = int(len(data)/scan_time)
    print(f'sample rate = {sample_rate/1e6:.1f} MHz')

    dev = dq.system.Device(name='Dev1')

    nr_loops = 1000
    timing = np.zeros(nr_loops)

    # The setup is done exactly once: creating the tasks a second time would
    # leave the first pair unclosed. Using the tasks as context managers
    # closes them even if a DAQ call raises part-way through the benchmark.
    with AnalogOut(dev, 0, len(data)) as ao, \
            AnalogIn(dev, [0, 1], len(data)) as ai:
        ai.configure_clock(sample_rate, ao.devices[0])
        ao.configure_clock(sample_rate)
        ao.update_data(data)

        for idx in range(nr_loops):
            # perf_counter() is the high-resolution clock meant for timing
            # short intervals; time.time() can have coarse resolution on
            # some platforms, which shows up as quantised timings.
            start = time.perf_counter()
            # AI is started first: it is clocked from the AO sample clock,
            # so it only acquires once the AO task runs.
            ai.start()
            ao.perform_write()
            ai.acquire_data()
            ai.stop()
            ao.stop()
            timing[idx] = time.perf_counter() - start

    print(f'executed in {timing.mean():.2e} +/- {timing.std():.2e} s')

 

 

 

 

 

They seem to behave similarly, except the streaming method sometimes goes faster; see the graphs below (test_daq_regular.png, test_daq_streaming.png).

The write and read sample rate combined with the number of samples gives a total acquisition time of 4 ms. I expect some overhead from transferring the data to the device and back, but not almost 5 ms.

 

My questions are:

  • Is there any way to reduce this overhead of ~5 ms?
  • I'm confused why the stream method isn't faster than the regular write/read method, but that could simply be because the arrays aren't that big. The array size will not exceed ~2000, hence I didn't test this further.
  • It seems the execution time varies in increments of milliseconds; I can't figure out why that is.

 

The code is used to scan a cavity and acquire multiple photodiode signals (for a scanning cavity lock) and I was looking at ways to speed it up. Between scans a software PID controller uses the scan data to control lasers; so the scan cannot run continuously and has to be started in software.

 

 

0 Kudos
Message 1 of 2
(4,950 Views)

Did you get the answer?

0 Kudos
Message 2 of 2
(1,336 Views)