workflow Executor 理解
workflow Executor
要理解Executor,需要先了解线程池(thrdpool)和侵入式链表(list_head)的用法。
ExecQueue ExecSession Executor
// Queue of pending sessions (excerpt: members that are not discussed here
// are elided with "...").
class ExecQueue
{
...
private:
// Head node of the list of queued entries. Executor::request appends
// ExecSessionEntry nodes to its tail and Executor::executor_thread_routine
// removes them from its front.
struct list_head session_list;
// Mutex that Executor::request and Executor::executor_thread_routine hold
// while they modify session_list.
pthread_mutex_t mutex;
...
};
// Abstract unit of work submitted through Executor::request (excerpt:
// members that are not discussed here are elided with "...").
class ExecSession
{
private:
// Performs the actual work. Called by Executor::executor_thread_routine
// after the queue mutex has been released.
virtual void execute() = 0;
// State callback. Invoked with ES_STATE_FINISHED right after execute()
// returns, or with ES_STATE_CANCELED from Executor::executor_cancel; both
// of those call sites pass 0 as error.
virtual void handle(int state, int error) = 0;
private:
// Queue this session was submitted to; assigned by Executor::request.
ExecQueue *queue;
...
};
// Dispatches sessions to a thread pool, one queue at a time (excerpt:
// members that are not discussed here are elided with "...").
class Executor
{
...
public:
// Submits a session to the given queue. Returns 0 on success and -1 when
// the entry cannot be allocated or the thread pool refuses the task.
int request(ExecSession *session, ExecQueue *queue);
...
};
Executor函数理解
首先要介绍一个重要的struct:ExecSessionEntry。它把ExecQueue的链表、ExecSession和线程池串联在一起,是理解下面几个函数的关键。
ExecSessionEntry
// Node that ties one submitted session to a queue and to a thread pool.
// Allocated with malloc() in Executor::request.
struct ExecSessionEntry
{
// Intrusive link into ExecQueue::session_list.
struct list_head list;
// The session to execute (or to cancel).
ExecSession *session;
// Thread pool of the Executor that accepted the session; used by
// Executor::executor_thread_routine when it schedules the next entry.
thrdpool_t *thrdpool;
};
Executor::request
/**
 * Submits a session for execution on the given queue.
 *
 * The session is wrapped in a heap-allocated ExecSessionEntry and appended to
 * the queue's list. A thread pool task is scheduled only when the new entry
 * ends up at the front of the list; entries behind it are picked up by
 * Executor::executor_thread_routine, which reschedules itself while the list
 * is not empty.
 *
 * @param session  Session to run; its queue pointer is set to @p queue.
 * @param queue    Queue the session is appended to.
 * @return 0 on success, -1 if the entry cannot be allocated or if
 *         thrdpool_schedule() reports a failure.
 */
int Executor::request(ExecSession *session, ExecQueue *queue)
{
    session->queue = queue;

    struct ExecSessionEntry *node =
        (struct ExecSessionEntry *)malloc(sizeof (struct ExecSessionEntry));
    if (!node)
        return -1;

    node->session = session;
    node->thrdpool = this->thrdpool;

    int result = 0;

    pthread_mutex_lock(&queue->mutex);
    list_add_tail(&node->list, &queue->session_list);

    // The node we just appended is also the first node: nothing else is
    // queued ahead of it, so a pool task has to be scheduled for this queue.
    if (&node->list == queue->session_list.next)
    {
        struct thrdpool_task task = {
            .routine = Executor::executor_thread_routine,
            .context = queue
        };

        if (thrdpool_schedule(&task, this->thrdpool) < 0)
        {
            // Scheduling failed: undo the append and report the error.
            list_del(&node->list);
            free(node);
            result = -1;
        }
    }

    pthread_mutex_unlock(&queue->mutex);
    return result;
}
Executor::executor_thread_routine
void Executor::executor_thread_routine(void *context)
{
ExecQueue *queue = (ExecQueue *)context;
struct ExecSessionEntry *entry;
ExecSession *session;
pthread_mutex_lock(&queue->mutex);
// queue->session_list.nexts实际上就是ExecSessionEntry的list
entry = list_entry(queue->session_list.next, struct ExecSessionEntry, list);
// 去除当前节点
list_del(&entry->list);
// 获取要执行的session
session = entry->session;
// 判断链表上是否还有节点,有的话继续放入到线程池中
if (!list_empty(&queue->session_list))
{
struct thrdpool_task task = {
.routine = Executor::executor_thread_routine,
.context = queue
};
__thrdpool_schedule(&task, entry, entry->thrdpool);
}
else
free(entry);
pthread_mutex_unlock(&queue->mutex);
// 实际任务执行
session->execute();
// 状态处理
session->handle(ES_STATE_FINISHED, 0);
}
Executor::executor_cancel
/**
 * Cancels every session still queued on the queue referenced by a task.
 *
 * Each entry is unlinked and freed, and its session is notified with
 * ES_STATE_CANCELED (error 0). The queue mutex is not taken here.
 *
 * @param task  Pool task whose context is the ExecQueue to drain.
 */
void Executor::executor_cancel(const struct thrdpool_task *task)
{
    ExecQueue *queue = (ExecQueue *)task->context;
    struct list_head *cur, *next;

    // The "safe" iteration caches the next node, so the current one can be
    // unlinked and freed inside the loop body.
    list_for_each_safe(cur, next, &queue->session_list)
    {
        struct ExecSessionEntry *node =
            list_entry(cur, struct ExecSessionEntry, list);
        ExecSession *cancelled = node->session;

        list_del(cur);
        free(node);

        cancelled->handle(ES_STATE_CANCELED, 0);
    }
}