Linux内核中的软中断、tasklet和工作队列详解
引言
软中断、tasklet和工作队列并不是Linux内核中一直存在的机制,而是由更早版本的内核中的“下半部”(bottom half)演变而来。下半部的机制实际上包括五种,但2.6版本的内核中,下半部和任务队列的函数都消失了,只剩下了前三者。
介绍这三种下半部实现之前,有必要说一下上半部与下半部的区别。 上半部指的是中断处理程序,下半部则指的是一些虽然与中断有相关性但是可以延后执行的任务。举个例子:在网络传输中,网卡接收到数据包这个事件不一定需要马上被处理,适合用下半部去实现;但是用户敲击键盘这样的事件就必须马上被响应,应该用中断实现。 两者的主要区别在于:中断不能被相同类型的中断打断,而下半部依然可以被中断打断;中断对于时间非常敏感,而下半部基本上都是一些可以延迟的工作。由于二者的这种区别,所以对于一个工作是放在上半部还是放在下半部去执行,可以参考下面4条:- 如果一个任务对时间非常敏感,将其放在中断处理程序中执行。
- 如果一个任务和硬件相关,将其放在中断处理程序中执行。
- 如果一个任务要保证不被其他中断(特别是相同的中断)打断,将其放在中断处理程序中执行。
- 其他所有任务,考虑放在下半部去执行。 有写内核任务需要延后执行,因此才有的下半部,进而实现了三种实现下半部的方法。这就是本文要讨论的软中断、tasklet和工作队列。
下表可以更直观的看到它们之间的关系。
软中断
软中断作为下半部机制的代表,是随着SMP(share memory processor)的出现应运而生的,它也是tasklet实现的基础(tasklet实际上只是在软中断的基础上添加了一定的机制)。软中断一般是“可延迟函数”的总称,有时候也包括了tasklet(请读者在遇到的时候根据上下文推断是否包含tasklet)。它的出现就是因为要满足上面所提出的上半部和下半部的区别,使得对时间不敏感的任务延后执行,而且可以在多个CPU上并行执行,使得总的系统效率可以更高。它的特性包括:
- 产生后并不是马上可以执行,必须要等待内核的调度才能执行。软中断不能被自己打断(即单个cpu上软中断不能嵌套执行),只能被硬件中断打断(上半部)。
- 可以并发运行在多个CPU上(即使同一类型的也可以)。所以软中断必须设计为可重入的函数(允许多个CPU同时操作),因此也需要使用自旋锁来保其数据结构。
相关数据结构
- 软中断描述符
struct softirq_action{ void (*action)(struct softirq_action *);};
描述每一种类型的软中断,其中void(*action)
是软中断触发时的执行函数。 - 软中断全局数据和类型
static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp; enum { HI_SOFTIRQ=0, /*用于高优先级的tasklet*/ TIMER_SOFTIRQ, /*用于定时器的下半部*/ NET_TX_SOFTIRQ, /*用于网络层发包*/ NET_RX_SOFTIRQ, /*用于网络层收报*/ BLOCK_SOFTIRQ, BLOCK_IOPOLL_SOFTIRQ, TASKLET_SOFTIRQ, /*用于低优先级的tasklet*/ SCHED_SOFTIRQ, HRTIMER_SOFTIRQ, RCU_SOFTIRQ, /* Preferable RCU should always be the last softirq */ NR_SOFTIRQS };
相关API
- 注册软中断
void open_softirq(int nr, void (*action)(struct softirq_action *))
即注册对应类型的处理函数到全局数组softirq_vec中。例如网络发包对应类型为NET_TX_SOFTIRQ的处理函数net_tx_action.
- 触发软中断
void raise_softirq(unsigned int nr)
实际上即以软中断类型nr作为偏移量置位每cpu变量irq_stat[cpu_id]的成员变量__softirq_pending,这也是同一类型软中断可以在多个cpu上并行运行的根本原因。
- 软中断执行函数
do_softirq-->__do_softirq
执行软中断处理函数__do_softirq前首先要满足两个条件:
(1)不在中断中(硬中断、软中断和NMI) 。 (2)有软中断处于pending状态。 系统这么设计是为了避免软件中断在中断嵌套中被调用,并且达到在单个CPU上软件中断不能被重入的目的。对于ARM架构的CPU不存在中断嵌套中调用软件中断的问题,因为ARM架构的CPU在处理硬件中断的过程中是关闭掉中断的。只有在进入了软中断处理过程中之后才会开启硬件中断,如果在软件中断处理过程中有硬件中断嵌套,也不会再次调用软中断,because硬件中断是软件中断处理过程中再次进入的,此时preempt_count已经记录了软件中断!对于其它架构的CPU,有可能在触发调用软件中断前,也就是还在处理硬件中断的时候,就已经开启了硬件中断,可能会发生中断嵌套,在中断嵌套中是不允许调用软件中断处理的。Why?我的理解是,在发生中断嵌套的时候,表明这个时候是系统突发繁忙的时候,内核第一要务就是赶紧把中断中的事情处理完成,退出中断嵌套。避免多次嵌套,哪里有时间处理软件中断,所以把软件中断推迟到了所有中断处理完成的时候才能触发软件中断。实现原理和实例
软中断的调度时机:
- do_irq完成I/O中断时调用irq_exit。
- 系统使用I/O APIC,在处理完本地时钟中断时。
- local_bh_enable,即开启本地软中断时。
- SMP系统中,cpu处理完被CALL_FUNCTION_VECTOR处理器间中断所触发的函数时。
- ksoftirqd/n线程被唤醒时。 下面以从中断处理返回函数irq_exit中调用软中断为例详细说明。 触发和初始化的的流程如图所示:
软中断处理流程
asmlinkage void __do_softirq(void){ struct softirq_action *h; __u32 pending; int max_restart = MAX_SOFTIRQ_RESTART; int cpu; pending = local_softirq_pending(); account_system_vtime(current); __local_bh_disable((unsigned long)__builtin_return_address(0)); lockdep_softirq_enter(); cpu = smp_processor_id(); restart: /* Reset the pending bitmask before enabling irqs */ set_softirq_pending(0); local_irq_enable(); h = softirq_vec; do { if (pending & 1) { int prev_count = preempt_count(); kstat_incr_softirqs_this_cpu(h - softirq_vec); trace_softirq_entry(h, softirq_vec); h->action(h); trace_softirq_exit(h, softirq_vec); if (unlikely(prev_count != preempt_count())) { printk(KERN_ERR "huh, entered softirq %td %s %p" "with preempt_count %08x," " exited with %08x?\n", h - softirq_vec, softirq_to_name[h - softirq_vec], h->action, prev_count, preempt_count()); preempt_count() = prev_count; } rcu_bh_qs(cpu); } h++; pending >>= 1; } while (pending); local_irq_disable(); pending = local_softirq_pending(); if (pending && --max_restart) goto restart; if (pending) wakeup_softirqd(); lockdep_softirq_exit(); account_system_vtime(current); _local_bh_enable(); }
- 首先调用local_softirq_pending函数取得目前有哪些位存在软件中断。
- 调用__local_bh_disable关闭软中断,其实就是设置正在处理软件中断标记,在同一个CPU上使得不能重入__do_softirq函数。
- 重新设置软中断标记为0,set_softirq_pending重新设置软中断标记为0,这样在之后重新开启中断之后硬件中断中又可以设置软件中断位。
- 调用local_irq_enable,开启硬件中断。
- 之后在一个循环中,遍历pending标志的每一位,如果这一位设置就会调用软件中断的处理函数。在这个过程中硬件中断是开启的,随时可以打断软件中断。这样保证硬件中断不会丢失。
- 之后关闭硬件中断(local_irq_disable),查看是否又有软件中断处于pending状态,如果是,并且在本次调用__do_softirq函数过程中没有累计重复进入软件中断处理的次数超过max_restart=10次,就可以重新调用软件中断处理。如果超过了10次,就调用wakeup_softirqd()唤醒内核的一个进程来处理软件中断。设立10次的限制,也是为了避免影响系统响应时间。
- 调用_local_bh_enable开启软中断。
软中断内核线程
之前我们分析的触发软件中断的位置其实是中断上下文中,而在软中断的内核线程中实际已经是进程的上下文。
这里说的软中断上下文指的就是系统为每个CPU建立的ksoftirqd进程。 软中断的内核进程中主要有两个大循环,外层的循环处理有软件中断就处理,没有软件中断就休眠。内层的循环处理软件中断,每循环一次都试探一次是否过长时间占据了CPU,需要调度就释放CPU给其它进程。具体的操作在注释中做了解释。set_current_state(TASK_INTERRUPTIBLE); //外层大循环。 while (!kthread_should_stop()) { preempt_disable();//禁止内核抢占,自己掌握cpu if (!local_softirq_pending()) { preempt_enable_no_resched(); //如果没有软中断在pending中就让出cpu schedule(); //调度之后重新掌握cpu preempt_disable(); } __set_current_state(TASK_RUNNING); while (local_softirq_pending()) { /* Preempt disable stops cpu going offline. If already offline, we'll be on wrong CPU: don't process */ if (cpu_is_offline((long)__bind_cpu)) goto wait_to_die; //有软中断则开始软中断调度 do_softirq(); //查看是否需要调度,避免一直占用cpu preempt_enable_no_resched(); cond_resched(); preempt_disable(); rcu_sched_qs((long)__bind_cpu); } preempt_enable(); set_current_state(TASK_INTERRUPTIBLE); } __set_current_state(TASK_RUNNING); return 0; wait_to_die: preempt_enable(); /* Wait for kthread_stop */ set_current_state(TASK_INTERRUPTIBLE); while (!kthread_should_stop()) { schedule(); set_current_state(TASK_INTERRUPTIBLE); } __set_current_state(TASK_RUNNING); return 0;
tasklet
由于软中断必须使用可重入函数,这就导致设计上的复杂度变高,作为设备驱动程序的开发者来说,增加了负担。而如果某种应用并不需要在多个CPU上并行执行,那么软中断其实是没有必要的。因此诞生了弥补以上两个要求的tasklet。它具有以下特性:
a)一种特定类型的tasklet只能运行在一个CPU上,不能并行,只能串行执行。 b)多个不同类型的tasklet可以并行在多个CPU上。 c)软中断是静态分配的,在内核编译好之后,就不能改变。但tasklet就灵活许多,可以在运行时改变(比如添加模块时)。 tasklet是在两种软中断类型的基础上实现的,因此如果不需要软中断的并行特性,tasklet就是最好的选择。也就是说tasklet是软中断的一种特殊用法,即延迟情况下的串行执行。相关数据结构
- tasklet描述符
struct tasklet_struct{ struct tasklet_struct *next;//将多个tasklet链接成单向循环链表 unsigned long state;//TASKLET_STATE_SCHED(Tasklet is scheduled for execution) TASKLET_STATE_RUN(Tasklet is running (SMP only)) atomic_t count;//0:激活tasklet 非0:禁用tasklet void (*func)(unsigned long); //用户自定义函数 unsigned long data; //函数入参 };
- tasklet链表
static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);//低优先级static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);//高优先级
相关API
- 定义tasklet
#define DECLARE_TASKLET(name, func, data) \struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data } //定义名字为name的非激活tasklet #define DECLARE_TASKLET_DISABLED(name, func, data) \ struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data } //定义名字为name的激活tasklet void tasklet_init(struct tasklet_struct *t,void (*func)(unsigned long), unsigned long data) //动态初始化tasklet
- tasklet操作
static inline void tasklet_disable(struct tasklet_struct *t)//函数暂时禁止给定的tasklet被tasklet_schedule调度,直到这个tasklet被再次被enable;若这个tasklet当前在运行, 这个函数忙等待直到这个tasklet退出 static inline void tasklet_enable(struct tasklet_struct *t) //使能一个之前被disable的tasklet;若这个tasklet已经被调度, 它会很快运行。tasklet_enable和tasklet_disable必须匹配调用, 因为内核跟踪每个tasklet的"禁止次数" static inline void tasklet_schedule(struct tasklet_struct *t) //调度 tasklet 执行,如果tasklet在运行中被调度, 它在完成后会再次运行; 这保证了在其他事件被处理当中发生的事件受到应有的注意. 这个做法也允许一个 tasklet 重新调度它自己 tasklet_hi_schedule(struct tasklet_struct *t) //和tasklet_schedule类似,只是在更高优先级执行。当软中断处理运行时, 它处理高优先级 tasklet 在其他软中断之前,只有具有低响应周期要求的驱动才应使用这个函数, 可避免其他软件中断处理引入的附加周期. tasklet_kill(struct tasklet_struct *t) //确保了 tasklet 不会被再次调度来运行,通常当一个设备正被关闭或者模块卸载时被调用。如果 tasklet 正在运行, 这个函数等待直到它执行完毕。若 tasklet 重新调度它自己,则必须阻止在调用 tasklet_kill 前它重新调度它自己,如同使用 del_timer_sync
实现原理
- 调度原理
static inline void tasklet_schedule(struct tasklet_struct *t){ if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) __tasklet_schedule(t); } void __tasklet_schedule(struct tasklet_struct *t) { unsigned long flags; local_irq_save(flags); t->next = NULL; *__get_cpu_var(tasklet_vec).tail = t; __get_cpu_var(tasklet_vec).tail = &(t->next);//加入低优先级列表 raise_softirq_irqoff(TASKLET_SOFTIRQ);//触发软中断 local_irq_restore(flags); }
- tasklet执行过程 TASKLET_SOFTIRQ对应执行函数为tasklet_action,HI_SOFTIRQ为tasklet_hi_action,以tasklet_action为例说明,tasklet_hi_action大同小异。
static void tasklet_action(struct softirq_action *a){ struct tasklet_struct *list; local_irq_disable(); list = __get_cpu_var(tasklet_vec).head; __get_cpu_var(tasklet_vec).head = NULL; __get_cpu_var(tasklet_vec).tail = &__get_cpu_var(tasklet_vec).head;//取得tasklet链表 local_irq_enable(); while (list) { struct tasklet_struct *t = list; list = list->next; if (tasklet_trylock(t)) { if (!atomic_read(&t->count)) { //执行tasklet if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state)) BUG(); t->func(t->data); tasklet_unlock(t); continue; } tasklet_unlock(t); } //如果t->count的值不等于0,说明这个tasklet在调度之后,被disable掉了,所以会将tasklet结构体重新放回到tasklet_vec链表,并重新调度TASKLET_SOFTIRQ软中断,在之后enable这个tasklet之后重新再执行它 local_irq_disable(); t->next = NULL; *__get_cpu_var(tasklet_vec).tail = t; __get_cpu_var(tasklet_vec).tail = &(t->next); __raise_softirq_irqoff(TASKLET_SOFTIRQ); local_irq_enable(); } }
工作队列
从上面的介绍看以看出,软中断运行在中断上下文中,因此不能阻塞和睡眠,而tasklet使用软中断实现,当然也不能阻塞和睡眠。但如果某延迟处理函数需要睡眠或者阻塞呢?没关系工作队列就可以如您所愿了。
把推后执行的任务叫做工作(work),描述它的数据结构为work_struct ,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct ,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events。 工作队列(work queue)是另外一种将工作推后执行的形式。工作队列可以把工作推后,交由一个内核线程去执行—这个下半部分总是会在进程上下文执行,但由于是内核线程,其不能访问用户空间。最重要特点的就是工作队列允许重新调度甚至是睡眠。 通常,在工作队列和软中断/tasklet中作出选择非常容易。可使用以下规则: - 如果推后执行的任务需要睡眠,那么只能选择工作队列。 - 如果推后执行的任务需要延时指定的时间再触发,那么使用工作队列,因为其可以利用timer延时(内核定时器实现)。 - 如果推后执行的任务需要在一个tick之内处理,则使用软中断或tasklet,因为其可以抢占普通进程和内核线程,同时不可睡眠。 - 如果推后执行的任务对延迟的时间没有任何要求,则使用工作队列,此时通常为无关紧要的任务。 实际上,工作队列的本质就是将工作交给内核线程处理,因此其可以用内核线程替换。但是内核线程的创建和销毁对编程者的要求较高,而工作队列实现了内核线程的封装,不易出错,所以我们也推荐使用工作队列。相关数据结构
- 正常工作结构体
struct work_struct { atomic_long_t data; //传递给工作函数的参数#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */ #define WORK_STRUCT_FLAG_MASK (3UL) #define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK) struct list_head entry; //链表结构,链接同一工作队列上的工作。 work_func_t func; //工作函数,用户自定义实现 #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; #endif }; //工作队列执行函数的原型: void (*work_func_t)(struct work_struct *work); //该函数会由一个工作者线程执行,因此其在进程上下文中,可以睡眠也可以中断。但只能在内核中运行,无法访问用户空间。
- 延迟工作结构体(延迟的实现是在调度时延迟插入相应的工作队列)
struct delayed_work { struct work_struct work; struct timer_list timer; //定时器,用于实现延迟处理};
- 工作队列结构体
struct workqueue_struct { struct cpu_workqueue_struct *cpu_wq; //指针数组,其每个元素为per-cpu的工作队列 struct list_head list; const char *name; int singlethread; //标记是否只创建一个工作者线程 int freezeable; /* Freeze threads during suspend */ int rt; #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; #endif };
- 每cpu工作队列(每cpu都对应一个工作者线程worker_thread)
struct cpu_workqueue_struct { spinlock_t lock; struct list_head worklist; wait_queue_head_t more_work; struct work_struct *current_work; struct workqueue_struct *wq; struct task_struct *thread; } ____cacheline_aligned;
相关API
- 缺省工作队列
静态创建 DECLARE_WORK(name,function); //定义正常执行的工作项DECLARE_DELAYED_WORK(name,function);//定义延后执行的工作项动态创建INIT_WORK(_work, _func) //创建正常执行的工作项 INIT_DELAYED_WORK(_work, _func)//创建延后执行的工作项 调度默认工作队列 int schedule_work(struct work_struct *work) //对正常执行的工作进行调度,即把给定工作的处理函数提交给缺省的工作队列和工作者线程。工作者线程本质上是一个普通的内核线程,在默认情况下,每个CPU均有一个类型为“events”的工作者线程,当调用schedule_work时,这个工作者线程会被唤醒去执行工作链表上的所有工作。 系统默认的工作队列名称是:keventd_wq,默认的工作者线程叫:events/n,这里的n是处理器的编号,每个处理器对应一个线程。比如,单处理器的系统只有events/0这样一个线程。而双处理器的系统就会多一个events/1线程。 默认的工作队列和工作者线程由内核初始化时创建: start_kernel()-->rest_init-->do_basic_setup-->init_workqueues 调度延迟工作 int schedule_delayed_work(struct delayed_work *dwork,unsigned long delay) 刷新缺省工作队列 void flush_scheduled_work(void) //此函数会一直等待,直到队列中的所有工作都被执行。 取消延迟工作 static inline int cancel_delayed_work(struct delayed_work *work) //flush_scheduled_work并不取消任何延迟执行的工作,因此,如果要取消延迟工作,应该调用cancel_delayed_work。
以上均是采用缺省工作者线程来实现工作队列,其优点是简单易用,缺点是如果缺省工作队列负载太重,执行效率会很低,这就需要我们创建自己的工作者线程和工作队列。
- 自定义工作队列
create_workqueue(name) //宏定义 返回值为工作队列,name为工作线程名称。创建新的工作队列和相应的工作者线程,name用于该内核线程的命名。int queue_work(struct workqueue_struct *wq, struct work_struct *work)//类似于schedule_work,区别在于queue_work把给定工作提交给创建的工作队列wq而不是缺省队列。 int queue_delayed_work(struct workqueue_struct *wq,struct delayed_work *dwork, unsigned long delay) //调度延迟工作。 void flush_workqueue(struct workqueue_struct *wq) //刷新指定工作队列。 void destroy_workqueue(struct workqueue_struct *wq) //释放创建的工作队列。
实现原理
- 工作队列的组织结构 即workqueue_struct、cpu_workqueue_struct与work_struct的关系。 一个工作队列对应一个work_queue_struct,工作队列中每cpu的工作队列由cpu_workqueue_struct表示,而work_struct为其上的具体工作。 关系如下图所示: 2.工作队列的工作过程
- 应用实例 linux各个接口的状态(up/down)的消息需要通知netdev_chain上感兴趣的模块同时上报用户空间消息。这里使用的就是工作队列。 具体流程图如下所示:
-
- 是否处于中断中在Linux中是通过preempt_count来判断的,具体如下: 在linux系统的进程数据结构里,有这么一个数据结构: #define preempt_count() (current_thread_info()->preempt_count) 利用preempt_count可以表示是否处于中断处理或者软件中断处理过程中,如下所示: # define hardirq_count() (preempt_count() & HARDIRQ_MASK) #define softirq_count() (preempt_count() & SOFTIRQ_MASK) #define irq_count() (preempt_count() & (HARDIRQ_MASK | SOFTIRQ_MASK | NMI_MASK)) #define in_irq() (hardirq_count()) #define in_softirq() (softirq_count()) #define in_interrupt() (irq_count()) preempt_count的8~23位记录中断处理和软件中断处理过程的计数。如果有计数,表示系统在硬件中断或者软件中断处理过程中。
softirq(软中断)下半部中tasklet与workqueue的区别
一、中断处理的tasklet(小任务)机制
中断服务程序一般都是在中断请求关闭的条件下执行的,以避免嵌套而使中断控制复杂化。但是,中断是一个随机事件,它随时会到来,如果关中断的时间太长,CPU就不能及时响应其他的中断请求,从而造成中断的丢失。因此,Linux内核的目标就是尽可能快的处理完中断请求,尽其所能把更多的处理向后推迟。例如,假设一个数据块已经达到了网线,当中断控制器接受到这个中断请求信号时,Linux内核只是简单地标志数据到来了,然后让处理器恢复到它以前运行的状态,其余的处理稍后再进行(如把数据移入一个缓冲区,接受数据的进程就可以在缓冲区找到数据)。因此,内核把中断处理分为两部分:上半部(tophalf)和下半部(bottomhalf),上半部(就是中断服务程序)内核立即执行,而下半部(就是一些内核函数)留着稍后处理,
首先,一个快速的“上半部”来处理硬件发出的请求,它必须在一个新的中断产生之前终止。通常,除了在设备和一些内存缓冲区(如果你的设备用到了DMA,就不止这些)之间移动或传送数据,确定硬件是否处于健全的状态之外,这一部分做的工作很少。
下半部运行时是允许中断请求的,而上半部运行时是关中断的,这是二者之间的主要区别。
但是,内核到底什时候执行下半部,以何种方式组织下半部?这就是我们要讨论的下半部实现机制,这种机制在内核的演变过程中不断得到改进,在以前的内核中,这个机制叫做bottomhalf(简称bh),在2.4以后的版本中有了新的发展和改进,改进的目标使下半部可以在多处理机上并行执行,并有助于驱动程序的开发者进行驱动程序的开发。下面主要介绍常用的小任务(Tasklet)机制及2.6内核中的工作队列机制。
小任务机制这里的小任务是指对要推迟执行的函数进行组织的一种机制。其数据结构为tasklet_struct,每个结构代表一个独立的小任务,其定义如下:
- struct tasklet_struct {
- struct tasklet_struct *next; /*指向链表中的下一个结构*/
- unsignedlong state; /* 小任务的状态*/
- atomic_tcount; /* 引用计数器*/
- void(*func) (unsigned long); /* 要调用的函数*/
- unsignedlong data; /* 传递给函数的参数*/
- };
结构中的func域就是下半部中要推迟执行的函数,data是它唯一的参数。 State域的取值为TASKLET_STATE_SCHED或TASKLET_STATE_RUN。TASKLET_STATE_SCHED表示小任务已被调度,正准备投入运行,TASKLET_STATE_RUN表示小任务正在运行。TASKLET_STATE_RUN只有在多处理器系统上才使用,单处理器系统什么时候都清楚一个小任务是不是正在运行(它要么就是当前正在执行的代码,要么不是)。 Count域是小任务的引用计数器。如果它不为0,则小任务被禁止,不允许执行;只有当它为零,小任务才被激活,并且在被设置为挂起时,小任务才能够执行。 1. 声明和使用小任务大多数情况下,为了控制一个寻常的硬件设备,小任务机制是实现下半部的最佳选择。小任务可以动态创建,使用方便,执行起来也比较快。 我们既可以静态地创建小任务,也可以动态地创建它。选择那种方式取决于到底是想要对小任务进行直接引用还是一个间接引用。如果准备静态地创建一个小任务(也就是对它直接引用),使用下面两个宏中的一个: DECLARE_TASKLET(name,func, data) DECLARE_TASKLET_DISABLED(name,func, data) 这两个宏都能根据给定的名字静态地创建一个tasklet_struct结构。当该小任务被调度以后,给定的函数func会被执行,它的参数由data给出。这两个宏之间的区别在于引用计数器的初始值设置不同。第一个宏把创建的小任务的引用计数器设置为0,因此,该小任务处于激活状态。另一个把引用计数器设置为1,所以该小任务处于禁止状态。例如: DECLARE_TASKLET(my_tasklet,my_tasklet_handler, dev); 这行代码其实等价于 structtasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0), tasklet_handler,dev}; 这样就创建了一个名为my_tasklet的小任务,其处理程序为tasklet_handler,并且已被激活。当处理程序被调用的时候,dev就会被传递给它。 2. 编写自己的小任务处理程序小任务处理程序必须符合如下的函数类型: voidtasklet_handler(unsigned long data) 由于小任务不能睡眠,因此不能在小任务中使用信号量或者其它产生阻塞的函数。但是小任务运行时可以响应中断。 3. 调度自己的小任务通过调用tasklet_schedule()函数并传递给它相应的tasklt_struct指针,该小任务就会被调度以便适当的时候执行: tasklet_schedule(&my_tasklet); /*把my_tasklet标记为挂起 */ 在小任务被调度以后,只要有机会它就会尽可能早的运行。在它还没有得到运行机会之前,如果一个相同的小任务又被调度了,那么它仍然只会运行一次。 可以调用tasklet_disable()函数来禁止某个指定的小任务。如果该小任务当前正在执行,这个函数会等到它执行完毕再返回。调用tasklet_enable()函数可以激活一个小任务,如果希望把以DECLARE_TASKLET_DISABLED()创建的小任务激活,也得调用这个函数,如: tasklet_disable(&my_tasklet); /*小任务现在被禁止,这个小任务不能运行*/ tasklet_enable(&my_tasklet); /* 小任务现在被激活*/ 也可以调用tasklet_kill()函数从挂起的队列中去掉一个小任务。该函数的参数是一个指向某个小任务的tasklet_struct的长指针。在小任务重新调度它自身的时候,从挂起的队列中移去已调度的小任务会很有用。这个函数首先等待该小任务执行完毕,然后再将它移去。 4.tasklet的简单用法 下面是tasklet的一个简单应用,以模块的形成加载。
- #include <linux module.h="">
- #include<linux init.h="">
- #include<linux fs.h="">
- #include<linux kdev_t.h="">
- #include <linux cdev.h="">
- #include <linux kernel.h="">
- #include<linux interrupt.h="">
- static struct t asklet_struct my_tasklet;
- static void tasklet_handler (unsigned longd ata)
- {
- printk(KERN_ALERT,"tasklet_handler is running./n");
- }
- staticint __init test_init(void)
- {
- tasklet_init(&my_tasklet,tasklet_handler,0);
- tasklet_schedule(&my_tasklet);
- return0;
- }
- static void __exit test_exit(void)
- {
- tasklet_kill(&tasklet);
- printk(KERN_ALERT,"test_exit is running./n");
- }
- MODULE_LICENSE("GPL");
- module_init(test_init);
- module_exit(test_exit);
- </linux></linux></linux></linux></linux></linux></linux>
从这个例子可以看出,所谓的小任务机制是为下半部函数的执行提供了一种执行机制,也就是说,推迟处理的事情是由tasklet_handler实现,何时执行,经由小任务机制封装后交给内核去处理。
二、中断处理的工作队列机制
工作队列(work queue)是另外一种将工作推后执行的形式,它和前面讨论的tasklet有所不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。
那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列。如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet。
-
工作、工作队列和工作者线程
如前所述,我们把推后执行的任务叫做工作(work),描述它的数据结构为work_struct,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events,自己也可以创建自己的工作者线程。
-
表示工作的数据结构
工作用<linux/workqueue.h>中定义的work_struct结构表示:
- struct work_struct{
- unsigned long pending; /* 这个工作正在等待处理吗?*/
- struct list_head entry; /* 连接所有工作的链表 */
- void (*func) (void *); /* 要执行的函数 */
- void *data; /* 传递给函数的参数 */
- void *wq_data; /* 内部使用 */
- struct timer_list timer; /* 延迟的工作队列所用到的定时器 */
- };
这些结构被连接成链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。
3. 创建推后的工作
要使用工作队列,首先要做的是创建一些需要推后完成的工作。可以通过DECLARE_WORK在编译时静态地建该结构:
DECLARE_WORK(name, void (*func) (void *), void *data);
这样就会静态地创建一个名为name,待执行函数为func,参数为data的work_struct结构。
同样,也可以在运行时通过指针创建一个工作:
INIT_WORK(struct work_struct *work, woid(*func) (void *), void *data);
这会动态地初始化一个由work指向的工作。
4. 工作队列中待执行的函数
工作队列待执行的函数原型是:
void work_handler(void *data)
这个函数会由一个工作者线程执行,因此,函数会运行在进程上下文中。默认情况下,允许响应中断,并且不持有任何锁。如果需要,函数可以睡眠。需要注意的是,尽管该函数运行在进程上下文中,但它不能访问用户空间,因为内核线程在用户空间没有相关的内存映射。通常在系统调用发生时,内核会代表用户空间的进程运行,此时它才能访问用户空间,也只有在此时它才会映射用户空间的内存。
5. 对工作进行调度
现在工作已经被创建,我们可以调度它了。想要把给定工作的待处理函数提交给缺省的events工作线程,只需调用
schedule_work(&work);
work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。
有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,可以调度它在指定的时间执行:
schedule_delayed_work(&work, delay);
这时,&work指向的work_struct直到delay指定的时钟节拍用完以后才会执行。
6. 工作队列的简单应用
- #include<linux module.h="">
- #include<linux init.h="">
- #include<linux workqueue.h="">
- staticstruct workqueue_struct *queue =NULL;
- staticstruct work_struct work;
- staticvoid work_handler(struct work_struct*data)
- {
- printk(KERN_ALERT"work handler function./n");
- }
- staticint __init test_init(void)
- {
- queue= create_singlethread_workqueue("helloworld"); /*创建一个单线程的工作队列*/
- if(!queue)
- goto err;
- INIT_WORK(&work, work_handler);
- schedule_work(&work);/*schedule_work是添加到系统的events workqueue, 要添加到自己的workqueue, 应该使用queue_work, 故此处有误*/
- return 0;
- err:
- return-1;
- }
- staticvoid __exit test_exit(void)
- {
- destroy_workqueue(queue);
- }
- MODULE_LICENSE("GPL");
- module_init(test_init);
- module_exit(test_exit);
- </linux></linux></linux>
tasklet与workqueue的区别和不同应用环境总结
tasklet | Workqueue |
处于atomic context,不能sleep | 不处于atomic context,可以sleep |
处于中断上下文,OS不可以进行进程调度 | 处于进程上下文,OS可以进行进程调度 |
运行调度它们的同一个CPU上 | 默认同一个CPU上 |
不能指定确定时间进行调度 | 不能指定确定时间进行调度或者指定至少延时一个确定时间后调度 |
只能交给ksoftirqd/0 | 可以提交给events/0,也可以提交给自定义的workqueue |
Tasklet函数带参数 | Work函数不带参数 |
Tasklet与workqueue的不同应用环境总结如下:
(1) 必须立即进行紧急处理的极少量任务放入在中断的顶半部中,此时屏蔽了与自己同类型的中断,由于任务量少,所以可以迅速不受打扰地处理完紧急任务。
(2) 需要较少时间的中等数量的急迫任务放在tasklet中。此时不会屏蔽任何中断(包括与自己的顶半部同类型的中断),所以不影响顶半部对紧急事务的处理;同时又不会进行用户进程调度,从而保证了自己急迫任务得以迅速完成。
(3) 需要较多时间且并不急迫(允许被操作系统剥夺运行权)的大量任务放在workqueue中。此时操作系统会尽量快速处理完这个任务,但如果任务量太大,期间操作系统也会有机会调度别的用户进程运行,从而保证不会因为这个任务需要运行时间将其它用户进程无法进行。
(4) 可能引起睡眠的任务放在workqueue中。因为在workqueue中睡眠是安全的。