DAO Thread System
Overview
The DAO Thread system provides a robust and flexible framework for creating and managing threads in C++ applications. It is designed to support real-time applications with features such as core affinity, NUMA awareness, and inter-thread communication.
The thread system consists of several layers:
ThreadIfce: Interface defining the thread operations
ThreadBase: Base implementation providing core thread functionality
Thread: User-facing class that combines the interface and base implementation
ThreadTable: Container for managing multiple threads
SignalTable: Inter-thread communication mechanism
Class Hierarchy
ThreadIfce (Interface)
|
| ThreadBase
| |
+----------+
|
+--> Thread
Thread Interface
The ThreadIfce class defines the standard interface for all thread operations:
class ThreadIfce
{
public:
virtual ~ThreadIfce(){};
virtual void Start() = 0;
virtual void Stop() = 0;
virtual void Exit() = 0;
virtual void Spawn() = 0;
virtual void Join() = 0;
virtual void Kill(int signal) = 0;
// signaling
virtual void Signal(int index) = 0;
virtual void SignalWaitSpin(int index) = 0;
virtual void SignalWaitSleep(int index, int usleep) = 0;
};
Thread Base Class
The ThreadBase class provides the core implementation of thread functionality:
Construction
ThreadBase(std::string thread_name, Log::Logger& logger, int core=-1, int thread_number=-1, bool rt_enabled=true)
Parameters:
thread_name: Name for the thread (shown in system tools)
logger: Logger instance for thread logging
core: CPU core to pin this thread to (-1 for no specific core)
thread_number: ID for thread when multiple threads of same type exist
rt_enabled: Whether to use real-time scheduling when possible
Thread Lifecycle
Threads follow a specific lifecycle:
Construction: Thread object is created but not started
Spawn: Thread is spawned but waits for a start signal
Start: Thread begins execution
Stop: Thread stops execution but remains ready to restart
Exit: Thread exits completely
Join: Wait for thread to finish
Thread Lifecycle Methods
Spawn(): Creates the thread and calls OnceOnSpawn()
Start(): Signals the thread to begin execution and calls OnceOnStart()
Stop(): Signals the thread to stop execution and calls OnceOnStop()
Exit(): Signals the thread to exit completely and calls OnceOnExit()
Join(): Waits for the thread to finish execution
Kill(int signal): Forcibly terminates the thread
Extension Points
ThreadBase provides several virtual methods that can be overridden:
OnceOnSpawn(): Called once when the thread is spawned
OnceOnStart(): Called once when the thread is started
OnceOnStop(): Called once when the thread is stopped
OnceOnExit(): Called once when the thread exits
Body(): The main thread function (must be implemented by derived classes)
Thread Class
The Thread class combines ThreadBase and ThreadIfce, providing a complete implementation:
class Thread: public ThreadBase, public ThreadIfce
{
public:
Thread(std::string name, Log::Logger& logger, int core=-1, int thread_number=-1, bool rt_enabled=true);
// Implements ThreadIfce methods by delegating to ThreadBase
// ...
protected:
virtual void RestartableThread() = 0; // Must be implemented by derived classes
};
Creating a Custom Thread
To create a custom thread, inherit from the Thread class and implement RestartableThread():
class MyThread : public Dao::Thread
{
public:
MyThread(std::string name, Dao::Log::Logger& logger, int core=-1)
: Thread(name, logger, core)
{
// Custom initialization
}
protected:
void RestartableThread() override
{
// This function is called repeatedly while the thread is running
// Implement your thread's main logic here
// For example:
processData();
// Optional sleep to control execution rate
usleep(1000);
}
// Optionally override lifecycle hooks
void OnceOnStart() override
{
Thread::OnceOnStart(); // Call base implementation
// Additional start logic
}
};
Thread Table
The ThreadTable class provides a way to manage multiple threads as a group:
ThreadTable threadTable;
// Add threads
threadTable.Add(&thread1);
threadTable.Add(&thread2);
// Start all threads
threadTable.Spawn();
threadTable.Start();
// Stop all threads
threadTable.Stop();
threadTable.Exit();
threadTable.Join();
Thread Table Methods
Add(ThreadIfce* thread): Adds a thread to the table
Start(): Starts all threads
Stop(): Stops all threads
Exit(): Signals all threads to exit
Spawn(): Spawns all threads
Join(): Waits for all threads to finish
Kill(int signal): Forcibly terminates all threads
Signal(int index): Sends a signal to all threads
Signal Table
The SignalTable class provides a mechanism for inter-thread communication:
// Thread 1
m_signal_table->SignalSend(SIGNAL_DATA_READY);
// Thread 2
m_signal_table->SignalReceiveSpin(SIGNAL_DATA_READY); // Blocks until signal received
Signal Table Methods
SignalSend(int index): Sends a signal
SignalReceive(int index): Non-blocking check for signal
SignalReceiveSpin(int index): Blocking wait for signal (spinning)
SignalReceiveSleep(int index, uint64_t uSleep): Blocking wait with sleep intervals
SignalReset(int index): Resets a specific signal
SignalTableReset(): Resets all signals
Predefined Signals
SIGNAL_THREAD_READY (0): Indicates thread is ready
SIGNAL_LOOPSTART_ALL (1): Used to synchronize the start of multiple threads
Real-Time Considerations
For real-time applications, the thread system offers:
Core Affinity: Pin threads to specific CPU cores
NUMA Awareness: Optimize memory access for Non-Uniform Memory Architecture
Real-time Scheduling: Use SCHED_FIFO when running as root
Signal-based Synchronization: Low-latency inter-thread communication
Best Practices
Thread Creation: Create all threads at application startup to avoid dynamic thread creation overhead
Core Placement: Pin critical threads to isolated cores
Thread Priority: Use RT threads for time-critical operations
Signal Management: Reset signals appropriately to avoid lost or stale signals
Error Handling: Implement proper exception handling in the thread functions
Usage Example
#include <daoThread.hpp>
#include <daoLog.hpp>
class ProcessingThread : public Dao::Thread
{
public:
ProcessingThread(Dao::Log::Logger& logger)
: Thread("Processor", logger, 2) // Pin to core 2
{
// Initialize
}
protected:
void RestartableThread() override
{
// Process data
processNextDataItem();
// Signal completion
Signal(DATA_PROCESSED);
// Sleep briefly to yield CPU
usleep(100);
}
private:
void processNextDataItem()
{
// Implementation
}
};
int main()
{
Dao::Log::Logger logger("ThreadApp");
ProcessingThread procThread(logger);
// Start thread
procThread.Spawn();
procThread.Start();
// Wait for thread to finish
procThread.Join();
return 0;
}