Concurrency Managed Workqueue之(四):workqueue如何处理work

作者:linuxer 发布于:2015-8-17 19:41 分类:中断子系统

一、前言

本文主要讲述下面两部分的内容:

1、将work挂入workqueue的处理过程

2、如何处理挂入workqueue的work

 

二、用户将一个work挂入workqueue

1、queue_work_on函数

使用workqueue机制的模块可以调用queue_work_on(有其他变种的接口,这里略过,其实思路是一致的)将一个定义好的work挂入workqueue,具体代码如下:

bool queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
{
    ……

    if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
        __queue_work(cpu, wq, work);---挂入work list并通知worker thread pool来处理
        ret = true;
    }

    ……
}

work_struct的data member中的WORK_STRUCT_PENDING_BIT这个bit标识了该work是处于pending状态还是正在处理中,pending状态的work只会挂入一次。大部分的逻辑都是在__queue_work函数中,下面的小节都是描述该函数的执行过程。

2、__WQ_DRAINING的解释

__queue_work函数一开始会校验__WQ_DRAINING这个flag,如下:

if (unlikely(wq->flags & __WQ_DRAINING) && WARN_ON_ONCE(!is_chained_work(wq)))
        return;

__WQ_DRAINING这个flag表示该workqueue正在进行draining的操作,这多半是发送在销毁workqueue的时候,既然要销毁,那么挂入该workqueue的所有的work都要处理完毕,才允许它消亡。当想要将一个workqueue中所有的work都清空的时候,如果还有work挂入怎么办?一般而言,这时候当然是不允许新的work挂入了,毕竟现在的目标是清空workqueue中的work。但是有一种特例(通过is_chained_work判定),也就是正在清空的work(隶属于该workqueue)又触发了一个queue work的操作(也就是所谓chained work),这时候该work允许挂入。

3、选择pool workqueue

if (req_cpu == WORK_CPU_UNBOUND)
        cpu = raw_smp_processor_id();

if (!(wq->flags & WQ_UNBOUND))
        pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
    else
        pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

WORK_CPU_UNBOUND表示并不指定cpu,这时候,选择当前代码运行的那个cpu了。一旦确定了cpu了,对于非unbound的workqueue,当然使用per cpu的pool workqueue。如果是unbound的workqueue,那么要根据numa node id来选择。cpu_to_node可以从cpu id获取node id。需要注意的是:这里选择的pool wq只是备选的,可能用也可能不用,它有可能会被替换掉,具体参考下一节描述。

4、选择worker thread pool

与其说挂入workqueue,不如说挂入worker thread pool,因为毕竟是线程池来处理具体的work。pool_workqueue有一个相关联的worker thread pool(struct pool_workqueue的pool成员),因此看起来选择了pool wq也就选定了worker pool了,但是,不是当前选定的那个pool wq对应的worker pool就适合该work,因为有时候该work可能正在其他的worker thread上执行中,在这种情况下,为了确保work的callback function不会重入,该work最好还是挂在那个worker thread pool上,具体代码如下:

last_pool = get_work_pool(work);
    if (last_pool && last_pool != pwq->pool) {
        struct worker *worker;

        spin_lock(&last_pool->lock);

        worker = find_worker_executing_work(last_pool, work);

        if (worker && worker->current_pwq->wq == wq) {
            pwq = worker->current_pwq;
        } else {
            /* meh... not running there, queue here */
            spin_unlock(&last_pool->lock);
            spin_lock(&pwq->pool->lock);
        }
    } else {
        spin_lock(&pwq->pool->lock);
    }

last_pool记录了上一次该work是被哪一个worker pool处理的,如果last_pool就是pool wq对应的worker pool,那么皆大欢喜,否则只能使用last pool了。使用last pool的例子比较复杂一些,因为这时候需要根据last worker pool找到对应的pool workqueue。find_worker_executing_work函数可以找到具体哪一个worker线程正在处理该work,如果没有找到,那么还是使用第3节中选定的pool wq吧,否则,选择该worker线程当前的那个pool workqueue(其实也就是选定了线程池)。

5、选择work挂入的队列

队列有两个,一个是被推迟执行的队列(pwq->delayed_works),一个是线程池要处理的队列(pwq->pool->worklist),如果挂入线程池要处理的队列,也就意味着该work进入active状态,线程池会立刻启动处理流程,如果挂入推迟执行的队列,那么该work还是pending状态:

    pwq->nr_in_flight[pwq->work_color]++;
    work_flags = work_color_to_flags(pwq->work_color);

    if (likely(pwq->nr_active < pwq->max_active)) {
        pwq->nr_active++;
        worklist = &pwq->pool->worklist;
    } else {
        work_flags |= WORK_STRUCT_DELAYED;
        worklist = &pwq->delayed_works;
    }

    insert_work(pwq, work, worklist, work_flags);

具体的挂入队列的动作是在insert_work函数中完成的。

6、唤醒idle的worker来处理该work

在insert_work函数中有下面的代码:

if (__need_more_worker(pool))
        wake_up_worker(pool);

当线程池中正在运行状态的worker线程数目等于0的时候,说明需要wakeup线程池中处于idle状态的的worker线程来处理work。

 

三、线程池如何创建worker线程?

1、per cpu worker pool什么时候创建worker线程?

对于per-CPU workqueue,每个cpu有两个线程池,一个是normal,一个是high priority的。在初始化函数init_workqueues中有对这两个线程池的初始化:

for_each_online_cpu(cpu) {
    struct worker_pool *pool;

    for_each_cpu_worker_pool(pool, cpu) {
        pool->flags &= ~POOL_DISASSOCIATED;
        BUG_ON(!create_worker(pool));
    }
}

因此,在系统初始化的时候,per cpu workqueue共享的那些线程池(2 x cpu nr)就会通过create_worker创建一个initial worker。

一旦initial worker启动,该线程会执行worker_thread函数来处理work,在处理过程中,如果有需要, worker会创建新的线程。

2、unbound thread pool什么时候创建worker线程?

我们先看看unbound thread pool的建立,和per-CPU不同的是unbound thread pool是全局共享的,因此,每当创建不同属性的unbound workqueue的时候,都需要创建pool_workqueue及其对应的worker pool,这时候就会调用get_unbound_pool函数在当前系统中现存的线程池中找是否有匹配的worker pool,如果没有就需要创建新的线程池。在创建新的线程池之后,会立刻调用create_worker创建一个initial worker。和per cpu worker pool一样,一旦initial worker启动,随着work不断的挂入以及worker处理work的具体情况,线程池会动态创建worker。

3、如何创建worker。代码如下:

static struct worker *create_worker(struct worker_pool *pool)
{
    struct worker *worker = NULL;
    int id = -1;
    char id_buf[16];

    id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);----分配ID

    worker = alloc_worker(pool->node);-----分配worker struct的内存

    worker->pool = pool;
    worker->id = id;

    if (pool->cpu >= 0)---------worker的名字
        snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,  pool->attrs->nice < 0  ? "H" : "");
    else
        snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);

worker->task = kthread_create_on_node(worker_thread, worker, pool->node,   "kworker/%s", id_buf);

    set_user_nice(worker->task, pool->attrs->nice); ---创建task并设定nice value
    worker->task->flags |= PF_NO_SETAFFINITY; 
    worker_attach_to_pool(worker, pool); -----建立worker和线程池的关系

    spin_lock_irq(&pool->lock);
    worker->pool->nr_workers++;
    worker_enter_idle(worker);
    wake_up_process(worker->task);------让worker运行起来
    spin_unlock_irq(&pool->lock);

    return worker;
}

代码不复杂,通过线程池(struct worker_pool)绑定的cpu信息(struct worker_pool的cpu成员)可以知道该pool是per-CPU还是unbound,对于per-CPU线程池,pool->cpu是大于等于0的。对于对于per-CPU线程池,其worker线程的名字是kworker/cpuworker id,如果是high priority的,后面还跟着一个H字符。对于unbound线程池,其worker线程的名字是kworker/u pool idworker id。

四、work的处理

本章主要描述worker_thread函数的执行流程,部分代码有删节,保留主干部分。

1、PF_WQ_WORKER标记

worker线程函数一开始就会通过PF_WQ_WORKER来标注自己:

worker->task->flags |= PF_WQ_WORKER;

有了这样一个flag,调度器在调度当前进程sleep的时候可以检查这个准备sleep的进程是否是一个worker线程,如果是的话,那么调度器不能鲁莽的调度到其他的进程,这时候,还需要找到该worker对应的线程池,唤醒一个idle的worker线程。通过workqueue模块和调度器模块的交互,当work A被阻塞后(处理该work的worker线程进入sleep),调度器会唤醒其他的worker线程来处理其他的work B,work C……

2、管理线程池中的线程

recheck:
    if (!need_more_worker(pool))
        goto sleep;

    if (unlikely(!may_start_working(pool)) && manage_workers(worker))
        goto recheck;

如何判断是否需要创建更多的worker线程呢?原则如下:

(1)有事情做:挂在worker pool中的work list不能是空的,如果是空的,那么当然sleep就好了

(2)比较忙:worker pool的nr_running成员表示线程池中当前正在干活(running状态)的worker线程有多少个,当nr_running等于0表示所有的worker线程在处理work的时候阻塞了,这时候,必须要启动新的worker线程来处理worker pool上处于active状态的work链表上的work们。

3、worker线程开始处理work

worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);

do {
    struct work_struct *work =   list_first_entry(&pool->worklist,  struct work_struct, entry);

    if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
        process_one_work(worker, work);
        if (unlikely(!list_empty(&worker->scheduled)))
            process_scheduled_works(worker);
    } else {
        move_linked_works(work, &worker->scheduled, NULL);
        process_scheduled_works(worker);
    }
} while (keep_working(pool));

worker_set_flags(worker, WORKER_PREP);

按理说worker线程处理work应该比较简单,从线程池的worklist中取一个work,然后调用process_one_work处理之就OK了,不过现实稍微复杂一些,work和work之间并不是独立的,也就是说,work A和work B可能是linked work,这些linked work应该被一个worker来处理。WORK_STRUCT_LINKED标记了work是属于linked work,如果是linked work,worker并不直接处理,而是将其挂入scheduled work list,然后调用process_scheduled_works来处理。毫无疑问,process_scheduled_works也是调用process_one_work来处理一个一个scheduled work list上的work。

scheduled work list并非仅仅应用在linked work,在worker处理work的时候,有一个原则要保证:同一个work不能被同一个cpu上的多个worker同时执行。这时候,如果worker发现自己要处理的work正在被另外一个worker线程处理,那么本worker线程将不处理该work,只需要挂入正在执行该work的worker线程的scheduled work list即可。

 

原创文章,转发请注明出处。蜗窝科技

标签: workqueue Concurrency Managed

评论:

www
2019-10-06 13:52
对于不同的bound的workqueue,最终大家都要添加到同样的worker pool的list上,而在一个worker pool上只会有一个active的线程处理事件,是否意味着驱动根本没有必要自己去创建workqueue,因为通过queue_work将work添加到自己创建的workqueue和添加到default的workqueue都同样要在同一个worker pool的list上排队,和直接queue_work到default的workqueue上效果一样,即不会并行处理。
是否是我什么地方的理解有问题?

谢谢
cade
2017-07-20 20:57
1. “work item” 是什么?“work item" 指work_struct 这个结构体,

ret = queue_work(lsm_workqueue, &sdata->input_work);
if (!ret)
   LOG_INFO("work was already on the queue.\n");
  
   如果上次提交的work没有执行,而再次调用queue_work,则这个应该是同一个”work item“
   还是不同的”work item“?
  即同一个work_struct 是否可以被提交多次,处于pending状态的work就没法再提交了?
  我的需求是每10ms执行一次work,即100Hz,读数据并上报给用户空间,
  通过加log发现好像是如果上次提交的work没有执行完,则无法提交;造成的结果就是在上层看来丢数据了,
  读数据的频率不是100Hz。
但我觉得不应该这样设计的,不是可以一直提交才更合理吗,类似链表管理一个work_struct的各个提交项目
请帮忙解答下,谢谢~

2. 关于 @max_active:
如下解释
@max_active determines the maximum number of execution contexts per
CPU which can be assigned to the work items of a wq.  For example,
with @max_active of 16, at most 16 work items of the wq can be
executing at the same time per CPU.

我的场景是driver中创建一个workqueue_struct,有两个work_struct A和B,A和B的执行逻辑一样,对应同一个call back
所以问题是:
    lsm_workqueue = alloc_workqueue("%s", WQ_HIGHPRI | WQ_UNBOUND | WQ_MEM_RECLAIM, 2, "lsm_wq");
这个参数max_active应该传入2 更合适吗?
lamaboy
2017-06-12 17:51
你好! 在看到如下代码:

static struct worker *create_worker(struct worker_pool *pool)
{
...
    worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
                          "kworker/%s", id_buf);
...
}

每一个线程创建的用的是同一个函数,这样,会不会引入重入的问题,是不是,只要我保证这个回调函数是线程安全的,不管在kernel、user 空间懂不会有问题,,希望博主解惑!!
lamaboy
2017-06-12 17:57
@lamaboy:还有,如果没有问题?? 我在申请中断的时候,如gpio_key ,我要申请10 个中断,用的是统一个函数,在什么情况下会有什么问题呢??
linuxer
2017-06-12 19:06
@lamaboy:多个irq number对应一个interrupt handler也是没有什么问题,当然需要仔细考虑清楚内核不同问题(也是保证重入安全的)。
linuxer
2017-06-12 18:58
@lamaboy:同意你的说法。重入问题在kernel space和userspace都是一样的,解决的方法就是:
1、全部访问thread local资源
2、在thread之间共享的资源使用锁机制保护

你可以看看worker_thread函数实现,显然,它使用了方法二来保证了多个thread重入是安全的。
lamaboy
2017-06-12 17:29
追代码的时候发现,当前工作队列线程休眠,在找不到空闲的线程的情况下 :如to_wakeup == null,是如何处理的呢?? 具体代码如下,
            if (prev->flags & PF_WQ_WORKER) {
                struct task_struct *to_wakeup;

                to_wakeup = wq_worker_sleeping(prev); // 如果当前一休眠,
                if (to_wakeup)
                    try_to_wake_up_local(to_wakeup, cookie);
            }
.//////////////

struct task_struct *wq_worker_sleeping(struct task_struct *task)
{
    struct worker *worker = kthread_data(task), *to_wakeup = NULL;
    struct worker_pool *pool;

    /*
     * Rescuers, which may not have all the fields set up like normal
     * workers, also reach here, let's not access anything before
     * checking NOT_RUNNING.
     */
    if (worker->flags & WORKER_NOT_RUNNING)
        return NULL;

    pool = worker->pool;

    /* this can only happen on the local cpu */
    if (WARN_ON_ONCE(pool->cpu != raw_smp_processor_id()))
        return NULL;

    /*
     * The counterpart of the following dec_and_test, implied mb,
     * worklist not empty test sequence is in insert_work().
     * Please read comment there.
     *
     * NOT_RUNNING is clear.  This means that we're bound to and
     * running on the local cpu w/ rq lock held and preemption
     * disabled, which in turn means that none else could be
     * manipulating idle_list, so dereferencing idle_list without pool
     * lock is safe.
     */
    if (atomic_dec_and_test(&pool->nr_running) &&
        !list_empty(&pool->worklist))
        to_wakeup = first_idle_worker(pool);  // 找不到空闲的!!
    return to_wakeup ? to_wakeup->task : NULL;
}
linuxer
2017-06-12 18:04
@lamaboy:如果to_wakeup等于null,说明当前线程池中已经有足够的worker线程了,不需要唤醒多余的worker thread了。一个典型的例子是挂在worker pool中的work list是空的,如果是空的,那么worker thread阻塞就阻塞吧,反正也没有什么活要干。
nimisolo
2017-03-28 16:08
请问,一个pool的多个work能并发处理吗?
例如:一个unbound work pool,目前有100个work需要处理,有5个worker,这5个worker能够同时运行并处理一部分work吗?
我看worker_thread中处理一个work前会首先获得pool->lock,处理好后再释放它,这是不是说明一个pool多个worker无法并发运行?
nimisolo
2017-03-28 16:52
@nimisolo:解决了...
process_one_work中在处理work之前会释放pool->lock,所以可以的
michael
2017-03-22 15:24
Hi, linuxer

有几个问题要咨询一下:
在旧机制下的workqueue,在workqueue下,一个workqueue可能包含了多个work。比如,当前正在执行work A时,被调度了,让出了CPU,那么workqueue中的其它work,可以在其它CPU上执行吗,还是说也被阻塞了?
linuxer
2017-03-22 16:28
@michael:这分两种情况:
(1)对于single threaded的workqueue,任何的work都是排队执行。如果workqueue中挂入work A B C,那么如果A阻塞,那么B和C也是会等待,直到A work完成。
(2)对于multi threaded(更准确的是per-CPU threaded)情况当然会好一些,因为该workqueue会为每一个online cpu创建一个线程来处理work。对于一个cpu上的线程,其处理逻辑等于single threaded的workqueue,即一个work阻塞了,挂入该cpu的work都会阻塞,但是其他cpu的线程处理是不受影响的。
lucky
2016-10-04 12:12
Hi linuxer

@linuxer
你之前的文章关于work同步的解释如下:
=>挂入到multi thread或者说per cpu workqueue上的指定的work,其callback是不会在一个cpu上并发执行(也就是说在多个cpu上可以并发执行)

此篇文中有这样一段话:“不是当前选定的那个pool wq对应的worker pool就适合该work,因为有时候该work可能正在其他的worker thread上执行中,在这种情况下,为了确保work的callback function不会重入,该work最好还是挂在那个worker thread pool上”
=>这一段的意思是:
1. 对于per cpu来讲保证同一个work不能在同一个cpu对应的worker pool里的多个worker thread里面同时执行,但可以在不同的cpu 的worker pool里的worker thread执行。
2. 对于UNBOUND的情况,可以将属于同一个node id的若干个个cpu视作一个逻辑cpu,保证同一个逻辑cpu里面不会出现多个worker thread执行一个相同的work,不同的node id(即不同的逻辑cpu)里面的worker thread是可以同时执行一个相同的work的。

我这样理解是正确的吗?如有偏差请指正,谢谢!
linuxer
2016-10-06 19:04
@lucky:的确,我之前在回复某个网友的提问的时候说到:挂入到multi thread或者说per cpu workqueue上的指定的work,其callback是不会在一个cpu上并发执行(也就是说在多个cpu上可以并发执行)。不过这句话是有条件限制的,当时,我的描述是针对旧的workqueue上的并发(参考的是linux2.6.23的workqueue代码)。

对于新的内核,例如4.4.6的内核,不论是bound(per cpu)或者unbounded workqueue,其work的处理都遵守下面的处理原则:在给定的时间点,同一个work只能被系统中的一个worker线程处理,也就是说,任何的work都是non-reentrant的。

发表评论:

Copyright @ 2013-2015 蜗窝科技 All rights reserved. Powered by emlog