🧿Code samples
Here is code samples of features.
Terminate running thread
from kthreading import KThread
from time import sleep
def sample_function():
# make random thing
# create an KThread instance
thread = KThread(target=sample_function)
# wait 10 seconds
sleep(10)
# kill running "sample_function"
thread.terminate()
Execute function with timeout
from kthreading import KThread
from kthreading import KThreadExecutionTimedOut
def sample_function():
# make random thing
return "Hello World"
# create an KThread instance
thread = KThread(target=sample_function)
wait_seconds = 10
try:
result = thread.timeout(wait_seconds)
print(result)
except KThreadExecutionTimedOut as error:
print("Thread does not end after %s" % error.timeout)
Get function output
from kthreading import KThread
def sample_function():
# make random thing
return "Hello World"
# create an KThread instance
thread = KThread(target=sample_function)
thread.start()
# wait thread ends or other some random thing
thread.join()
# get result
status = thread.get_status()
if status.success:
print(status.result)
else:
print("Error %s" % status.result.text)
Run new thread with single line of code
from kthreading import start_kthread
def sample_function():
# make random thing
start_kthread(target=sample_function)
Set function as error handler
from kthreading import KThread
def function_with_error():
# NameError
return a + b
def error_handler(error):
print("Error occured: %s" % error.name)
thread = KThread(
target=function_with_error,
on_error=error_handler
)
thread.start()
Last updated