🧿Code samples

Here are code samples of the library's features.

Terminate a running thread

from kthreading import KThread
from time import sleep

def sample_function():
    """Placeholder thread target; replace this body with the real work."""
    # make random thing

# create a KThread instance and start it, so there is a running thread to kill
thread = KThread(target=sample_function)
thread.start()

# wait 10 seconds
sleep(10)

# kill running "sample_function"
thread.terminate()

Execute function with timeout

from kthreading import KThread
from kthreading import KThreadExecutionTimedOut

def sample_function():
    """Stand-in for real work; returns a fixed greeting string."""
    # make random thing
    greeting = "Hello World"
    return greeting

# choose how long to wait, then create a KThread instance around the target
wait_seconds = 10
thread = KThread(target=sample_function)

try:
    result = thread.timeout(wait_seconds)
except KThreadExecutionTimedOut as error:
    # error.timeout is read for the message (presumably the limit that elapsed)
    print("Thread does not end after %s" % error.timeout)
else:
    # no timeout error was raised: show what timeout() returned
    print(result)

Get function output

from kthreading import KThread

def sample_function():
    """Stand-in for real work; returns a fixed greeting string."""
    # make random thing
    message = "Hello World"
    return message

# create a KThread instance and launch it
thread = KThread(target=sample_function)
thread.start()

# wait until the thread ends (other work could be done before joining)
thread.join()

# fetch the status object and report what it holds
status = thread.get_status()

if not status.success:
    print("Error %s" % status.result.text)
else:
    print(status.result)

Run a new thread with a single line of code

from kthreading import start_kthread

def sample_function():
    """Placeholder thread target; replace this body with the real work."""
    # make random thing

# run sample_function in a new thread with a single call
start_kthread(target=sample_function)

Set a function as an error handler

from kthreading import KThread

def function_with_error():
    """Fail on purpose: ``a`` and ``b`` are never defined in this snippet."""
    # NameError
    total = a + b
    return total
    
def error_handler(error):
    """Print a short report for an error passed to this handler.

    Args:
        error: object describing the failure; its ``name`` attribute is
            interpolated into the printed message.
    """
    print("Error occurred: %s" % error.name)
    
# pass error_handler through on_error when building the thread, then launch it
thread = KThread(target=function_with_error, on_error=error_handler)
thread.start()

Last updated