from PyQt5.QtWidgets import QApplication, QWidget, QVBoxLayout, QHBoxLayout, QLabel, QSizePolicy, QPushButton, QMessageBox from PyQt5.QtCore import Qt import glob import requests import serial import re import time import math from UpdateWorker import UpdateWorker class BlockStackApp(QWidget): def __init__(self, fullscreen): super().__init__() self.initUI(fullscreen) self.check_usb_devices() self.add_bottom_block() self.update_count = 11 def initUI(self, fullscreen): self.layout = QVBoxLayout() self.layout.setSpacing(5) self.layout.setContentsMargins(5, 5, 5, 5) self.layout.setAlignment(Qt.AlignLeft | Qt.AlignTop) self.setLayout(self.layout) self.setWindowTitle('462filament Bench GUI') self.setStyleSheet(""" QWidget { background-color: #2E2E2E; color: #FFFFFF; height: 100px; } QLabel { background-color: #4A4A4A; border: 1px solid #5A5A5A; padding: 2px; margin: 2px; height: 30px; width: 250px; } QPushButton, QComboBox { background-color: #5A5A5A; border: 1px solid #6A6A6A; padding: 2px; margin: 2px; height: 25px; width: 100px; } """) if fullscreen: self.showFullScreen() else: self.showMaximized() def check_usb_devices(self): bench_list = [] for port in glob.glob('/dev/ttyUSB*'): try: with serial.Serial(port, 1000000, timeout=1) as ser: response = ser.read_all().decode('utf-8').strip() ser.write(b'i') time.sleep(1) response = ser.read_all().decode('utf-8').strip() serial_number = self.extract_serial_number(response) if serial_number: print(f"Found bench on port {port}") bench_id, bench_name = self.get_bench_id(serial_number) if bench_id: bench_list.append((port, bench_id, bench_name, serial_number)) else: print(f"Second check port {port}") response = ser.read_all().decode('utf-8').strip() ser.write(b'i') time.sleep(1) response = ser.read_all().decode('utf-8').strip() serial_number = self.extract_serial_number(response) if serial_number: print(f"Found bench on port {port}") bench_id, bench_name = self.get_bench_id(serial_number) if bench_id: bench_list.append((port, bench_id, serial_number)) else: print(f"Ignoring port {port}") except serial.SerialException as e: print(f"Could not open port {port}: {e}") bench_list.sort(key=lambda x: x[1]) self.blocks = {} for port, bench_id, bench_name, serial_number in bench_list: block_id = f"{bench_id}" self.worker = UpdateWorker(serial_number, bench_id, bench_name, port, block_id, self) self.layout.addWidget(self.worker.block) self.worker.change_color_signal.connect(self.change_button_color) self.worker.update_signal.connect(self.update_block) self.worker.start() self.blocks[block_id] = self.worker.block def extract_serial_number(self, response): match = re.match(r"id:([0-9A-Fa-f]+)", response) if match: return match.group(1) return None def get_bench_id(self, serial_number): url = f"http://462filament/api.php?data=benchid&serial={serial_number}" try: response = requests.get(url) if response.status_code == 200: data = response.json() return data.get("id"), data.get("name") except requests.RequestException as e: print(f"Error making request to API: {e}") return None def add_bottom_block(self): # Create a new block for the bottom section bottom_block_widget = QWidget() bottom_block_layout = QHBoxLayout() bottom_block_layout.setAlignment(Qt.AlignRight|Qt.AlignBottom) bottom_block_widget.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) # Add a close button btn_close = QPushButton("Close") btn_close.clicked.connect(self.confirm_close) bottom_block_layout.addWidget(btn_close) btn_close.setStyleSheet(f"background-color: #FF0000;") bottom_block_widget.setLayout(bottom_block_layout) self.layout.addWidget(bottom_block_widget) def confirm_close(self): # Create a confirmation dialog confirmation = QMessageBox.question( self, "Confirm Exit", "Are you sure you want to exit?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No ) # Close the application if the user confirms if confirmation == QMessageBox.Yes: self.close() def change_button_color(self, block_id, button_name, color): block = self.blocks.get(block_id) if block: button = getattr(block, button_name, None) if button: button.setStyleSheet(f"background-color: {color};") def update_mode(self, mode): # Update button colors based on mode if mode == 1: self.change_button_color(self.sender().block.btn_go_left, "#0000FF") self.change_button_color(self.sender().block.btn_go_right, "#5A5A5A") self.change_button_color(self.sender().block.btn_reset, "#5A5A5A") elif mode == 2: self.change_button_color(self.sender().block.btn_go_left, "#5A5A5A") self.change_button_color(self.sender().block.btn_go_right, "#0000FF") self.change_button_color(self.sender().block.btn_reset, "#5A5A5A") elif mode == 65535: self.change_button_color(self.sender().block.btn_go_left, "#5A5A5A") self.change_button_color(self.sender().block.btn_go_right, "#5A5A5A") self.change_button_color(self.sender().block.btn_reset, "#5A5A5A") else: self.change_button_color(self.sender().block.btn_go_left, "#5A5A5A") self.change_button_color(self.sender().block.btn_go_right, "#5A5A5A") self.change_button_color(self.sender().block.btn_reset, "#0000FF") def handle_data_received(self, data): # Handle the data received from the serial port print(f"Data received: {data}") def update_block(self, is_running, dial_number, diameter_value, position_value): ts = round(time.time() * 1000) # Update the block with diameter_value and position_value worker = self.sender() diameter_avg = 0.0000 est_wgh = 0 if dial_number == 0: worker.block.len_label.setText(f"{position_value/1000:.3f}") if True: if worker.reset_data: worker.diameter_total = diameter_value worker.diameter_count = 1 worker.diameter_min = diameter_value worker.diameter_max = diameter_value worker.clear_graph() worker.data_list.clear() worker.reset_data = False else: if is_running: worker.diameter_total += diameter_value # to calc average diameter worker.diameter_count += 1 worker.block.min_label.setText(f"{worker.diameter_min:.3f} ") worker.block.max_label.setText(f"{worker.diameter_max:.3f} ") worker.position_value = position_value # push data to memory for report worker.data_list.append((ts, dial_number, position_value, diameter_value)) # push data to graph and update it self.update_graph(worker, dial_number, diameter_value, position_value) # calculate and display average diameter if is_running: diameter_avg = worker.diameter_total / worker.diameter_count if worker.diameter_count > 0 else 0.0 worker.block.avg_label.setText(f"{diameter_avg:.4f}") # calculate and display estimated weight vol = math.pi * (diameter_avg / 2) ** 2 * position_value # all in mm → mm³ est_wgh = worker.density * vol/1000 # density in g·cm³, vol in mm³ worker.block.wht_label.setText(f"{est_wgh/1000:.3f} ") def update_graph(self, worker, dial_number,diameter_value,position_value): # def update_graph(self, worker, dial_number, diameter_value, diameter_value2, diameter_value3): # diameter_value2 = diameter_value + random.randrange(-300, 300, 10)/1000 # diameter_value3 = diameter_value + random.randrange(-300, 300, 10)/1000 # Update the graph with the new diameter values x_data1 = worker.line1.get_xdata() y_data1 = worker.line1.get_ydata() x_data2 = worker.line2.get_xdata() y_data2 = worker.line2.get_ydata() x_data3 = worker.line3.get_xdata() y_data3 = worker.line3.get_ydata() x_data1 = list(x_data1) y_data1 = list(y_data1) x_data2 = list(x_data2) y_data2 = list(y_data2) x_data3 = list(x_data3) y_data3 = list(y_data3) # Append new data points match dial_number: case 0: x_data1.append(position_value) y_data1.append(diameter_value) worker.diameter_value1 = diameter_value case 1: x_data2.append(position_value) y_data2.append(diameter_value) worker.diameter_value2 = diameter_value case 2: x_data3.append(position_value) y_data3.append(diameter_value) worker.diameter_value3 = diameter_value # Keep only the last 100 values if not worker.is_running: # Keep only the last 100 values if len(y_data1) > 200: y_data1 = y_data1[-200:] y_data2 = y_data2[-200:] y_data3 = y_data3[-200:] # Update x-axis values to reflect the rolling window x_data1 = list(range(len(y_data1))) x_data2 = list(range(len(y_data2))) x_data3 = list(range(len(y_data3))) all_y_data = y_data1 + y_data2 + y_data3 worker.diameter_min = min(all_y_data) worker.diameter_max = max(all_y_data) worker.block.min_label.setText(f"{worker.diameter_min:.3f} ") worker.block.max_label.setText(f"{worker.diameter_max:.3f} ") diameter_avg = sum(all_y_data) / len(all_y_data) if all_y_data else 0 worker.block.avg_label.setText(f"{diameter_avg:.4f}") # # worker running, trim number of values to keep graph aligned # else: # nvalues = min(len(y_data1), len(y_data2), len(y_data3)) + 1 # y_data1 = y_data1[-nvalues:] # y_data2 = y_data2[-nvalues:] # y_data3 = y_data3[-nvalues:] # # Update x-axis values to reflect the rolling window # x_data1 = list(range(len(y_data1))) # x_data2 = list(range(len(y_data2))) # x_data3 = list(range(len(y_data3))) diameter_calc = worker.diameter_value1 diameter_n = 1 # Update the lines with the new data worker.line1.set_data(x_data1, y_data1) if len(y_data2) > 0: worker.line2.set_data(x_data2, y_data2) diameter_calc += worker.diameter_value2 diameter_n += 1 if len(y_data3) > 0: worker.line3.set_data(x_data3, y_data3) diameter_calc += worker.diameter_value3 diameter_n += 1 worker.diameter_value = diameter_calc / diameter_n worker.block.dia_label.setText(f"{worker.diameter_value:.3f} ") # Rescale the graph worker.line1.axes.relim() worker.line1.axes.autoscale_view() # Redraw the canvas worker.canvas.draw()