Source code for rubu.temas

import socket
import urllib.request
import cv2
from threading import Thread, Lock
import threading
from datetime import datetime
import numpy as np


[docs] class Connect: """This class alows the Temas connection. """
[docs] def __init__(self, hostname="temas", ip_address=""): """Initial values of class Connect.""" global ip ip = "" if ip_address == "": ip = self.get_ip_from_hostname(hostname) else: ip = ip_address self.ip = ip print(ip)
[docs] def get_ip_from_hostname(self, hostname, max_attempts=4): """Attempts to determine the IP address of the hostname.""" for attempt in range(max_attempts): try: ip = socket.gethostbyname(hostname) return ip except socket.gaierror: print(f"Attempt {attempt + 1}/{max_attempts}: Could not resolve hostname '{hostname}'. Retrying...") print(f"Error: Could not resolve hostname '{hostname}' after {max_attempts} attempts.") return None
class Socket: """This class send commands to Temas """ def __init__(self, port, ip_address=""): """Initial values of class Socket.""" global ip if ip_address == "": self.ip = ip else: self.ip = ip_address self.port = port self.buffer = "" self.data = "" def send_cmd(self, command, text=False, path=""): """Send commands :param command: A command is given :type command: List, optional :return: Data of socket :class:`temas.send_cmd` :raises: Exception """ try: self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.client.connect((self.ip, self.port)) self.client.sendall((command).encode()) if text: response = '' while True: buffer = self.client.recv(4096) if not buffer: break response += buffer.decode("utf-8") now = datetime.now().strftime("%Y-%m-%d-%H-%M-%S") filename = path + now + '-laser.ply' with open(filename, 'w') as f: text2 = response.replace("\\n", "\n") text2 = text2.replace("b'", "") text2 = text2.replace("'", "") f.write(text2) # f.write(self.data) self.client.close() else: self.buffer = self.client.recv(4096) self.data = self.buffer.decode() self.client.close() except: raise Exception("Sending failed - IP: " + self.ip + " PORT: " + str(self.port)) return self.data
[docs] class Camera: """This class allows the control of camera using urllib."""
[docs] def __init__(self, port=8081, ip_address=""): """Initializes the Camera class""" if ip_address == "": global ip self.ip = ip else: self.ip = ip_address self.port = port self.stream = bytes() self.html_page = f"http://{self.ip}:{self.port}/stream.mjpg" self.file = urllib.request.urlopen(self.html_page) self.frame = None self.kill = True self.lock = Lock() self.thread = Thread(target=self.update, args=()) self.thread.daemon = True
[docs] def update(self): """Continuously fetches frames from the MJPEG stream.""" while not self.kill: try: chunk = self.file.read(4096) if not chunk: continue self.stream += chunk a = self.stream.find(b'\xff\xd8') # Start of JPEG b = self.stream.find(b'\xff\xd9') # End of JPEG if a != -1 and b != -1: jpg = self.stream[a:b + 2] self.stream = self.stream[b + 2:] img_array = np.frombuffer(jpg, dtype=np.uint8) frame = cv2.imdecode(img_array, cv2.IMREAD_COLOR) if frame is not None: with self.lock: self.frame = frame except Exception as e: print(f"Camera stream error: {e}") continue
[docs] def start_thread(self): """Starts the thread to fetch frames.""" self.kill = False self.thread.start()
[docs] def stop_thread(self): """Stops the thread.""" self.kill = True self.file.close()
[docs] def get_frame(self): """Returns the most recent frame :return: Frame :rtype: numpy.ndarray or None """ with self.lock: return self.frame
[docs] def set_exposure_time(self, microseconds): """Set exposure time of visual camera 50-33000µs :return: Exposure time :rtype: String """ url = "http://" + self.ip + ":" + str(self.port) + "/set_exposure/" + str(microseconds) response = urllib.request.urlopen(url) return response.read().decode()
[docs] def set_brightness(self, brightness_percent): """Set brightness of visual camera 0-100 :return: Brightness :rtype: String """ url = "http://" + self.ip + ":" + str(self.port) + "/set_brightness/" + str(brightness_percent) response = urllib.request.urlopen(url) return response.read().decode()
[docs] def set_saturation(self, saturation_percent): """Set saturation of visual camera 0-100 :return: Saturation :rtype: String """ url = "http://" + self.ip + ":" + str(self.port) + "/set_saturation/" + str(saturation_percent) response = urllib.request.urlopen(url) return response.read().decode()
[docs] def set_contrast(self, contrast_percent): """Set contrast of visual camera 0-100 :return: Contrast :rtype: String """ url = "http://" + self.ip + ":" + str(self.port) + "/set_contrast/" + str(contrast_percent) response = urllib.request.urlopen(url) return response.read().decode()
[docs] def set_lens_position(self, lens_position_percent): """Set lens position of visual camera 0-100 :return: Lens position :rtype: String """ url = "http://" + self.ip + ":" + str(self.port) + "/set_lens_position/" + str(lens_position_percent) response = urllib.request.urlopen(url) return response.read().decode()
[docs] def set_gain(self, gain_value): """Set gain of visual camera 1.123-16.0 :return: Gain :rtype: String """ url = "http://" + self.ip + ":" + str(self.port) + "/set_gain/" + str(float(gain_value)) response = urllib.request.urlopen(url) return response.read().decode()
[docs] class Control: """This class alows the control of motors and laser. """
[docs] def __init__(self, port=8082, ip_address=""): self.lock = threading.Lock() self.port_control = port global ip self.ip = ip if ip_address == "" else ip_address try: self.temas = Socket(self.port_control, ip_address=self.ip) except Exception as e: print(f"Fehler beim Erstellen des Sockets: {e}")
[docs] def get_pcl(self, path): """Retrieve the latest point cloud from the laser scan (e.g., located at C:/temp/). :return: .ply file """ self.temas.send_cmd('get_pcl\n', text=True, path=path)
[docs] def distance(self): """Laser distance measurment :return: Value of the distance [cm] :rtype: String """ value = self.temas.send_cmd('distance\n') return value
[docs] def mean_distance(self): """Mean laser distance measurement :return: Value of the mean_distance [cm] :rtype: String """ value = self.temas.send_cmd('distance_mean_fix_number\n') return value
[docs] def move_pos(self, phi, theta): """Move to the specified pan and tilt angles in degrees (e.g., 12.5, 11) with a step size of 0.5°. :return: Status message indicating if the move was successful. Returns True if successful. :rtype: String """ if (phi * 2) % 1 != 0 or (theta * 2) % 1 != 0: return "Error: Only values in 0.5° steps are allowed." value = self.temas.send_cmd('move_pos|' + str(phi) + '|' + str(theta) + '|' + '\n') return value
[docs] def get_pos(self): """Get motor positions :return: pan, tilt positions [°] :rtype: String """ value = self.temas.send_cmd('get_position\n') return value
[docs] def move_home(self): """Move to home position""" value = self.temas.send_cmd('move_home\n') return value
[docs] def move(self, direction): """Sends a movement command to the motor.""" with self.lock: value = self.temas.send_cmd(f"move_{direction}\n") return value
[docs] def move_threaded(self, direction): """Starts the movement in a separate thread.""" threading.Thread(target=self.move, args=(direction,), daemon=True).start()
[docs] def move_right(self): """Move motor right in 1° steps""" self.move_threaded("right")
[docs] def move_left(self): """Move motor left in 1° steps""" self.move_threaded("left")
[docs] def move_up(self): """Move motor up in 1° steps""" self.move_threaded("up")
[docs] def move_down(self): """Move motor down in 1° steps""" self.move_threaded("down")
[docs] def move_right_fine(self): """Move motor right in 0.5° steps""" self.move_threaded("right_fine")
[docs] def move_left_fine(self): """Move motor left in 0.5° steps""" self.move_threaded("left_fine")
[docs] def move_up_fine(self): """Move motor up in 0.5° steps""" self.move_threaded("up_fine")
[docs] def move_down_fine(self): """Move motor down in 0.5° steps""" self.move_threaded("down_fine")
[docs] def start_point_cloud_scan(self, theta1, theta2, phi1, phi2, color=1): """ Starts a point cloud scan between the specified pan and tilt angle ranges. :param theta1: Minimum elevation angle in degrees (-30 to 90, step size 0.5°) :param theta2: Maximum elevation angle in degrees (-30 to 90, step size 0.5°) :param phi1: Minimum azimuth angle in degrees (-60 to 60, step size 0.5°) :param phi2: Maximum azimuth angle in degrees (-60 to 60, step size 0.5°) :param color: Color mode (default = 1) :return: Status message indicating if the scan was started successfully. Returns True if successful. :rtype: String """ # Hilfsfunktion zur Schrittgrößenprüfung def is_valid_angle(value, min_val, max_val): return min_val <= value <= max_val and (value * 2) % 1 == 0 if not all([ is_valid_angle(theta1, -30, 90), is_valid_angle(theta2, -30, 90), is_valid_angle(phi1, -60, 60), is_valid_angle(phi2, -60, 60) ]): return "Error: Angles must be in valid ranges and in 0.5° steps." value = self.temas.send_cmd( 'start_point_cloud_scan|' + str(theta1) + '|' + str(theta2) + '|' + str(phi1) + '|' + str(phi2) + '|' + str( color) + '|' + '\n') return value
[docs] def stop_point_cloud_scan(self): """Stop point cloud scan :return: Status message indicating if the scan was stopped successfully. Returns True if successful. :rtype: String """ value = self.temas.send_cmd('stop_point_cloud_scan\n') return value
[docs] def get_point_cloud_scan_percent(self): """Point cloud scan status in perecent :return: Point cloud scan status [%] :rtype: String """ value = self.temas.send_cmd('get_point_cloud_scan_percent\n') return value
[docs] class Common: """This class included the common settings of Temas"""
[docs] def __init__(self, port=8083, ip_address=""): """Initial values of class Common""" global ip if ip_address == "": self.ip = ip else: self.ip = ip_address self.port = port try: self.temas = Socket(self.port, ip_address=self.ip) except: pass
[docs] def shutdown(self): """Shutdown Temas :return: Status message indicating if the shutdown was successful. Returns True if successful. :rtype: String """ value = self.temas.send_cmd('temas_shutdown\n') return value
[docs] def restart(self): """Restart Temas :return: Status message indicating if the restart was successful. Returns True if successful. :rtype: String """ value = self.temas.send_cmd('temas_restart\n') return value
[docs] def get_temperature(self): """Read Temperature of Temas :return: Temperature Temas [°C] :rtype: String """ value = self.temas.send_cmd('get_temperature\n') return value
[docs] def get_ip(self): """Get ip address of Temas :return: Ip address :rtype: String """ global ip value = ip return value
[docs] def get_sn(self): """Get serial number of Temas :return: Serial number is returned :rtype: String """ value = self.temas.send_cmd('get_sn\n') return value
[docs] def get_fw_version(self): """Get firmware version of Temas :return: Firmware version is returned :rtype: String """ value = self.temas.send_cmd('get_fw_version\n') return value
[docs] def get_hostname(self): """Get hostname of Temas :return: Hostname is returned :rtype: String """ value = self.temas.send_cmd('get_hostname\n') return value
[docs] def set_hostname(self, hostname): """Set hostname of Temas""" value = self.temas.send_cmd('set_hostname|' + str(hostname) + '|' + '\n') return value
[docs] def get_port_camera(self): """Get port of Temas visual camera :return: Port from visual camera :rtype: String """ value = self.temas.send_cmd('get_port_camera\n') return value
[docs] def get_port_tof_camera(self): """Get port of Temas tof camera :return: Port from tof camera :rtype: String """ value = self.temas.send_cmd('get_port_tof_camera\n') return value
[docs] def get_port_common(self): """Get port of Temas common settings :return: Port from Temas common settings :rtype: String """ value = self.temas.send_cmd('get_port_common\n') return value
[docs] def get_port_control(self): """Get port of control device :return: Port from control device :rtype: String """ value = self.temas.send_cmd('get_port_control\n') return value
[docs] def get_mac(self): """Get the mac address of Temas :return: Returns the mac address :rtype: String """ value = self.temas.send_cmd('get_mac\n') return value
[docs] def get_laser_x_los(self): """Get the x value los of Laser in pixels relative to the visual frame. :return: Returns the x value los (px) :rtype: String """ value = self.temas.send_cmd('get_laser_x_los\n') return value
[docs] def get_laser_y_los(self): """Get the y value los of Laser in pixels relative to the visual frame. :return: Returns the y value los :rtype: String """ value = self.temas.send_cmd('get_laser_y_los\n') return value
[docs] def set_static_ip(self, ip, gateway, dns): """static ip /24 dhcp :param ip: IP address :param gateway: Default gateway for the network :param dns: DNS server address """ self.temas.send_cmd('set_static_ip|' + str(ip) + '|' + str(gateway) + '|' + str(dns) + '|' + '\n')
[docs] def set_direct_static_ip(self, ip): """static ip /24 without dhcp :param ip: IP address """ self.temas.send_cmd('set_direct_static_ip|' + str(ip) + '|' + '\n')
[docs] def near_mode_on(self): """tof near mode on""" self.temas.send_cmd('near_mode_on\n')
[docs] def near_mode_off(self): """tof near mode off""" self.temas.send_cmd('near_mode_off\n')