hexapod pc client
parent
d365e6fa4c
commit
ac755a260d
@ -0,0 +1,259 @@
|
||||
"""
|
||||
Copyright (C) 2017 - PRESENT Zhengyu Peng, https://zpeng.me
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
----------
|
||||
|
||||
` `
|
||||
-:. -#:
|
||||
-//:. -###:
|
||||
-////:. -#####:
|
||||
-/:.://:. -###++##:
|
||||
.. `://:- -###+. :##:
|
||||
`:/+####+. :##:
|
||||
.::::::::/+###. :##:
|
||||
.////-----+##: `:###:
|
||||
`-//:. :##: `:###/.
|
||||
`-//:. :##:`:###/.
|
||||
`-//:+######/.
|
||||
`-/+####/.
|
||||
`+##+.
|
||||
:##:
|
||||
:##:
|
||||
:##:
|
||||
:##:
|
||||
:##:
|
||||
.+:
|
||||
|
||||
"""
|
||||
|
||||
import sys
|
||||
from PySide6 import QtWidgets, QtCore, QtGui
|
||||
from PySide6.QtCore import Qt
|
||||
from PySide6.QtCore import QThread, QFile
|
||||
from PySide6.QtUiTools import QUiLoader
|
||||
|
||||
import psutil
|
||||
import socket
|
||||
|
||||
from pathlib import Path
|
||||
import json
|
||||
|
||||
from tcpclient import TCPClient
|
||||
|
||||
QtWidgets.QApplication.setAttribute(
|
||||
QtCore.Qt.AA_EnableHighDpiScaling, True) # enable highdpi scaling
|
||||
QtWidgets.QApplication.setAttribute(
|
||||
QtCore.Qt.AA_UseHighDpiPixmaps, True) # use highdpi icons
|
||||
|
||||
|
||||
class MyApp(QtWidgets.QMainWindow):
|
||||
|
||||
def __init__(self):
|
||||
super(MyApp, self).__init__()
|
||||
|
||||
self.status_message = ['● Idle', '● Idle',
|
||||
'● Idle', '● Idle', '● Idle', '']
|
||||
|
||||
config_file = Path('config.json')
|
||||
|
||||
if config_file.exists():
|
||||
self.config = json.load(open('config.json', 'r'))
|
||||
else:
|
||||
self.config = dict()
|
||||
json.dump(self.config, open('config.json', 'w+'))
|
||||
|
||||
"""Load UI"""
|
||||
ui_file_name = "mainwindow.ui"
|
||||
ui_file = QFile(ui_file_name)
|
||||
loader = QUiLoader()
|
||||
self.ui = loader.load(ui_file)
|
||||
|
||||
ui_file.close()
|
||||
self.init_ui()
|
||||
|
||||
self.ui.comboBox_Interface.currentIndexChanged.connect(
|
||||
self.on_interface_selection_change)
|
||||
self.ui.button_Refresh.clicked.connect(self.on_refresh_button_clicked)
|
||||
|
||||
self.ui.button_TcpClient.clicked.connect(
|
||||
self.on_tcp_client_connect_button_clicked
|
||||
)
|
||||
|
||||
self.ui.show()
|
||||
|
||||
def save_config(self):
|
||||
try:
|
||||
json.dump(self.config, open('config.json', 'w+'))
|
||||
except PermissionError as err:
|
||||
pass
|
||||
|
||||
def init_ui(self):
|
||||
# Interface
|
||||
self.update_network_interfaces()
|
||||
|
||||
# TCP Client
|
||||
# self.ui.textBrowser_TcpClientMessage.setEnabled(False)
|
||||
# self.ui.lineEdit_TcpClientSend.setEnabled(False)
|
||||
# self.ui.button_TcpClientSend.setEnabled(False)
|
||||
|
||||
tcp_client_ip = self.config.get('TCP_Client_IP', '127.0.0.1')
|
||||
tcp_client_port = self.config.get('TCP_Client_Port', '1234')
|
||||
self.ui.lineEdit_TcpClientTargetIP.setText(tcp_client_ip)
|
||||
self.ui.lineEdit_TcpClientTargetPort.setText(tcp_client_port)
|
||||
|
||||
def update_network_interfaces(self):
|
||||
self.net_if = psutil.net_if_addrs()
|
||||
|
||||
interface_idx = self.config.get('Interface', 0)
|
||||
self.ui.comboBox_Interface.clear()
|
||||
|
||||
net_names = list(self.net_if.keys())
|
||||
net_if_stats = psutil.net_if_stats()
|
||||
|
||||
for if_name in net_names:
|
||||
if not net_if_stats[if_name].isup:
|
||||
self.net_if.pop(if_name, None)
|
||||
else:
|
||||
self.ui.comboBox_Interface.addItem(if_name)
|
||||
|
||||
if interface_idx >= self.ui.comboBox_Interface.count():
|
||||
self.ui.comboBox_Interface.setCurrentIndex(0)
|
||||
else:
|
||||
self.ui.comboBox_Interface.setCurrentIndex(interface_idx)
|
||||
|
||||
current_interface = self.ui.comboBox_Interface.currentText()
|
||||
self.config['Interface'] = self.ui.comboBox_Interface.currentIndex()
|
||||
|
||||
for snicaddr in self.net_if[current_interface]:
|
||||
if snicaddr.family == socket.AF_INET:
|
||||
ipv4_add = snicaddr.address
|
||||
break
|
||||
else:
|
||||
ipv4_add = '0.0.0.0'
|
||||
|
||||
self.ui.label_LocalIP.setText(ipv4_add)
|
||||
|
||||
self.save_config()
|
||||
|
||||
def on_interface_selection_change(self):
|
||||
current_interface = self.ui.comboBox_Interface.currentText()
|
||||
|
||||
if current_interface in self.net_if:
|
||||
for snicaddr in self.net_if[current_interface]:
|
||||
if snicaddr.family == socket.AF_INET:
|
||||
ipv4_add = snicaddr.address
|
||||
break
|
||||
else:
|
||||
ipv4_add = '0.0.0.0'
|
||||
else:
|
||||
return
|
||||
|
||||
self.ui.label_LocalIP.setText(ipv4_add)
|
||||
self.config['Interface'] = self.ui.comboBox_Interface.currentIndex()
|
||||
self.save_config()
|
||||
|
||||
def on_refresh_button_clicked(self):
|
||||
self.update_network_interfaces()
|
||||
|
||||
# TCP Client
|
||||
def on_tcp_client_connect_button_clicked(self):
|
||||
if self.ui.button_TcpClient.text() == 'Connect':
|
||||
self.ui.button_TcpClient.setEnabled(False)
|
||||
self.ui.lineEdit_TcpClientTargetIP.setEnabled(False)
|
||||
self.ui.lineEdit_TcpClientTargetPort.setEnabled(False)
|
||||
|
||||
self.tcp_client_thread = QThread()
|
||||
self.tcp_client = TCPClient(
|
||||
self.ui.lineEdit_TcpClientTargetIP.text(),
|
||||
int(self.ui.lineEdit_TcpClientTargetPort.text()))
|
||||
|
||||
self.tcp_client_thread.started.connect(self.tcp_client.start)
|
||||
self.tcp_client.status.connect(self.on_tcp_client_status_update)
|
||||
self.tcp_client.message.connect(self.on_tcp_client_message_ready)
|
||||
|
||||
self.tcp_client.moveToThread(self.tcp_client_thread)
|
||||
|
||||
self.tcp_client_thread.start()
|
||||
|
||||
self.config['TCP_Client_IP'] = self.ui.lineEdit_TcpClientTargetIP.text()
|
||||
self.config['TCP_Client_Port'] = self.ui.lineEdit_TcpClientTargetPort.text()
|
||||
self.save_config()
|
||||
|
||||
elif self.ui.button_TcpClient.text() == 'Disconnect':
|
||||
self.ui.button_TcpClient.setEnabled(False)
|
||||
self.tcp_client.close()
|
||||
|
||||
def on_tcp_client_status_update(self, status, addr):
|
||||
if status == TCPClient.STOP:
|
||||
self.tcp_client.status.disconnect()
|
||||
self.tcp_client.message.disconnect()
|
||||
|
||||
self.ui.button_TcpClient.setText('Connect')
|
||||
self.tcp_client_thread.quit()
|
||||
|
||||
self.ui.lineEdit_TcpClientTargetIP.setEnabled(True)
|
||||
self.ui.lineEdit_TcpClientTargetPort.setEnabled(True)
|
||||
|
||||
self.ui.textBrowser_TcpClientMessage.setEnabled(False)
|
||||
self.ui.lineEdit_TcpClientSend.setEnabled(False)
|
||||
self.ui.button_TcpClientSend.setEnabled(False)
|
||||
self.status_message[0] = '● Idle'
|
||||
if self.ui.tabWidget.currentIndex() == 0:
|
||||
self.on_tab_changed(0)
|
||||
|
||||
elif status == TCPClient.CONNECTED:
|
||||
self.ui.button_TcpClient.setText('Disconnect')
|
||||
|
||||
self.ui.textBrowser_TcpClientMessage.setEnabled(True)
|
||||
self.ui.lineEdit_TcpClientSend.setEnabled(True)
|
||||
self.ui.button_TcpClientSend.setEnabled(True)
|
||||
self.status_message[0] = '● Connected to ' +\
|
||||
self.ui.label_LocalIP.text() +\
|
||||
':'+self.ui.lineEdit_TcpClientTargetPort.text()
|
||||
if self.ui.tabWidget.currentIndex() == 0:
|
||||
self.on_tab_changed(0)
|
||||
|
||||
self.ui.button_TcpClient.setEnabled(True)
|
||||
|
||||
def on_tcp_client_message_ready(self, source, msg):
|
||||
self.ui.textBrowser_TcpClientMessage.append(
|
||||
'<p style="text-align: center;"><span style="color: #2196F3;"><strong>----- ' +
|
||||
source +
|
||||
' -----</strong></span></p>')
|
||||
self.ui.textBrowser_TcpClientMessage.append(
|
||||
'<p style="text-align: center;"><span style="color: #2196F3;">' +
|
||||
msg +
|
||||
'</span></p>')
|
||||
|
||||
def on_tcp_client_message_send(self):
|
||||
self.tcp_client.send(self.ui.lineEdit_TcpClientSend.text())
|
||||
self.ui.textBrowser_TcpClientMessage.append(
|
||||
'<p style="text-align: center;"><strong>----- ' +
|
||||
'this' +
|
||||
' -----</strong></p>')
|
||||
self.ui.textBrowser_TcpClientMessage.append(
|
||||
'<p style="text-align: center;">' +
|
||||
self.ui.lineEdit_TcpClientSend.text() +
|
||||
'</p>')
|
||||
self.ui.lineEdit_TcpClientSend.clear()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
QtCore.QCoreApplication.setAttribute(QtCore.Qt.AA_ShareOpenGLContexts)
|
||||
app = QtWidgets.QApplication(sys.argv)
|
||||
window = MyApp()
|
||||
|
||||
sys.exit(app.exec())
|
@ -0,0 +1,104 @@
|
||||
"""
|
||||
Copyright (C) 2017 - 2021 Zhengyu Peng, https://zpeng.me
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
----------
|
||||
|
||||
` `
|
||||
-:. -#:
|
||||
-//:. -###:
|
||||
-////:. -#####:
|
||||
-/:.://:. -###++##:
|
||||
.. `://:- -###+. :##:
|
||||
`:/+####+. :##:
|
||||
.::::::::/+###. :##:
|
||||
.////-----+##: `:###:
|
||||
`-//:. :##: `:###/.
|
||||
`-//:. :##:`:###/.
|
||||
`-//:+######/.
|
||||
`-/+####/.
|
||||
`+##+.
|
||||
:##:
|
||||
:##:
|
||||
:##:
|
||||
:##:
|
||||
:##:
|
||||
.+:
|
||||
|
||||
"""
|
||||
|
||||
from PySide6.QtCore import QObject, Signal, Slot
|
||||
import socket
|
||||
|
||||
|
||||
class TCPClient(QObject):
|
||||
status = Signal(int, object)
|
||||
message = Signal(object, object)
|
||||
ERROR = -1
|
||||
LISTEN = 1
|
||||
CONNECTED = 2
|
||||
STOP = 3
|
||||
|
||||
SIG_NORMAL = 0
|
||||
SIG_STOP = 1
|
||||
SIG_DISCONNECT = 2
|
||||
|
||||
def __init__(self, ip, port):
|
||||
QObject.__init__(self)
|
||||
|
||||
self.ip = ip
|
||||
self.port = port
|
||||
self.tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.tcp_socket.settimeout(1)
|
||||
|
||||
self.signal = self.SIG_NORMAL
|
||||
|
||||
@Slot()
|
||||
def start(self):
|
||||
try:
|
||||
self.tcp_socket.connect((self.ip, self.port))
|
||||
except OSError as err:
|
||||
print(err)
|
||||
# self.status.emit(self.STOP, '')
|
||||
else:
|
||||
# print('connected')
|
||||
self.status.emit(self.CONNECTED, self.ip)
|
||||
|
||||
while True:
|
||||
if self.signal == self.SIG_NORMAL:
|
||||
try:
|
||||
|
||||
data = self.tcp_socket.recv(4096)
|
||||
except socket.timeout as t_out:
|
||||
pass
|
||||
else:
|
||||
if data:
|
||||
self.message.emit(
|
||||
self.ip+':'+str(self.port),
|
||||
data.decode())
|
||||
else:
|
||||
break
|
||||
elif self.signal == self.SIG_DISCONNECT:
|
||||
self.signal = self.SIG_NORMAL
|
||||
self.tcp_socket.close()
|
||||
break
|
||||
finally:
|
||||
self.status.emit(self.STOP, '')
|
||||
|
||||
def send(self, msg):
|
||||
self.tcp_socket.sendall(msg.encode())
|
||||
|
||||
def close(self):
|
||||
self.signal = self.SIG_DISCONNECT
|
Loading…
Reference in New Issue