pyqt5-thread

QRunnable and the QThreadPool

https://www.learnpyqt.com/courses/concurrent-execution/multithreading-pyqt-applications-qthreadpool/

The example below uses this method (a QRunnable worker executed on a QThreadPool):

from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *

import time
import traceback, sys


class WorkerSignals(QObject):
    '''
    Container for the signals a running worker can emit.

    Each Worker creates one instance of this class and emits through it.

    Signals and their payloads:

    finished
        No payload.

    error
        `tuple` of (exception type, exception value, traceback.format_exc() text).

    result
        `object` -- whatever the processed function returned.

    progress
        `int` -- percentage of the work completed.
    '''
    # Emitted with no arguments.
    finished = pyqtSignal()
    # Emitted with a single tuple describing a raised exception.
    error = pyqtSignal(tuple)
    # Emitted with the return value of the processed function.
    result = pyqtSignal(object)
    # Emitted with an integer percentage.
    progress = pyqtSignal(int)


class Worker(QRunnable):
    '''
    Runnable that executes an arbitrary function and reports through signals.

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param fn: The function to run on this worker. The supplied args and
        kwargs are passed through to it, together with an extra
        ``progress_callback`` keyword holding the worker's progress signal,
        so ``fn`` must accept that keyword.
    :type fn: function
    :param args: Positional arguments to pass to ``fn``
    :param kwargs: Keyword arguments to pass to ``fn``
    '''

    def __init__(self, fn, *args, **kwargs):
        super().__init__()

        # One signal container per worker; callers connect to these.
        self.signals = WorkerSignals()

        # Keep the call description around until run() is invoked.
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

        # Hand the progress signal to the function so it can report progress.
        self.kwargs['progress_callback'] = self.signals.progress

    @pyqtSlot()
    def run(self):
        '''
        Call the stored function with the stored args/kwargs.

        Emits ``result`` with the return value on success, or ``error`` with
        an (exception type, exception value, formatted traceback) tuple if
        the call raises. ``finished`` is emitted in both cases.
        '''
        try:
            outcome = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            # Catch everything (same reach as a bare except) and report it
            # through the error signal instead of letting it propagate.
            traceback.print_exc()
            self.signals.error.emit((type(exc), exc, traceback.format_exc()))
        else:
            # Success: deliver the return value.
            self.signals.result.emit(outcome)
        finally:
            # Always announce completion, whether the call succeeded or not.
            self.signals.finished.emit()



class MainWindow(QMainWindow):
    '''
    Demo window: a counter label driven by a timer plus a button that
    starts a slow job on a QThreadPool.

    The label is updated once per second by a QTimer, and each press of the
    button queues a new Worker on the pool.
    '''

    def __init__(self, *args, **kwargs):
        super(MainWindow, self).__init__(*args, **kwargs)

        # Number of timer ticks seen so far; displayed in the label.
        self.counter = 0

        layout = QVBoxLayout()

        self.l = QLabel("Start")
        b = QPushButton("DANGER!")
        b.pressed.connect(self.oh_no)

        layout.addWidget(self.l)
        layout.addWidget(b)

        w = QWidget()
        w.setLayout(layout)

        self.setCentralWidget(w)

        self.show()

        # Pool that runs the Worker instances created in oh_no().
        self.threadpool = QThreadPool()
        print("Multithreading with maximum %d threads" % self.threadpool.maxThreadCount())

        # Fire recurring_timer() every 1000 ms.
        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def progress_fn(self, n):
        '''Print the progress percentage ``n`` reported by a worker.'''
        print("%d%% done" % n)

    def execute_this_fn(self, progress_callback):
        '''
        Simulated long-running job executed by a Worker.

        Sleeps one second per step for five steps, emitting 0, 25, 50, 75
        and 100 on ``progress_callback``.

        :param progress_callback: signal object with an ``emit`` method,
            injected by Worker as the ``progress_callback`` keyword
        :return: the string "Done."
        '''
        for n in range(0, 5):
            time.sleep(1)
            # Integer division: the progress signal is declared as
            # pyqtSignal(int), and true division would hand it a float.
            progress_callback.emit(n * 100 // 4)

        return "Done."

    def print_output(self, s):
        '''Print the value delivered by a worker's ``result`` signal.'''
        print(s)

    def thread_complete(self):
        '''Print a notice when a worker's ``finished`` signal fires.'''
        print("THREAD COMPLETE!")

    def oh_no(self):
        '''Create a Worker for execute_this_fn, wire its signals, and queue it.'''
        # Pass the function to execute
        worker = Worker(self.execute_this_fn)  # Any other args, kwargs are passed to the run function
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)
        worker.signals.progress.connect(self.progress_fn)

        # Execute
        self.threadpool.start(worker)

    def recurring_timer(self):
        '''Increment the tick counter and show it in the label.'''
        self.counter += 1
        self.l.setText("Counter: %d" % self.counter)


# Create the application object first; the window is constructed after it.
app = QApplication([])
# MainWindow calls self.show() in its constructor, so no explicit show() here.
window = MainWindow()
# Enter the Qt event loop.
app.exec_()

QThread

class GetStockCode(QThread):
    '''
    QThread that reports the status of a stock-code table update as text.

    Status messages are delivered through ``signal_str``.
    '''
    # Carries one status message (str) per emit.
    signal_str = pyqtSignal(str)

    def __init__(self):
        super().__init__()

    def run(self):
        '''
        Emit a start message, then a success or failure message.

        Success means the number of inserted ids equals the number of
        records in ``data_arr``.
        '''
        self.signal_str.emit(r'开始获取股票代码...')
        # NOTE(review): `result` and `data_arr` are not defined anywhere in
        # this excerpt -- presumably the fetch/insert step that produces them
        # was elided; confirm before running.
        if len(result.inserted_ids) != len(data_arr):
            # Report the failure through the signal rather than asserting:
            # an `assert False` ahead of this emit would raise before the
            # message is sent (and is stripped entirely under -O).
            self.signal_str.emit(r'插入代码表数据失败.请检查...')
        else:
            self.signal_str.emit(r'插入代码表数据成功...')

def __init__(self, parent=None):
    # NOTE(review): excerpt of a constructor whose class header is not shown;
    # `parent` is unused here and `self.ui` is presumably set up in lines
    # that were elided -- confirm against the full class.
    # Create the worker thread and keep it on the instance.
    stock_code_thread = GetStockCode()
    self.GetStockCode = stock_code_thread
    # Route clicks on the "update stock code" button to its handler.
    update_button = self.ui.btn_update_stock_code
    update_button.clicked.connect(self.on_btn_stock_code)

def on_btn_trade_date(self):
    # Start the thread object stored on self.GetTradeDate.
    # NOTE(review): self.GetTradeDate is not created in the visible code --
    # presumably it is assigned in the constructor the same way as
    # self.GetStockCode; confirm.
    trade_date_thread = self.GetTradeDate
    trade_date_thread.start()