Multifunction DAQ

Simultaneously stream data from analog inputs and counter for incremental rotary encoder with python - NI USB-6210

I have an NI USB-6210 that we use to capture voltages on 12 analog input pins. It is also connected to an incremental rotary encoder that reports angular position. I have a Python script up and running that captures data from the analog pins and the rotary encoder using the .read() function, but this method is quite slow, so I can only capture data at about 60 Hz. The intended method is to use the buffer.

 

I have a script that collects voltage data from the analog pins at 1000 Hz, but I can't get it working for the rotary encoder, let alone for both simultaneously. This script was adapted from this discussion on Stack Overflow: https://stackoverflow.com/questions/60947373/continuous-analog-read-from-national-instruments-daq-wi...

 

Here is the code snippet that works to collect data from the analog pins:

 

def transducer_reading_task_callback(task_idx, event_type, num_samples, callback_data):  # bufsize_callback is passed to num_samples
    global transducer_data

    if running:
        analog_buffer_in = np.zeros((analog_channels_in, num_samples))  
        transducerDataStreamIn.read_many_sample(analog_buffer_in, num_samples, timeout=nidaqmx.constants.WAIT_INFINITELY)
        
        transducer_data = np.append(transducer_data, analog_buffer_in, axis=1)  # appends buffered data to total variable data

    return 0  


transducer_task = nidaqmx.Task()
transducer_task.ai_channels.add_ai_voltage_chan("Dev1/ai0:5, Dev1/ai8:13")  # has to match with chans_in
transducer_task.timing.cfg_samp_clk_timing(rate=sampling_freq_in, sample_mode=AcquisitionType.CONTINUOUS,samps_per_chan=buffer_in_size_cfg)

transducerDataStreamIn = AnalogMultiChannelReader(transducer_task.in_stream)
transducer_task.register_every_n_samples_acquired_into_buffer_event(bufsize_callback, transducer_reading_task_callback)

running = True
transducer_task.start()
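
A side note on the callback above: np.append copies the whole accumulated array on every call, so long recordings get progressively slower. A minimal sketch of one alternative is to keep the chunks in a list and join them once at the end. The class name ChunkAccumulator is hypothetical and is not part of nidaqmx or NumPy; the sketch assumes every chunk has the same shape, with samples along the last axis.

```python
import numpy as np


class ChunkAccumulator:
    """Collects equally shaped chunks and joins them once at the end.

    Hypothetical helper for illustration; not part of nidaqmx or NumPy.
    """

    def __init__(self):
        self._chunks = []

    def add(self, chunk):
        # Copy the chunk, because the caller usually reuses its read buffer.
        self._chunks.append(np.array(chunk, copy=True))

    def result(self):
        # Join along the sample axis (the last axis). Works for a 2D
        # (channels, samples) analog buffer and a 1D counter buffer.
        # Requires at least one chunk to have been added.
        return np.concatenate(self._chunks, axis=-1)


# Example with a reused buffer of 2 channels x 3 samples.
acc = ChunkAccumulator()
buf = np.zeros((2, 3))
buf[:] = 1.0
acc.add(buf)
buf[:] = 2.0
acc.add(buf)
data = acc.result()  # shape (2, 6)
```

In the callback, acc.add(analog_buffer_in) would take the place of the np.append line, and acc.result() would be called once after the task is stopped.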

 

However, naively adapting the above code to collect rotary encoder data does not work. I believe the timing configuration is supposed to be implicit, but I get an error stating that this timing type is invalid for this channel and that I should use DAQmx_Val_SampClk or DAQmx_Val_OnDemand instead. Reconfiguring to sample clock timing throws an error stating that an external timer must be used, and I can't find anything on the 'OnDemand' option in the nidaqmx API documentation.

 

def encoder_reading_task_callback(task_idx, event_type, num_samples, callback_data):  # bufsize_callback is passed to num_samples
    global encoder_data

    if running:

        counter_buffer_in = np.zeros((counter_channels_in, num_samples))  
        counterDataStreamIn.read_many_sample_double(counter_buffer_in, num_samples, timeout=nidaqmx.constants.WAIT_INFINITELY)
        
        encoder_data = np.append(encoder_data, counter_buffer_in, axis=1)  # appends buffered data to total variable data

    return 0 

encoder_task = nidaqmx.Task()
encoder_task.ci_channels.add_ci_ang_encoder_chan('Dev1/ctr1', decoding_type=EncoderType.X_4, units=AngleUnits.DEGREES, pulses_per_rev=3000)
encoder_task.timing.cfg_implicit_timing(sample_mode=AcquisitionType.CONTINUOUS)

counterDataStreamIn = CounterReader(encoder_task.in_stream)
encoder_task.register_every_n_samples_acquired_into_buffer_event(bufsize_callback, encoder_reading_task_callback)

running = True
encoder_task.start()

 

Ultimately I am hoping to be able to stream analog pin data with an associated rotary encoder angular position at ~1000 Hz. If there is a better implementation method that someone can point me to I'd really appreciate it!

Message 1 of 3
Solution
Accepted by topic author Xyphota

Python is an interpreted language, so it is relatively slow compared to C. For buffered streaming, Python is fine because you don't call the API very often. But if you are going to poll with on-demand timing, you should consider another language.

 

You can use the AI sample clock to time the counter task. It will synchronize the encoder task to the AI task. Make sure you start the encoder task before the AI task.

encoder_task.timing.cfg_samp_clk_timing(rate=sampling_freq_in, source="Dev1/ai/SampleClock", sample_mode=AcquisitionType.CONTINUOUS, samps_per_chan=buffer_in_size_cfg)

-------------------------------------------------------
Control Lead | Intelline Inc
Message 2 of 3

Thank you so much! After sorting out some new bugs, I was able to get it working. I had been wondering how to get the encoder routine to reference the clock that the analog input routine is using.

 

In case anyone else stumbles across this thread, the correct syntax requires a leading '/' character, so my line is now:

 

encoder_task.timing.cfg_samp_clk_timing(rate=sampling_freq_in, source="/Dev1/ai/SampleClock", sample_mode=AcquisitionType.CONTINUOUS, samps_per_chan=buffer_in_size_cfg)

 

This thread was helpful for diagnosing the syntax:

https://forums.ni.com/t5/Multifunction-DAQ/Python-multi-tasking-using-cfg-samp-clk-timing-question/t...
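
For anyone who wants the pieces in one place, here is a minimal sketch of the approach from this thread: both tasks use sample clock timing, the encoder task takes the AI sample clock as its source, and the encoder task is started before the AI task. It uses blocking reads in a loop instead of the callbacks shown earlier, which is a simplification and not the original poster's code. The function names ai_sample_clock_terminal and acquire, the chunk and buffer sizes, and the 10 s timeout are assumptions for illustration. As far as I know, CounterReader.read_many_sample_double expects a one-dimensional array, so the encoder buffer here is 1D. This is untested on hardware.

```python
import numpy as np


def ai_sample_clock_terminal(device):
    """Build the AI sample clock terminal name with the leading '/'."""
    return f"/{device.strip('/')}/ai/SampleClock"


def acquire(device="Dev1", rate=1000.0, chunk=100, n_chunks=10):
    """Read n_chunks blocks of synchronized voltages and encoder angles."""
    # Imported here so the helper above can be used without the NI driver.
    import nidaqmx
    from nidaqmx.constants import AcquisitionType, AngleUnits, EncoderType
    from nidaqmx.stream_readers import AnalogMultiChannelReader, CounterReader

    n_ai = 12  # ai0:5 and ai8:13, as in the question
    buffer_size = 10 * chunk  # assumed buffer size, several chunks deep

    with nidaqmx.Task() as ai_task, nidaqmx.Task() as enc_task:
        ai_task.ai_channels.add_ai_voltage_chan(
            f"{device}/ai0:5, {device}/ai8:13")
        ai_task.timing.cfg_samp_clk_timing(
            rate=rate, sample_mode=AcquisitionType.CONTINUOUS,
            samps_per_chan=buffer_size)

        enc_task.ci_channels.add_ci_ang_encoder_chan(
            f"{device}/ctr1", decoding_type=EncoderType.X_4,
            units=AngleUnits.DEGREES, pulses_per_rev=3000)
        # The counter is sampled on every AI sample clock edge.
        enc_task.timing.cfg_samp_clk_timing(
            rate=rate, source=ai_sample_clock_terminal(device),
            sample_mode=AcquisitionType.CONTINUOUS,
            samps_per_chan=buffer_size)

        ai_reader = AnalogMultiChannelReader(ai_task.in_stream)
        enc_reader = CounterReader(enc_task.in_stream)
        ai_buf = np.zeros((n_ai, chunk))
        enc_buf = np.zeros(chunk)  # 1D buffer for the single counter
        volts, angles = [], []

        # Start the encoder task first: it waits for the AI clock, so
        # sample i of both tasks is taken on the same clock edge.
        enc_task.start()
        ai_task.start()
        for _ in range(n_chunks):
            ai_reader.read_many_sample(ai_buf, chunk, timeout=10.0)
            enc_reader.read_many_sample_double(enc_buf, chunk, timeout=10.0)
            volts.append(ai_buf.copy())
            angles.append(enc_buf.copy())

    return np.concatenate(volts, axis=1), np.concatenate(angles)
```

Calling volts, angles = acquire() with the device connected should return a (12, 1000) voltage array and a matching 1000-element angle array, where column i of volts and element i of angles belong to the same sample clock edge.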

Message 3 of 3