# fdr1gui/BlockStackApp.py
# 2025-04-29 15:56:50 +02:00
#
# 331 lines
# 12 KiB
# Python

from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QSizePolicy, QPushButton, QMessageBox
from PyQt5.QtCore import Qt
import glob
import requests
import serial
import re
import time
import math
from UpdateWorker import UpdateWorker
class BlockStackApp(QWidget):
    """Main window stacking one UI block per bench found on a USB serial port.

    On construction the widget builds its layout, probes every
    ``/dev/ttyUSB*`` port for a bench, starts one ``UpdateWorker`` per bench
    that the HTTP API recognises, and finally appends a bottom row holding
    the "Close" button.
    """

    # Button colours used by update_mode().
    _COLOR_ACTIVE = "#0000FF"
    _COLOR_IDLE = "#5A5A5A"
    # Attribute names of the mode buttons on a worker's block widget.
    _MODE_BUTTONS = ("btn_go_left", "btn_go_right", "btn_reset")
    # A port is asked for its serial number at most this many times.
    _PROBE_ATTEMPTS = 2
    # Upper bound (seconds) for the bench-id HTTP lookup.
    _API_TIMEOUT_S = 5
    # Number of samples per line kept on the graph while the worker is idle.
    _IDLE_WINDOW = 200

    def __init__(self, fullscreen):
        """Build the window and start one worker per detected bench.

        Args:
            fullscreen: when truthy the window is shown full screen,
                otherwise maximized.
        """
        super().__init__()
        # Set plain state first so it exists before any worker is started.
        self.update_count = 11
        self.initUI(fullscreen)
        self.check_usb_devices()
        self.add_bottom_block()

    def initUI(self, fullscreen):
        """Create the top-level vertical layout, apply the stylesheet, show the window."""
        # NOTE: this attribute shadows QWidget.layout(); the name is kept
        # because code outside this class may rely on it.
        self.layout = QVBoxLayout()
        self.layout.setSpacing(5)
        self.layout.setContentsMargins(5, 5, 5, 5)
        self.layout.setAlignment(Qt.AlignLeft | Qt.AlignTop)
        self.setLayout(self.layout)
        self.setWindowTitle('462filament Bench GUI')
        # The stylesheet text is kept verbatim.
        self.setStyleSheet("""
QWidget {
background-color: #2E2E2E;
color: #FFFFFF;
height: 100px;
}
QLabel {
background-color: #4A4A4A;
border: 1px solid #5A5A5A;
padding: 2px;
margin: 2px;
height: 30px;
width: 250px;
}
QPushButton, QComboBox {
background-color: #5A5A5A;
border: 1px solid #6A6A6A;
padding: 2px;
margin: 2px;
height: 25px;
width: 100px;
}
""")
        if fullscreen:
            self.showFullScreen()
        else:
            self.showMaximized()

    def check_usb_devices(self):
        """Probe all ``/dev/ttyUSB*`` ports and start a worker per known bench.

        Populates ``self.blocks`` (block id -> block widget) and
        ``self.workers`` (every started worker). ``self.worker`` still refers
        to the most recently created worker, as before.
        """
        bench_list = []
        for port in glob.glob('/dev/ttyUSB*'):
            try:
                with serial.Serial(port, 1000000, timeout=1) as ser:
                    bench = self._identify_bench(ser, port)
            except serial.SerialException as e:
                print(f"Could not open port {port}: {e}")
                continue
            if bench is not None:
                bench_list.append(bench)

        bench_list.sort(key=lambda x: x[1])
        self.blocks = {}
        # Keep a reference to every worker, not only the last one created.
        self.workers = []
        for port, bench_id, bench_name, serial_number in bench_list:
            block_id = f"{bench_id}"
            worker = UpdateWorker(serial_number, bench_id, bench_name, port, block_id, self)
            self.worker = worker
            self.workers.append(worker)
            self.layout.addWidget(worker.block)
            worker.change_color_signal.connect(self.change_button_color)
            worker.update_signal.connect(self.update_block)
            worker.start()
            self.blocks[block_id] = worker.block

    def _identify_bench(self, ser, port):
        """Return ``(port, bench_id, bench_name, serial_number)`` or ``None``.

        The device is queried up to ``_PROBE_ATTEMPTS`` times. As soon as it
        answers with a serial number the API lookup decides the outcome; a
        serial number the API does not resolve is not retried.
        """
        for attempt in range(self._PROBE_ATTEMPTS):
            if attempt:
                print(f"Second check port {port}")
            serial_number = self._query_serial_number(ser)
            if not serial_number:
                continue
            print(f"Found bench on port {port}")
            # get_bench_id() returns None on failure, so do not unpack blindly.
            lookup = self.get_bench_id(serial_number)
            if lookup is None:
                return None
            bench_id, bench_name = lookup
            if bench_id:
                return (port, bench_id, bench_name, serial_number)
            return None
        print(f"Ignoring port {port}")
        return None

    def _query_serial_number(self, ser):
        """Send the ``i`` command on *ser* and parse the serial number from the reply."""
        # Drain whatever is already buffered; this first read was never used.
        ser.read_all()
        ser.write(b'i')
        time.sleep(1)
        # Raw serial bytes may be invalid UTF-8; do not let that abort the scan.
        response = ser.read_all().decode('utf-8', errors='replace').strip()
        return self.extract_serial_number(response)

    def extract_serial_number(self, response):
        """Return the hex digits following a leading ``id:`` in *response*, else ``None``."""
        match = re.match(r"id:([0-9A-Fa-f]+)", response)
        return match.group(1) if match else None

    def get_bench_id(self, serial_number):
        """Look up a bench by serial number through the HTTP API.

        Returns:
            ``(id, name)`` taken from the JSON reply when the server answers
            with status 200 and a JSON object, otherwise ``None``.
        """
        url = f"http://462filament/api.php?data=benchid&serial={serial_number}"
        try:
            # Without a timeout an unreachable server would block start-up forever.
            response = requests.get(url, timeout=self._API_TIMEOUT_S)
            if response.status_code == 200:
                data = response.json()
                if isinstance(data, dict):
                    return data.get("id"), data.get("name")
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            print(f"Error making request to API: {e}")
        return None

    def add_bottom_block(self):
        """Append the bottom row containing the red "Close" button."""
        bottom_block_widget = QWidget()
        bottom_block_layout = QHBoxLayout()
        bottom_block_layout.setAlignment(Qt.AlignRight | Qt.AlignBottom)
        bottom_block_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)

        btn_close = QPushButton("Close")
        btn_close.clicked.connect(self.confirm_close)
        bottom_block_layout.addWidget(btn_close)
        btn_close.setStyleSheet("background-color: #FF0000;")

        bottom_block_widget.setLayout(bottom_block_layout)
        self.layout.addWidget(bottom_block_widget)

    def confirm_close(self):
        """Ask for confirmation and close the window when the user answers Yes."""
        confirmation = QMessageBox.question(
            self,
            "Confirm Exit",
            "Are you sure you want to exit?",
            QMessageBox.Yes | QMessageBox.No,
            QMessageBox.No,
        )
        if confirmation == QMessageBox.Yes:
            self.close()

    @staticmethod
    def _paint_button(block, button_name, color):
        """Set the background colour of ``block.<button_name>`` if that attribute exists."""
        button = getattr(block, button_name, None)
        if button is not None:
            button.setStyleSheet(f"background-color: {color};")

    def change_button_color(self, block_id, button_name, color):
        """Recolour the named button of the block registered under *block_id*.

        Unknown block ids and unknown button names are ignored.
        """
        block = self.blocks.get(block_id)
        if block is None:
            return
        self._paint_button(block, button_name, color)

    def update_mode(self, mode):
        """Highlight the mode button matching *mode* on the sending worker's block.

        Mode 1 highlights ``btn_go_left``, mode 2 ``btn_go_right``, mode 65535
        highlights nothing, and every other value highlights ``btn_reset``.
        The remaining buttons get the idle colour.
        """
        sender = self.sender()
        block = getattr(sender, "block", None)
        if block is None:
            # Not invoked through a signal of an object carrying a block.
            return
        highlighted = {1: "btn_go_left", 2: "btn_go_right", 65535: None}.get(mode, "btn_reset")
        for button_name in self._MODE_BUTTONS:
            color = self._COLOR_ACTIVE if button_name == highlighted else self._COLOR_IDLE
            self._paint_button(block, button_name, color)

    def handle_data_received(self, data):
        """Print data received from the serial port."""
        print(f"Data received: {data}")

    def update_block(self, is_running, dial_number, diameter_value, position_value):
        """Handle one sample emitted by a worker and refresh its block.

        Args:
            is_running: when truthy the sample is added to the running
                average and the average label is refreshed.
            dial_number: index of the gauge the sample comes from; 0, 1 and 2
                map to graph lines 1, 2 and 3.
            diameter_value: measured diameter (mm per the original comments).
            position_value: measured position (mm per the original comments).
        """
        # NOTE(review): the original indentation was lost; the nesting below
        # is the most plausible reconstruction - confirm against the upstream file.
        ts = round(time.time() * 1000)
        worker = self.sender()
        diameter_avg = 0.0

        if dial_number == 0:
            worker.block.len_label.setText(f"{position_value/1000:.3f}")

        if worker.reset_data:
            # Restart the statistics and the stored history from this sample.
            worker.diameter_total = diameter_value
            worker.diameter_count = 1
            worker.diameter_min = diameter_value
            worker.diameter_max = diameter_value
            worker.clear_graph()
            worker.data_list.clear()
            worker.reset_data = False
        else:
            if is_running:
                # Accumulate for the average diameter.
                worker.diameter_total += diameter_value
                worker.diameter_count += 1
            worker.block.min_label.setText(f"{worker.diameter_min:.3f} ")
            worker.block.max_label.setText(f"{worker.diameter_max:.3f} ")

        worker.position_value = position_value
        # Keep the raw sample in memory for the report.
        worker.data_list.append((ts, dial_number, position_value, diameter_value))
        self.update_graph(worker, dial_number, diameter_value, position_value)

        if is_running:
            if worker.diameter_count > 0:
                diameter_avg = worker.diameter_total / worker.diameter_count
            worker.block.avg_label.setText(f"{diameter_avg:.4f}")

        # Estimated weight of a cylinder of the average diameter and the
        # current length; it is 0 while not running because diameter_avg is 0.
        vol = math.pi * (diameter_avg / 2) ** 2 * position_value  # mm -> mm^3
        est_wgh = worker.density * vol / 1000  # per original: density in g/cm^3
        worker.block.wht_label.setText(f"{est_wgh/1000:.3f} ")

    def update_graph(self, worker, dial_number, diameter_value, position_value):
        """Append the sample to the matching graph line and redraw the canvas.

        While ``worker.is_running`` is false the graph is limited to the last
        ``_IDLE_WINDOW`` samples per line and the min/max/avg labels are
        recomputed from the plotted data.
        """
        lines = (worker.line1, worker.line2, worker.line3)
        xs = [list(line.get_xdata()) for line in lines]
        ys = [list(line.get_ydata()) for line in lines]

        # Route the sample to its line; other dial numbers add no point.
        if dial_number in (0, 1, 2):
            idx = int(dial_number)
            xs[idx].append(position_value)
            ys[idx].append(diameter_value)
            setattr(worker, f"diameter_value{idx + 1}", diameter_value)

        if not worker.is_running:
            # NOTE(review): reconstructed nesting - the x re-indexing is
            # assumed to belong to the trim step; confirm against upstream.
            if len(ys[0]) > self._IDLE_WINDOW:
                ys = [y[-self._IDLE_WINDOW:] for y in ys]
                # x becomes the sample index inside the rolling window.
                xs = [list(range(len(y))) for y in ys]
            all_y_data = ys[0] + ys[1] + ys[2]
            if all_y_data:
                # Guarded like the average below: min()/max() raise on empty input.
                worker.diameter_min = min(all_y_data)
                worker.diameter_max = max(all_y_data)
                worker.block.min_label.setText(f"{worker.diameter_min:.3f} ")
                worker.block.max_label.setText(f"{worker.diameter_max:.3f} ")
            diameter_avg = sum(all_y_data) / len(all_y_data) if all_y_data else 0
            worker.block.avg_label.setText(f"{diameter_avg:.4f}")

        # The displayed diameter is the mean of the latest value of line 1 and
        # of every other line that has data.
        diameter_calc = worker.diameter_value1
        diameter_n = 1
        worker.line1.set_data(xs[0], ys[0])
        if ys[1]:
            worker.line2.set_data(xs[1], ys[1])
            diameter_calc += worker.diameter_value2
            diameter_n += 1
        if ys[2]:
            worker.line3.set_data(xs[2], ys[2])
            diameter_calc += worker.diameter_value3
            diameter_n += 1
        worker.diameter_value = diameter_calc / diameter_n
        worker.block.dia_label.setText(f"{worker.diameter_value:.3f} ")

        # Rescale the axes to the new data and redraw.
        worker.line1.axes.relim()
        worker.line1.axes.autoscale_view()
        worker.canvas.draw()