170 lines
5.0 KiB
Python
Executable File
170 lines
5.0 KiB
Python
Executable File
#!/usr/bin/python3
|
|
|
|
import pyvisa as visa
|
|
|
|
import sys
|
|
|
|
import numpy as np
|
|
|
|
from PyQt6.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QLabel, QLineEdit, QPushButton
|
|
|
|
from matplotlib.backends.backend_qtagg import FigureCanvas
|
|
from matplotlib.backends.backend_qtagg import \
|
|
NavigationToolbar2QT as NavigationToolbar
|
|
from matplotlib.backends.qt_compat import QtWidgets
|
|
from matplotlib.figure import Figure
|
|
|
|
|
|
class FFTWindow(QMainWindow):
    """Window that reads a channel-1 trace from a VISA instrument and plots its FFT.

    Top to bottom the window holds: a connect button, a read-only status line,
    an acquire button, the time-domain canvas, the spectrum canvas and a
    navigation toolbar bound to the spectrum canvas.
    """

    # The reply to 'WAVEFORM:DATA?' is consumed as <header><samples><trailer>.
    # NOTE(review): 11 presumably corresponds to a '#9' + nine-digit length
    # prefix and 1 to a trailing newline -- confirm with the scope's
    # programming guide.
    _HEADER_BYTES = 11
    _TRAILER_BYTES = 1

    def __init__(self):
        """Build the widgets and wire the two buttons to their handlers."""
        super().__init__()
        self.setWindowTitle("FFT Probe for RIGOL Oscilloscope")

        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        layout = QVBoxLayout()
        central_widget.setLayout(layout)

        self.findDevice_button = QPushButton("Find and Connect Device")
        self.findDevice_button.clicked.connect(self.Find_Device)
        layout.addWidget(self.findDevice_button)

        # Read-only line used to report connection progress and the device ID.
        self.device_lineEdit = QLineEdit()
        self.device_lineEdit.setReadOnly(True)
        layout.addWidget(self.device_lineEdit)

        self.perform_button = QPushButton("Get Data and FFT")
        self.perform_button.clicked.connect(self.perform_fft)
        layout.addWidget(self.perform_button)

        # Time-domain plot.
        self.figureWF = Figure()
        self.canvasWF = FigureCanvas(self.figureWF)
        layout.addWidget(self.canvasWF)

        # Frequency-domain plot, with its own navigation toolbar.
        self.figureFFT = Figure()
        self.canvasFFT = FigureCanvas(self.figureFFT)
        layout.addWidget(self.canvasFFT)
        layout.addWidget(NavigationToolbar(self.canvasFFT, self))

        # Connection state; rm/osc are filled in by Find_Device().
        self.rm = None
        self.osc = None
        self.isConnected = False

    def Find_Device(self):
        """Open the first VISA resource found and display its '*IDN?' reply.

        Every failure is reported in the status line rather than raised: this
        method is connected to a button's ``clicked`` signal, and an exception
        escaping a slot can take the whole application down.
        """
        self.device_lineEdit.setText("Please wait, finding device....")
        try:
            self.rm = visa.ResourceManager('@py')
            resources = self.rm.list_resources()
        except Exception as err:  # UI boundary: backend errors vary by platform
            print(f"VISA discovery failed: {err}")
            resources = ()

        if not resources:
            self.device_lineEdit.setText('Unable to find any device')
            return

        print(resources)
        address = str(resources[0])
        self.device_lineEdit.setText("Openning " + address)
        try:
            self.osc = self.rm.open_resource(address)
        except Exception as err:  # UI boundary: report instead of crashing
            print(f"Opening {address} failed: {err}")
            self.device_lineEdit.setText('Unable to open the device')
            return

        # Query() only talks to the instrument once isConnected is set.
        self.isConnected = True
        try:
            identity = self.Query('*IDN?')
        except Exception as err:  # e.g. the resource opened but never answers
            print(f"'*IDN?' failed on {address}: {err}")
            self.isConnected = False
            self.device_lineEdit.setText('Unable to open the device')
            return
        self.device_lineEdit.setText(identity)

    def Query(self, cmd: str) -> str:
        """Send *cmd* and return the reply without trailing newlines.

        Returns an empty string when no device is connected.
        """
        if not self.isConnected:
            return ""
        return self.osc.query(cmd).rstrip('\n')

    def Write(self, cmd: str):
        """Send *cmd* to the instrument; does nothing when not connected."""
        if self.isConnected:
            self.osc.write(cmd)

    def AskWaveParameter(self):
        """Return the comma-separated fields of the 'WAVEFORM:Preamble?' reply.

        Field order, as labelled by the debugging output this method used to
        print: 0 format, 1 mode, 2 points, 3 average, 4 X-step (s),
        5 X-start (s), 6 X-ref, 7 Y-step (V), 8 Y-start (V), 9 Y-ref.
        When no device is connected the result is ``['']``.
        """
        return self.Query('WAVEFORM:Preamble?').split(',')

    def perform_fft(self):
        """Acquire channel 1, then redraw the waveform and spectrum plots.

        Does nothing when no device is connected or when the instrument does
        not report status ``IDLE``. Acquisition problems (malformed replies,
        VISA I/O errors) are printed and abort the update instead of
        propagating out of the button handler.
        """
        if not self.isConnected:
            return

        try:
            acquired = self._read_waveform()
        except (ValueError, IndexError, visa.VisaIOError) as err:
            print(f"Waveform acquisition failed: {err}")
            return
        if acquired is None:
            return
        samples, time_step = acquired

        # Size everything from the samples actually received so the time axis,
        # the FFT and the frequency axis can never disagree in length.
        sample_count = samples.size
        # Sample k is taken at k * time_step; linspace(0, duration, n) would
        # stretch the spacing to duration / (n - 1).
        times = np.arange(sample_count) * time_step
        magnitudes = np.abs(np.fft.fft(samples))
        frequencies = np.fft.fftfreq(sample_count, d=time_step)

        print("---- do FFT")
        self._plot_waveform(times, samples)
        self._plot_spectrum(frequencies, magnitudes)

    def _read_waveform(self):
        """Configure the scope and fetch one record as ``(samples, time_step)``.

        Returns ``None`` when the status reply is not ``IDLE``. Raises
        ``ValueError``/``IndexError`` when a reply cannot be parsed or yields
        no usable data.
        """
        for command in ('RUN',
                        'WAVEFORM:RESET',
                        'WAVEFORM:SOURCE CHAN1',
                        'WAVEFORM:FORMAT BYTE',
                        'WAVEFORM:MODE NORM'):
            self.Write(command)

        preamble = self.AskWaveParameter()
        expected_points = int(preamble[2])
        time_step = float(preamble[4])
        if time_step <= 0:
            raise ValueError(f"non-positive X-step in preamble: {preamble[4]!r}")

        # The replies are not used; the round trips are kept so the command
        # sequence seen by the instrument stays as it was.
        self.Query('WAVEFORM:SOURCE?')
        self.Query('WAVEFORM:MODE?')
        self.Query('WAVEFORM:FORMAT?')

        status, available = self.Query('WAVEFORM:STATUS?').split(',')
        if not (status == "IDLE" and int(available) >= 0):
            return None

        print("---- get waveform data")
        self.osc.write('WAVEFORM:DATA?')
        raw = self.osc.read_bytes(
            expected_points + self._HEADER_BYTES + self._TRAILER_BYTES)

        # One unsigned byte per sample; strip the header and the trailer.
        payload = raw[self._HEADER_BYTES:len(raw) - self._TRAILER_BYTES]
        samples = np.frombuffer(payload, dtype=np.uint8).astype(float)
        if samples.size == 0:
            raise ValueError("instrument returned no sample data")
        return samples, time_step

    def _plot_waveform(self, times, samples):
        """Redraw the time-domain canvas; *times* are in seconds."""
        self.figureWF.clear()
        ax = self.figureWF.add_subplot(111)
        ax.plot(times * 1e6, samples)
        ax.set_title('WaveForm')
        ax.set_xlabel('Time (us)')
        ax.set_ylabel('Amplitude')
        ax.grid(True)
        self.canvasWF.draw()

    def _plot_spectrum(self, frequencies, magnitudes):
        """Redraw the log-log spectrum canvas from the positive-frequency half."""
        half = frequencies.size // 2
        self.figureFFT.clear()
        ax = self.figureFFT.add_subplot(111)
        # Bin 0 is 0 Hz, which has no position on a logarithmic axis: skip it.
        ax.loglog(frequencies[1:half] / 1e6, magnitudes[1:half])
        ax.set_title('FFT Result')
        ax.set_xlabel('Frequency [MHz]')
        ax.set_ylabel('Amplitude')
        # Only the upper bound is set; a lower bound of 0 is invalid on a log axis.
        ax.set_xlim(right=100)
        ax.grid(True)
        self.canvasFFT.draw()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the FFT window and
    # hand the event loop's return code back to the shell.
    qt_app = QApplication(sys.argv)
    main_window = FFTWindow()
    main_window.show()
    exit_code = qt_app.exec()
    sys.exit(exit_code)