Multifunction DAQ

cancel
Showing results for 
Search instead for 
Did you mean: 

Two Channel analog output

THIS PROGRAM FOR SINGLE CHANNEL WORKS GREAT .. EXECUTES WRITING OF TEN SAWTOOTH CYCLES...... (NOTE THAT THE START COMMAND IS AFTER THE WRITE COMMAND!!!!

 

I HAD TO REMOVE THE PROGRAM FROM THE MESSAGE BODY DUE TO SIZE LIMITATIONS>> BUT I HAVE ATTACHED THE SAME.. 

 

 

 

-----------------------------------------------------------------

BUT THIS PROGRAM FOR TWO CHANNELS GIVES ME THE FOLLOWING ERROR:  (SPECIFIED OPERATION CANNOT BE PERFORMED WHILE THE TASK IS RUNNING)..

---------------------------------------------------------------------------------------------------------------------

import numpy

from PyDAQmx.DAQmxFunctions import *
from PyDAQmx.DAQmxConstants import *

SAMPLING_RATE_R = 1000
SAMPLE_SIZE_R = 1000

SAMPLE_SIZE_WX = 10000
CYCLES_X = 10
SAMPLING_RATE_WX = 500

SAMPLES_PER_CYCLE_X = int(SAMPLE_SIZE_WX/CYCLES_X)
V_MAX_X = 0
V_MIN_X = -3


SAMPLE_SIZE_WY = 1

NUM_ROWS = 5

class MultiChannelAnalogInput():
    """Class to create a multi-channel analog input
    
    Usage: AI = MultiChannelInput(physicalChannel)
        physicalChannel: a string or a list of strings
    optional parameter: limit: tuple or list of tuples, the AI limit values
                        reset: Boolean
    Methods:
        read(name), return the value of the input name
        readAll(), return a dictionary name:value
    """
    def __init__(self,physicalChannel, limit = None, reset = False):
        if type(physicalChannel) == type(""):
            self.physicalChannel = [physicalChannel]
        else:
            self.physicalChannel = physicalChannel
        self.numberOfChannel = physicalChannel.__len__()
        if limit is None:
            self.limit = dict([(name, (-10.0,10.0)) for name in self.physicalChannel])
        elif type(limit) == tuple:
            self.limit = dict([(name, limit) for name in self.physicalChannel])
        else:
            self.limit = dict([(name, limit[i]) for  i,name in enumerate(self.physicalChannel)])           
        if reset:
            DAQmxResetDevice(physicalChannel[0].split('/')[0] )
    def configure(self):
        # Create one task handle per Channel
        taskHandles = dict([(name,TaskHandle(0)) for name in self.physicalChannel])
        for name in self.physicalChannel:
            DAQmxCreateTask("",byref(taskHandles[name]))
            DAQmxCreateAIVoltageChan(taskHandles[name],name,"",DAQmx_Val_RSE,
                                     self.limit[name][0],self.limit[name][1],
                                     DAQmx_Val_Volts,None)
            DAQmxCfgSampClkTiming(taskHandles[name],"",SAMPLING_RATE_R,DAQmx_Val_Rising,
                                  DAQmx_Val_FiniteSamps,1000)
        self.taskHandles = taskHandles
    def readAll(self):
        return dict([(name,self.read(name)) for name in self.physicalChannel])
    def read(self,name = None):
        if name is None:
            name = self.physicalChannel[0]
        taskHandle = self.taskHandles[name]                    
        DAQmxStartTask(taskHandle)
        
        Voltage_Read = numpy.zeros((SAMPLE_SIZE_R,), dtype=numpy.float64)
        read = int32()
        
        DAQmxReadAnalogF64(taskHandle,SAMPLE_SIZE_R,10.0,DAQmx_Val_GroupByChannel,Voltage_Read,SAMPLE_SIZE_R,byref(read),None)
        
        DAQmxStopTask(taskHandle)
        
        avg_voltage = sum(Voltage_Read)/float(len(Voltage_Read))
        return (avg_voltage)


class MultiChannelAnalogOutput():
    """Class to create a multi-channel analog output
    
    Usage: AI = MultiChannelOutput(physicalChannel)
        physicalChannel: a string or a list of strings
    optional parameter: limit: tuple or list of tuples, the AI limit values
                        reset: Boolean
    Methods:
        write(name), return the value of the input name
        writeAll(), return a dictionary name:value
    """
    def __init__(self, physicalChannel, voltage, limit = None, reset = False):
        if type(physicalChannel) == type(""):
            self.physicalChannel = [physicalChannel]
        else:
            self.physicalChannel = physicalChannel
        self.numberOfChannel = physicalChannel.__len__()
        if limit is None:
            self.limit = dict([(name, (-9,9)) for name in self.physicalChannel])
        elif type(limit) == tuple:
            self.limit = dict([(name, limit) for name in self.physicalChannel])
        else:
            self.limit = dict([(name, limit[i]) for  i,name in enumerate(self.physicalChannel)])           
        if reset:
            DAQmxResetDevice(physicalChannel[0].split('/')[0] )
            
        self.voltage = dict([(name, voltage[i]) for  i,name in enumerate(self.physicalChannel)])            
            
    def configure(self):
        # Create one task handle per Channel
        taskHandles = dict([(name,TaskHandle(0)) for name in self.physicalChannel])
        for name in self.physicalChannel:
            DAQmxCreateTask("",byref(taskHandles[name]))
            DAQmxCreateAOVoltageChan(taskHandles[name],name,"", 
                                     self.limit[name][0],self.limit[name][1],
                                     DAQmx_Val_Volts,None)
            DAQmxCfgSampClkTiming(taskHandles[name],"",SAMPLING_RATE_WX,DAQmx_Val_Rising,
                                  DAQmx_Val_FiniteSamps,SAMPLE_SIZE_WX)
        self.taskHandles = taskHandles
    def writeAll(self):
        return dict([(name,self.write(name)) for name in self.physicalChannel])
    def write(self,name = None):
        if name is None:
            name = self.physicalChannel[0]
        taskHandle = self.taskHandles[name]     
        volt = self.voltage[name]               
        
        print(name)
        print(volt)
        
        try:
                    
            if name == "DAQ/ao1":
            #Generate Voltage Values (Triangular)
                Voltage_X = numpy.zeros(SAMPLE_SIZE_WX, dtype=numpy.float64)
                print("Voltage Generated for Channel X:")
                for i in range(0,SAMPLE_SIZE_WX):
                    j = i%SAMPLES_PER_CYCLE_X
                    if j <= (SAMPLES_PER_CYCLE_X)/2:                         
                        Voltage_X[i] = V_MIN_X + j * (V_MAX_X - V_MIN_X)/(SAMPLES_PER_CYCLE_X/2)
                    else:
                        Voltage_X[i] = V_MAX_X - (j-(SAMPLES_PER_CYCLE_X/2)) * (V_MAX_X - V_MIN_X)/(SAMPLES_PER_CYCLE_X/2)
                print(Voltage_X)
            
                DAQmxWriteAnalogF64(taskHandle,SAMPLE_SIZE_WX,1,10.0,DAQmx_Val_GroupByChannel,Voltage_X,None,None)
            
                        
            elif name == "DAQ/ao0":
                Voltage_Y = numpy.zeros(SAMPLE_SIZE_WY, dtype=numpy.float64)
                Voltage_Y[0] = volt + 0.5 
                print("Voltage Generated for Channel Y:")
                print(Voltage_Y)
                DAQmxWriteAnalogF64(taskHandle,SAMPLE_SIZE_WY,1,10.0,DAQmx_Val_GroupByChannel,Voltage_Y,None,None)
                
            DAQmxStartTask(taskHandle)          #I Start the TASK AFTER THE WRITE STATEMENT!!!!!
            
            DAQmxWaitUntilTaskDone(taskHandle, 20)
            
            
        
        except DAQError as err:
            print ("DAQmx Error: %s"%err)
        
        finally:
            if taskHandle:
                # DAQmx Stop Code
                DAQmxStopTask(taskHandle)
                DAQmxClearTask(taskHandle)
            
            
if __name__ == '__main__':       
    
    for k in range(0,NUM_ROWS):
        
        multipleAI = MultiChannelAnalogInput(["DAQ/ai3","DAQ/ai5"])
        multipleAI.configure()
        Voltage_In = multipleAI.readAll()
        print(Voltage_In)
        print("at row", k+1)
        
        Vol_X = Voltage_In["DAQ/ai5"]
        Vol_Y = Voltage_In["DAQ/ai3"]
        
        
        multipleAO = MultiChannelAnalogOutput(["DAQ/ao1", "DAQ/ao0"], [Vol_X, Vol_Y])
        #multipleAO = MultiChannelAnalogOutput(["DAQ/ao1", "DAQ/ao0"])
        multipleAO.configure()
        multipleAO.writeAll()

If I start the TASK BEFORE THE WRITE STATEMENT... THE Write command EXECUTES ONLY ONE CYCLE OF THE SAWTOOTH WAVEWORM... instead of all the 10 cycles.. Something's off.. about the way SAMPLING_RATE and SAMPLE_SIZE behave.. 

 

Download All
0 Kudos
Message 1 of 4
(4,241 Views)

Hi Ravi,

 

It is intended behavior that you write before you start the task in an Analog Output in DAQmx. In order to understand why it is important to understand what the DAQmx write actually does:

 

“The NI-DAQmx Write Function moves samples from the Application Development Environment (ADE) Memory to the PC Buffer in RAM.”

 

When you start the task, then the samples are moved from the PC Buffer to the DAQ FIFO where they are written to the physical channel by the device. The DAQmxWriteAnalog 64 function also has an autostart parameter that would start the task automatically with the call to DAQmx write:

 

DAQmxWriteAnalogF64

http://zone.ni.com/reference/en-XX/help/370471AE-01/daqmxcfunc/daqmxwriteanalogf64/

 

It looks like you are using a third-party API, however, so please note that it would not be officially supported by us.  The general concepts and structure of using the DAQmx driver still applies as I outlined above.

 

Regards,

 

Kyle S.
Applications Engineer
National Instruments
http://www.ni.com/support

0 Kudos
Message 2 of 4
(4,217 Views)

Hi Kyle, thanks a lot for the message... I have made it to work.. but I am not sure I quite understand the reason.. so I am putting down the most important function calls with a brief explanation of the program.. hope youcan illuminate.. 

 

I am running a loop, where

 

(A) A triangular voltage waveform is executed along one Channel (DAQ/ao1) and

 

(B) just one voltage value is written to second channel (DAQ/ao0)

 

So during configuration: 

as shown in the highlighted text.. I only configure the sample clock for DAQ/ao1..  since there's only one value being written to DAQ/ao0

 

def configure(self):
# Create one task handle per Channel
taskHandles = dict([(name,TaskHandle(0)) for name in self.physicalChannel])
for name in self.physicalChannel:
DAQmxCreateTask("",byref(taskHandles[name]))
DAQmxCreateAOVoltageChan(taskHandles[name],name,"",
self.limit[name][0],self.limit[name][1],DAQmx_Val_Volts,None)

if name == "DAQ/ao1":
DAQmxCfgSampClkTiming(taskHandles[name],"",SAMPLING_RATE_WX,DAQmx_Val_Rising,
DAQmx_Val_FiniteSamps,SAMPLE_SIZE_WX)

self.taskHandles = taskHandles

During the Write process:

 

(A) for DAQ/ao1 I call the Write function first (which would move samples from the ADE Memory to the PC Buffer ) followed by the StartTask function.. However I have to wait for the task to complete.. otherwise I get error.. The Autostart flag is set to 0.. 

 

(B) I start the task and then call the write function.. The Autostart flag is set to 0.. Any other changes to the ordering of the function calls returns errors.. 

 

if name == "DAQ/ao1":
            #Generate Voltage Values (Triangular)
                Voltage_X = numpy.zeros(SAMPLE_SIZE_WX, dtype=numpy.float64)
                print("Voltage Generated for Channel X:")
                for i in range(0,SAMPLE_SIZE_WX):
                    j = i%SAMPLES_PER_CYCLE_X
                    if j <= (SAMPLES_PER_CYCLE_X)/2:                         
                        Voltage_X[i] = V_MIN_X + j * (V_MAX_X - V_MIN_X)/(SAMPLES_PER_CYCLE_X/2)
                    else:
                        Voltage_X[i] = V_MAX_X - (j-(SAMPLES_PER_CYCLE_X/2)) * (V_MAX_X - V_MIN_X)/(SAMPLES_PER_CYCLE_X/2)
                #print(Voltage_X)
                DAQmxWriteAnalogF64(taskHandle,SAMPLE_SIZE_WX,0,10.0,DAQmx_Val_GroupByChannel,
Voltage_X,None,None) DAQmxStartTask(taskHandle) DAQmxWaitUntilTaskDone(taskHandle, 60)
elif name == "DAQ/ao0": Voltage_Y = numpy.zeros(SAMPLE_SIZE_WY, dtype=numpy.float64) Voltage_Y[0] = volt + 1.0 print("Voltage Generated for Channel Y:") print(Voltage_Y) DAQmxStartTask(taskHandle) DAQmxWriteAnalogF64(taskHandle,SAMPLE_SIZE_WY,0,10.0,
DAQmx_Val_GroupByChannel,Voltage_Y,None,None)

Can you let me know if this makes sense.. and if so why? and is there a better way to do this?

 

thanks and regards,

 

Ravi

 

0 Kudos
Message 3 of 4
(4,153 Views)

Hi Ravi,

 

Please note that since you are using a third-party API, that what you are doing would not be officially supported by us. 

 

The reason that the single sample write is different is that he output is not buffered. Please refer to the following help page for an explanation:

 

DAQmxWriteAnalogF64

 

http://zone.ni.com/reference/en-XX/help/370471AE-01/daqmxcfunc/daqmxwriteanalogf64/

 

Regards,

 

Kyle S.
Applications Engineer
National Instruments
http://www.ni.com/support

0 Kudos
Message 4 of 4
(4,124 Views)