
Linux Kernel Analysis: Work Queues

Published: 2014-12-01 15:45:11  Source: Linux website  Author: bullbat

Deferrable functions and work queues look very similar, but the difference between them is significant: deferrable functions run in interrupt context, whereas the functions on a work queue run in process context (that of a worker kernel thread). No process switch can take place in interrupt context. Neither deferrable functions nor work-queue functions can access a process's user-mode address space.
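
Before diving into the internals, a minimal usage sketch may help to show what "running in process context" buys you. This is a hypothetical module using the classic DECLARE_WORK/schedule_work interface of this kernel generation; all "my_" names are made up for illustration.

#include <linux/module.h>
#include <linux/workqueue.h>
#include <linux/sched.h>

/* The work handler runs in process context (a worker kernel thread),
 * so it may sleep, e.g. take a mutex or allocate with GFP_KERNEL. */
static void my_work_handler(struct work_struct *work)
{
	pr_info("my_work_handler: running in process context, pid %d\n",
		current->pid);
}

/* Statically define a work item bound to the handler above. */
static DECLARE_WORK(my_work, my_work_handler);

static int __init my_init(void)
{
	/* Hand the work item to the kernel's shared work queue. */
	schedule_work(&my_work);
	return 0;
}

static void __exit my_exit(void)
{
	/* Make sure the handler is neither pending nor still running. */
	cancel_work_sync(&my_work);
}

module_init(my_init);
module_exit(my_exit);
MODULE_LICENSE("GPL");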


Data structures involved

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 */
struct cpu_workqueue_struct {

	spinlock_t lock;		/* spinlock protecting this structure */

	struct list_head worklist;	/* head of the list of pending work items */
	wait_queue_head_t more_work;	/* wait queue where the worker thread sleeps
					   while waiting for more work */
	struct work_struct *current_work; /* work item currently being executed */

	struct workqueue_struct *wq;	/* owning work queue */
	struct task_struct *thread;	/* process descriptor of the worker thread */
} ____cacheline_aligned;
 
/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
	struct cpu_workqueue_struct *cpu_wq;	/* per-CPU worker data */
	struct list_head list;			/* node in the global list of workqueues */
	const char *name;			/* name of the worker threads */
	int singlethread;
	int freezeable;		/* Freeze threads during suspend */
	int rt;
#ifdef CONFIG_LOCKDEP
	struct lockdep_map lockdep_map;
#endif
};
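
The bridge between the two structures is a simple per-CPU lookup. The sketch below is modeled on the wq_per_cpu() helper in kernel/workqueue.c of this kernel generation; treat the exact body as indicative rather than verbatim.

/* Sketch: resolve the cpu_workqueue_struct for a given CPU.
 * A single-threaded work queue maps every CPU to the same cwq;
 * otherwise per_cpu_ptr() indexes the per-CPU array allocated
 * with alloc_percpu() in __create_workqueue_key(). */
static struct cpu_workqueue_struct *
wq_per_cpu(struct workqueue_struct *wq, int cpu)
{
	if (unlikely(wq->singlethread))
		cpu = singlethread_cpu;	/* the single CPU used by single-threaded queues */
	return per_cpu_ptr(wq->cpu_wq, cpu);
}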


Work queue operations

Creation

All of the creation interfaces ultimately call the following function:

struct workqueue_struct *__create_workqueue_key(const char *name,
						int singlethread,
						int freezeable,
						int rt,
						struct lock_class_key *key,
						const char *lock_name)
{
	struct workqueue_struct *wq;
	struct cpu_workqueue_struct *cwq;
	int err = 0, cpu;

	/* allocate the workqueue_struct */
	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return NULL;

	/* allocate the per-CPU cpu_workqueue_struct array */
	wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
	if (!wq->cpu_wq) {
		kfree(wq);
		return NULL;
	}

	wq->name = name;
	lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
	wq->singlethread = singlethread;
	wq->freezeable = freezeable;
	wq->rt = rt;
	INIT_LIST_HEAD(&wq->list);

	if (singlethread) {	/* single-threaded: create only one worker */
		/* initialize the cwq */
		cwq = init_cpu_workqueue(wq, singlethread_cpu);
		/* create the kernel worker thread */
		err = create_workqueue_thread(cwq, singlethread_cpu);
		/* wake up the newly created worker thread */
		start_workqueue_thread(cwq, -1);
	} else {		/* otherwise: one worker thread per CPU */
		cpu_maps_update_begin();
		/*
		 * We must place this wq on list even if the code below fails.
		 * cpu_down(cpu) can remove cpu from cpu_populated_map before
		 * destroy_workqueue() takes the lock, in that case we leak
		 * cwq[cpu]->thread.
		 */
		spin_lock(&workqueue_lock);
		list_add(&wq->list, &workqueues);
		spin_unlock(&workqueue_lock);
		/*
		 * We must initialize cwqs for each possible cpu even if we
		 * are going to call destroy_workqueue() finally. Otherwise
		 * cpu_up() can hit the uninitialized cwq once we drop the
		 * lock.
		 */
		for_each_possible_cpu(cpu) {	/* for every possible CPU */
			cwq = init_cpu_workqueue(wq, cpu);
			if (err || !cpu_online(cpu))
				continue;
			err = create_workqueue_thread(cwq, cpu);
			start_workqueue_thread(cwq, cpu);
		}
		cpu_maps_update_done();
	}

	if (err) {
		destroy_workqueue(wq);
		wq = NULL;
	}
	return wq;
}

As we can see, the work queue wakes up its worker kernel thread right at creation time. Let's now look at the kernel thread that it creates:

static int worker_thread(void *__cwq)
{
	struct cpu_workqueue_struct *cwq = __cwq;
	DEFINE_WAIT(wait);

	if (cwq->wq->freezeable)
		set_freezable();

	for (;;) {
		prepare_to_wait(&cwq->more_work, &wait, TASK_INTERRUPTIBLE);
		if (!freezing(current) &&
		    !kthread_should_stop() &&
		    list_empty(&cwq->worklist))
			schedule();	/* no pending work: sleep on more_work */
		finish_wait(&cwq->more_work, &wait);

		try_to_freeze();

		if (kthread_should_stop())
			break;
		/* process the pending work items */
		run_workqueue(cwq);
	}

	return 0;
}

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
	spin_lock_irq(&cwq->lock);
	while (!list_empty(&cwq->worklist)) {
		struct work_struct *work = list_entry(cwq->worklist.next,
						struct work_struct, entry);
		work_func_t f = work->func;
#ifdef CONFIG_LOCKDEP
		/*
		 * It is permissible to free the struct work_struct
		 * from inside the function that is called from it,
		 * this we need to take into account for lockdep too.
		 * To avoid bogus "held lock freed" warnings as well
		 * as problems when looking into work->lockdep_map,
		 * make a copy and use that here.
		 */
		struct lockdep_map lockdep_map = work->lockdep_map;
#endif
		trace_workqueue_execution(cwq->thread, work);
		cwq->current_work = work;
		list_del_init(cwq->worklist.next);
		spin_unlock_irq(&cwq->lock);

		BUG_ON(get_wq_data(work) != cwq);
		work_clear_pending(work);
		lock_map_acquire(&cwq->wq->lockdep_map);
		lock_map_acquire(&lockdep_map);
		f(work);	/* call the actual work function */
		lock_map_release(&lockdep_map);
		lock_map_release(&cwq->wq->lockdep_map);

		if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
			printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
					"%s/0x%08x/%d\n",
					current->comm, preempt_count(),
					task_pid_nr(current));
			printk(KERN_ERR "last function: ");
			print_symbol("%s\n", (unsigned long)f);
			debug_show_held_locks(current);
			dump_stack();
		}

		spin_lock_irq(&cwq->lock);
		cwq->current_work = NULL;
	}
	spin_unlock_irq(&cwq->lock);
}
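
Note that the spinlock is dropped before f(work) is invoked, so the handler runs with interrupts enabled and may sleep. The handler only receives the work_struct pointer; the usual idiom for reaching the enclosing object is container_of(). A small hedged sketch (the device structure and names are made up):

#include <linux/workqueue.h>

struct my_device {
	int irq_events;
	struct work_struct irq_work;	/* work item embedded in the device */
};

/* Work handler: recover the enclosing object from the work_struct
 * pointer that run_workqueue() passes to f(work). */
static void my_irq_work(struct work_struct *work)
{
	struct my_device *dev = container_of(work, struct my_device, irq_work);

	dev->irq_events++;	/* process context: sleeping is allowed here */
}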


As we can see, the created kernel thread does nothing but execute the functions pending on the work queue.
Besides the creation function, which is the most important one, the kernel provides a set of helper functions that make operating on work queues convenient; here we look at the function that queues a work item.

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
int queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
	int ret;

	ret = queue_work_on(get_cpu(), wq, work);
	put_cpu();

	return ret;
}
/**
 * queue_work_on - queue work on specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 */
int
queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
	int ret = 0;

	if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
		BUG_ON(!list_empty(&work->entry));
		__queue_work(wq_per_cpu(wq, cpu), work);
		ret = 1;
	}
	return ret;
}
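
From driver code, submission is just a call to queue_work() on the queue created earlier. A hedged sketch reusing the hypothetical my_wq and my_device names from above (my_probe and my_isr are likewise made up):

#include <linux/interrupt.h>

static struct my_device my_dev;

static int my_probe(void)
{
	/* Bind the embedded work item to its handler before first use. */
	INIT_WORK(&my_dev.irq_work, my_irq_work);
	return 0;
}

static irqreturn_t my_isr(int irq, void *data)
{
	/* Interrupt context: defer the part that may sleep to the
	 * worker thread. queue_work() returns non-zero only if the
	 * item was not already pending. */
	queue_work(my_wq, &my_dev.irq_work);
	return IRQ_HANDLED;
}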
 

Eventually insert_work() is called:

static void insert_work(struct cpu_workqueue_struct *cwq,
			struct work_struct *work, struct list_head *head)
{
	trace_workqueue_insertion(cwq->thread, work);

	set_wq_data(work, cwq);
	/*
	 * Ensure that we get the right work->data if we see the
	 * result of list_add() below, see try_to_grab_pending().
	 */
	smp_wmb();
	list_add_tail(&work->entry, head);
	wake_up(&cwq->more_work);
}


As we can see, the worker thread is woken up right when the work item is inserted into the queue. The remaining functions are not discussed one by one here; once the implementation principle is understood, they are not hard to follow.
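
For completeness, the most common companion calls are flushing and teardown. A hedged sketch, again using the hypothetical names introduced above:

static void my_remove(void)
{
	/* Make sure this handler is neither pending nor running. */
	cancel_work_sync(&my_dev.irq_work);

	/* Wait until every work item queued so far has finished,
	 * then tear the queue and its worker threads down. */
	flush_workqueue(my_wq);
	destroy_workqueue(my_wq);
}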