03-06-2023 08:53 AM
I am trying to make a code with the python module nidaqmx (PXIe 6365, PXIe 6739 and communication with thunderbolt) wich does :
- read one sample with the analog inputs
- then, calculate the output signal with it
- and finally write the result into the analog outputs
- the inputs and outputs have also to be synchronized
Here is the code :
# -*- coding: utf-8 -*-
import nidaqmx
import numpy as np
class PXI():
def __init__(self):
system = nidaqmx.system.System.local()
self.system = system
self.devices = [system.devices[i].name for i in range(len(system.devices)-1)]
self.task_AI = None
self.task_AO = None
self.Fs = None
self.AIchannels = None
self.dev_AI = None
self.AIconfig = None
self.AI_trig = None
def config_AnalogIn(self,device,Fs,channels,config="AsymRef",trig=False,AI_trig=None,delay=0):
self.task_AI = nidaqmx.Task()
if config == "AsymRef": config = nidaqmx.constants.TerminalConfiguration.RSE
AnIn = np.array([ch.name for ch in self.system.devices[device].ai_physical_chans])
if trig and (AI_trig is not None):
self.task_AI.ai_channels.add_ai_voltage_chan(AnIn[AI_trig],terminal_config=config)
for chan in AnIn[channels]:
if not trig or chan != AnIn[AI_trig]:
self.task_AI.ai_channels.add_ai_voltage_chan(chan,terminal_config=config)
if trig:
self.task_AI.triggers.start_trigger.cfg_anlg_edge_start_trig(AnIn[AI_trig],trigger_level = 1)
self.task_AI.triggers.start_trigger.delay_units = nidaqmx.constants.DigitalWidthUnits.SECONDS
if delay>0: self.task_AI.triggers.start_trigger.delay = delay
self.Fs = Fs
self.AIchannels = channels
self.dev_AI = device
self.AIconfig = config
self.AI_trig = AI_trig
def config_AnalogOut(self,device,channels,trig_AI=False):
self.task_AO = nidaqmx.Task()
AnOut = np.array([ch.name for ch in self.system.devices[device].ao_physical_chans])
for chan in AnOut[channels]:
self.task_AO.ao_channels.add_ao_voltage_chan(chan)
if trig_AI:
self.task_AO.triggers.start_trigger.cfg_dig_edge_start_trig(self.task_AI.triggers.start_trigger.term)
def config_sync(self,dev_AI,dev_AO,channels_In,channels_Out,Fs):
self.config_AnalogIn(dev_AI,Fs,channels_In)
self.config_AnalogOut(dev_AO,channels_Out,True)
def config_stream(self,dev_AI,dev_AO,channels_In,channels_Out,Fs):
self.config_sync(dev_AI, dev_AO, channels_In, channels_Out, Fs)
self.reader = nidaqmx.stream_readers.AnalogMultiChannelReader(self.task_AI.in_stream)
self.writer = nidaqmx.stream_writers.AnalogMultiChannelWriter(self.task_AO.in_stream,auto_start=False)
self.task_AO.out_stream.regen_mode = nidaqmx.constants.RegenerationMode.DONT_ALLOW_REGENERATION
def acquire_stream(self,Nsamples):
self.task_AO.timing.cfg_samp_clk_timing(self.Fs,samps_per_chan=1)
self.task_AI.timing.cfg_samp_clk_timing(self.Fs,samps_per_chan=1)
data = np.zeros((len(self.AIchannels),Nsamples))
temp = np.zeros((1,len(self.AIchannels)))
dV = 1e-7
n = 0
def writeCallback(task_idx, every_n_samples_event_type, num_of_samples, callback_data):
global data, n, temp
self.reader.read_many_sample(temp,1)
data[:,n] = temp.copy();
self.writer.write_many_sample(temp+dV) # write the array
n += 1
return 0
self.task_AO.register_every_n_samples_transferred_from_buffer_event(Nsamples,writeCallback)
self.task_AO.start()
self.writer.write_many_sample(temp)
return np.arange(Nsamples)/self.Fs,data
def close_all(self):
self.task_AI.close()
self.task_AO.close()
And in an other file :
# -*- coding: utf-8 -*-
import numpy as np
import matplotlib.pyplot as plt
import pypxi
dt = 0.5
Fs = 500
Nsamples = int(Fs*dt)
channels_In = [16]
channels_Out = [32]
pulse = np.zeros(Nsamples)
pulse[2000:3000] = 2
PXI = pypxi.PXI()
PXI.config_stream(0,1,channels_In,channels_Out,Fs)
t,data = PXI.acquire_stream(Nsamples)
plt.close()
plt.plot(t,data)
PXI.close_all()
All the functions works excepted the function "acquire_stream", wich return the error :
An internal error occurred.
Task Name: _unnamedTask<A3>
Status Code: -88700
Not very helpfull...
03-06-2023 11:10 AM
- read one sample with the analog inputs
- then, calculate the output signal with it
- and finally write the result into the analog outputs
- the inputs and outputs have also to be synchronized
Based on your requirement, you should consider using NI-DAQmx Hardware-Timed Single-Point Mode.
And since you need to process data and write new signals to the AO ASAP, you should not use Python. Python executes much slower and introduces latency to your program.
From C++ vs Python: Full Comparison - History-Computer:
Depending on the complexity of calculations, C++ is anywhere from 10 to 100 times faster than Python. Python programs also tend to use more RAM than applications built with C++. However, many programmers acknowledge that the simple syntax of Python makes it a much faster language for development.
If you have LabVIEW, there is an example available in Application Case 1 of NI-DAQmx Hardware-Timed Single Point Lateness Checking
If you don't, you can refer to the ANSI C example and make some modifications to use HWTSP instead of Continuous Sampling.
C:\Users\Public\Documents\National Instruments\NI-DAQ\Examples\DAQmx ANSI C\Synchronization\Multi-Function\Synch AI-AO