接下来,我要跟各位介绍一下 wait_queue 的用法,以及用一个例子来说明如何使用 wait_queue。最后,我会带各位去 trace 一下 wait_queue 的原始程序代码,看看 wait_queue 是如何做到的。
我想有件事要先提及的是 Linux 在 user space 跟在 kernel space 上的差异。我们知道 Linux 是 multi-tasking 的环境,同时可以有很多人执行很多的程序。这是从 user 的观点来看的。如果就 kernel 的观点来看,是没有所谓的 multi-tasking 的。在 kernel 里,只有 single-thread。也就是说,如果你的 kernel code 正在执行,那系统里只有那部分在执行。不会有另一部分的 kernel code 也在运作。当然,这是指 single processor 的情况下,如果是 SMP 的话,那我就不清楚了。我想很多人都在 Windows 3.1 下写过程序,在那种环境下写程序,每一个程序都必须适当的将 CPU 让给别的程序使用。如果有个程序里面有一个
while (1);
的话,那保证系统就停在那里了。这种的多任务叫做 non-preemptive。它多任务的特性是由各个程序相互合作而造成的。在 Linux 的 user space 下,则是所谓的 preemptive,各个 process 喜欢执行什么就执行什么,就算你在你的程序里加上 while(1); 这一行也不会影响系统的运作。反正时间到了,系统自动就会将你的程序停住,让别的程序去执行。这是在 user space 的情况下,在 kernel 这方面,就跟 Windows 3.1 程序是一样的。在 kernel 里,你必须适当的将 CPU 的执行权释放出来。如果你在 kernel里加入 while(1); 这一行。那系统就会跟 Windows 3.1 一样。卡在那里。当然啦,我是没试过这样去改 kernel,有兴趣的人可以去试试看,如果有不同的结果,请记得告诉我。
假设我们在 kernel 里产生一个 buffer,user 可以经由 read,write 等 system call 来读取或写资料到这个 buffer 里。如果有一个 user 写资料到 buffer 时,此时 buffer 已经满了。那请问你要如何去处理这种情形呢 ? 第一种,传给 user 一个错误讯息,说 buffer 已经满了,不能再写入。第二种,将 user 的要求 block 住,等有人将 buffer 内容读走,留出空位时,再让 user 写入资料。但问题来了,你要怎么将 user 的要求 block 住。难道你要用
while ( is_full );
write_to_buffer;
这样的程序代码吗? 想想看,如果你这样做会发生什么事? 第一,kernel会一直在这个 while 里执行。第二个,如果 kernel 一直在这个 while 里执行,表示它没有办法去 maintain系统的运作。那此时系统就相当于当掉了。在这里 is_full 是一个变量,当然,你可以让 is_full 是一个 function,在这个 function里会去做别的事让 kernel 可以运作,那系统就不会当。这是一个方式。但是,如果我们使用 wait_queue 的话,那程序看起来会比较漂亮,而且也比较让人了解,如下所示:
struct wait_queue *wq = NULL; /* global variable */
while ( is_full ) {
interruptible_sleep_on( &wq );
}
write_to_buffer();
interruptible_sleep_on( &wq ) 是用来将目前的 process,也就是要求写资料到 buffer 的 process放到 wq 这个 wait_queue 里。在 interruptible_sleep_on 里,则是最后会呼叫 schedule() 来做 schedule 的动作,也就是去找另一个 process 来执行以维持系统的运作。当执行完 interruptible_sleep_on 之后,要求 write 的 process 就会被 block 住。那什么时候会恢复执行呢 ? 这个 process 之所以会被 block 住是因为 buffer 的空间满了,无法写入。但是如果有人将 buffer 的资料读取掉,则 buffer 就有空间可以让人写入。所以,有关于叫醒 process 的动作应该是在 read buffer 这方面的程序代码做的。
extern struct wait_queue *wq;
if ( !is_empty ) {
read_from_buffer();
wake_up_interruptible( &wq );
}
....
以上的程序代码应该要放在 read buffer 这部分的程序代码里,当 buffer 有多余的空间时,我们就呼叫 wake_up_interruptible( &wq ) 来将挂在 wq 上的所有 process 叫醒。请记得,我是说将 wq 上的所有 process 叫醒,所以,如果如果有10个 process 挂在 wq 上的话,那这 10 个都会被叫醒。之后,至于谁先执行。则是要看 schedule 是怎么做的。就是因为这 10 个都会被叫醒。如果 A 先执行,而且万一很不凑巧的,A 又把 buffer 写满了,那其它 9 个 process 要怎么办呢? 所以在 write buffer 的部分,需要用一个 while 来检查 buffer 目前是否满了.如果是的话,那就继续挂在 wq 上面.
上面所谈的就是 wait_queue 的用法。很简单不是吗? 接下来,我会再介绍一下 wait_queue 提供那些 function 让我们使用。让我再重申一次。wait_queue 应设为 global variable,比方叫 wq,只要任何的 process 想将自己挂在上面,就可以直接叫呼叫 sleep_on 等 function。要将 wq 上的 process 叫醒。只要呼叫 wake_up 等 function 就可以了.
就我所知,wait_queue 提供4个 function 可以使用,两个是用来将 process 加到 wait_queue 的:
sleep_on( struct wait_queue **wq );
interruptible_sleep_on( struct wait_queue **wq );
另外两个则是将process从wait_queue上叫醒的。
wake_up( struct wait_queue **wq );
wake_up_interruptible( struct wait_queue **wq );
我现在来解释一下为什么会有两组。有 interruptible 的那一组是这样子的。当我们去 read 一个没有资料可供读取的 socket 时,process 会 block 在那里。如果我们此时按下 Ctrl C,那 read() 就会传回 EINTR。像这种的 block IO 就是使用 interruptible_sleep_on() 做到的。也就是说,如果你是用 interruptible_sleep_on() 来将 process 放到 wait_queue 时,如果有人送一个 signal 给这个 process,那它就会自动从 wait_queue 中醒来。但是如果你是用 sleep_on() 把 process 放到 wq 中的话,那不管你送任何的 signal 给它,它还是不会理你的。除非你是使用 wake_up() 将它叫醒。sleep 有两组。wake_up 也有两组。wake_up_interruptible() 会将 wq 中使用 interruptible_sleep_on() 的 process 叫醒。至于 wake_up() 则是会将 wq 中所有的 process 叫醒。包括使用 interruptible_sleep_on() 的 process。
在使用 wait_queue 之前有一点需要特别的小心,呼叫 interruptible_sleep_on() 以及 sleep_on() 的 function 必须要是 reentrant。简单的说,reentrant 的意思是说此 function不会改变任何的 global variable,或者是不会 depend on 任何的 global variable,或者是在呼叫 interruptible_sleep_on() 或 sleep_on() 之后不会 depend on 任何的 global variable。因为当此 function 呼叫 sleep_on() 时,目前的 process 会被暂停执行。可能另一个 process 又会呼叫此 function。若之前的 process 将某些 information 存在 global variable,等它恢复执行时要使用,结果第二行程进来了,又把这个 global variable 改掉了。等第一个 process 恢复执行时,放在 global variable 中的 information 都变了。产生的结果恐怕就不是我们所能想象了。其实,从 process 执行指令到此 function 中所呼叫的 function 都应该是要 reentrant 的。不然,很有可能还是会有上述的情形发生.
由于 wait_queue 是 kernel 所提供的,所以,这个例子必须要放到 kernel 里去执行。我使用的这个例子是一个简单的 driver。它会 maintain 一个 buffer,大小是 8192 bytes。提供 read跟 write 的功能。当 buffer 中没有资料时,read() 会马上传回,也就是不做 block IO。而当 write buffer 时,如果呼叫 write() 时,空间已满或写入的资料比 buffer 大时,就会被 block 住,直到有人将 buffer 里的资料读出来为止。在 write buffer 的程序代码中,我们使用 wait_queue 来做到 block IO 的功能。在这里,我会将此 driver 写成 module,方便加载 kernel。
第一步,这个 driver 是一个简单的 character device driver。所以,我们先在 /dev 下产生一个 character device。major number 我们找一个比较没人使用的,像是 54,minor number 就用 0。接着下一个命令.
mknod /dev/buf c 54 0
mknod 是用来产生 special file 的 command。/dev/buf 表示要产生叫 buf 的档案,位于 /dev 下。 c 表示它是一个 character device。54 为其 major number,0 则是它的 minor number。有关 character device driver 的写法。有机会我再跟各位介绍,由于这次是讲 wait_queue,所以,就不再多提 driver 方面的东西.
第二步,我们要写一个 module,底下是这个 module 的程序代码:
buf.c
#define MODULE
#include
#include
#include
#include
#include
#define BUF_LEN 8192
int flag; /* when rp = wp,flag = 0 for empty,flag = 1 for
non-empty */
char *wp,*rp;
char buffer[BUF_LEN];
EXPORT_NO_SYMBOLS; /* don't export anything */
static ssize_t buf_read( struct file *filp,char *buf,size_t count,
loff_t *ppos )
{
return count;
}
static ssize_t buf_write( struct file *filp,const char *buf,size_t count,
loff_t *ppos )
{
return count;
}
static int buf_open( struct inode *inode,struct file *filp )
{
MOD_INC_USE_COUNT;
return 0;
}
static int buf_release( struct inode *inode,struct file *filp )
{
MOD_DEC_USE_COUNT;
return 0;
}
static struct file_operations buf_fops = {
NULL, /* lseek */
buf_read,
buf_write,
NULL, /* readdir */
NULL, /* poll */
NULL, /* ioctl */
NULL, /* mmap */
buf_open, /* open */
NULL, /* flush */
buf_release, /* release */
NULL, /* fsync */
NULL, /* fasync */
NULL, /* check_media_change */
NULL, /* revalidate */
NULL /* lock */
};
static int buf_init()
{
int result;
flag = 0;
wp = rp = buf;
result = register_chrdev( 54,"buf",&buf_fops );
if ( result < 0 ) {
printk( "<5>buf: cannot get major 54\n" );
return result;
}
return 0;
}
static void buf_clean()
{
if ( unregister_chrdev( 54,"buf" ) ) {
printk( "<5>buf: unregister_chrdev error\n" );
}
}
int init_module( void )
{
return buf_init();
}
void cleanup_module( void )
{
buf_clean();
}
有关 module 的写法,请各位自行参考其它的文件,最重要的是要有 init_module()和 cleanup_module() 这两个 function。我在这两个 function 里分别做 initialize 和 finalize 的动作。现在分别解释一下。在 init_module() 里,只有呼叫 buf_init() 而己。其实,也可以将 buf_init() 的 code 写到 init_module() 里。只是我觉得这样比较好而已。
flag = 0;
wp = rp = buf;
result = register_chrdev( 54,"buf",&buf_fops );
if ( result < 0 ) {
printk( "<5>buf: cannot get major 54\n" );
return result;
}
return 0;
init_buf() 做的事就是去注册一个 character device driver。在注册一个 character device driver 之前,必须要先准备一个型别为 file_operations 结构的变量,file_operations 里包含了一些 function pointer。driver 的作者必须自己写这些 function。并将 function address 放到这个结构里。如此一来,当 user 去读取这个 device 时,kernel 才有办法去呼叫对应这个 driver 的 function。其实,简要来讲。character device driver 就是这么一个 file_operations 结构的变量。file_operations 定义在 这个档案里。它的 prototype 在 kernel 2.2.1 与以前的版本有些微的差异,这点是需要注意的地方。
register_chrdev() 看名字就大概知道是要注册 character device driver。第一个参数是此 device 的 major number。第二个是它的名字。名字你可以随便取。第三个的参数就是一个 file_operations 变量的地址。init_module() 必须要传回 0,module 才会被加载。
在 cleanup_module() 的部分,我们也是只呼叫 buf_clean() 而已。它做的事是 unregister 的动作。
if ( unregister_chrdev( 54,"buf" ) ) {
printk( "<5>buf: unregister_chrdev error\n" );
}
也就是将原本记录在 device driver table 上的资料洗掉。第一个参数是 major number。第二个则是此 driver 的名称,这个名字必须要跟 register_chrdev() 中所给的名字一样才行。
现在我们来看看此 driver 所提供的 file_operations 是那些。
static struct file_operations buf_fops = {
NULL, /* lseek */
buf_read,
buf_write,
NULL, /* readdir */
NULL, /* poll */
NULL, /* ioctl */
NULL, /* mmap */
buf_open, /* open */
NULL, /* flush */
buf_release, /* release */
NULL, /* fsync */
NULL, /* fasync */
NULL, /* check_media_change */
NULL, /* revalidate */
NULL /* lock */
};
在此,我们只打算 implement buf_read(),buf_write(),buf_open,和 buf_release()等 function 而已。当 user 对这个 device 呼叫 open() 的时候,buf_open() 会在最后被 kernel 呼叫。相同的,当呼叫 close(),read(),和 write() 时,buf_release(),buf_read(),和 buf_write() 也都会分别被呼叫。首先,我们先来看看 buf_open()。
static int buf_open( struct inode *inode,struct file *filp )
MOD_INC_USE_COUNT;
return 0;
}
buf_open() 做的事很简单。就是将此 module 的 use count 加一。这是为了避免当此 module 正被使用时不会被从 kernel 移除掉。相对应的,在 buf_release() 中,我们应该要将 use count 减一。就像开启档案一样。有 open(),就应该要有对应的 close() 才行。如果 module 的 use count 在不为 0 的话,那此 module 就无法从 kernel 中移除了。
static int buf_release( struct inode *inode,struct file *filp )
{
MOD_DEC_USE_COUNT;
return 0;
}
接下来,我们要看一下buf_read()和buf_write()。
static ssize_t buf_read( struct file *filp,char *buf,size_t count,
loff_t *ppos )
{
return count;
}
static ssize_t buf_write( struct file *filp,const char *buf,
size_t count,loff_t *ppos )
{
return count;
}
在此,我们都只是回传 user 要求读取或写入的字符数目而已。在此,我要说明一下这些参数的意义。filp 是一个 file 结构的 pointer。也就是指我们在 /dev 下所产生的 buf 档案的 file 结构。当我们呼叫 read() 或 write() 时,必须要给一个 buffer 以及要读写的长度。Buf 指的就是这个 buffer,而 count 指的就是长度。至于 ppos 是表示目前这个档案的 offset 在那里。这个值对普通档案是有用的。也就是跟 lseek() 有关系。由于在这里是一个 drvice。所以 ppos 在此并不会用到。有一点要小心的是,上面参数 buf 是一个地址,而且还是一个 user space 的地址,当 kernel 呼叫 buf_read() 时,程序在位于 kernel space。所以你不能直接读写资料到 buf 里。必须先切换 FS 这个 register 才行。
Makefile
P = buf
OBJ = buf.o
INCLUDE = -I/usr/src/linux/include/linux
CFLAGS = -D__KERNEL__ -DMODVERSIONS -DEXPORT_SYMTAB -O $(INCLUDE) \
-include /usr/src/linux/include/linux/modversions.h
CC = gcc
$(P): $(OBJ)
ld -r $(OBJ) -o $(P).o
.c.o:
$(CC) -c $(CFLAGS) $<
clean:
rm -f *.o *~ $(P)
加入上面这个 Makefile,打入 make 之后,就会产生一个 buf.o 的档案。利用 insmod 将 buf.o 载到 kernel 里。相信大家应该都用过 /dev/zero 这个 device。去读取这个 device,只会得到空的内容。写资料到这个 device 里也只会石沈大海。现在你可以去比较 buf 和 zero 这两个 device。两者的行为应该很类似才是。
第三步,我们在第二步中 implement 一个像 zero 的 device driver。我们现在要经由修改它来使用 wait_queue。首先,我们先加入一个 global variable,write_wq,并把它设为 NULL。
struct wait_queue *write_wq = NULL;
然后,在 buf_read() 里,我们要改写成这个样子。
static ssize_t buf_read( struct file *filp,char *buf,size_t count,
loff_t *ppos )
{
int num,nRead;
nRead = 0;
while ( ( wp == rp ) && !flag ) { /* buffer is empty */
return 0;
}
repeate_reading:
if ( rp < wp ) {
num = min( count,( int ) ( wp-rp ) );
}
else {
num = min( count,( int ) ( buffer BUF_LEN-rp ) );
}
copy_to_user( buf,rp,num );
rp = num;
count -= num;
nRead = num;
if ( rp == ( buffer BUF_LEN ) )
rp = buffer;
if ( ( rp != wp ) && ( count > 0 ) )
goto repeate_reading;
flag = 0;
wake_up_interruptible( &write_wq );
return nRead;
}
在前头我有提到,buf 的地址是属于 user space 的。在 kernel space 中,你不能像普通写到 buffer 里一样直接将资料写到 buf 里,或直接从 buf 里读资料。Linux 里使用 FS 这个 register 来当作 kernel space 和 user space 的切换。所以,如果你想手动的话,可以这样做:
mm_segment_t fs;
fs = get_fs();
set_fs( USER_DS );
write_data_to_buf( buf );
set_fs( fs );
也就是先切换到 user space,再写资料到 buf 里。之后记得要切换回来 kernel space。这种自己动手的方法比较麻烦,所以 Linux 提供了几个 function,可以让我们直接在不同的 space 之间做资料的搬移。诚如各位所见,copy_to_user() 就是其中一个。
copy_to_user( to,from,n );
copy_from_user( to,from,n );
顾名思义,copy_to_user() 就是将资料 copy 到 user space 的 buffer 里,也就是从 to 写到 from,n 为要 copy 的 byte 数。相同的,copy_from_user() 就是将资料从 user space 的 from copy 到位于 kernel 的 to 里,长度是 n bytes。在以前的 kernel 里,这两个 function 的前身是 memcpy_tofs() 和 memcpy_fromfs(),不知道为什么到了 kernel 2.2.1之后,名字就被改掉了。至于它们的程序代码有没有更改就不太清楚了。至于到那一版才改的。我没有仔细去查,只知道在 2.0.36 时还没改,到了 2.2.1 就改了。这两个 function 是 macro,都定义在 里。要使用前记得先 include 进来。
相信 buf_read() 的程序代码应当不难了解才对。不知道各位有没有看到,在buf_read() 的后面有一行的程序,就是
wake_up_interruptible( &write_wq );
write_wq 是我们用来放那些想要写资料到 buffer,但 buffer 已满的 process。这一行的程序会将挂在此 queue 上的 process 叫醒。当 queue 是空的时,也就是当 write_wq 为 NULL 时,wake_up_interruptible() 并不会造成任何的错误。接下来,我们来看看更改后的 buf_write()。
static ssize_t buf_write( struct file *filp,const char *buf,size_t count,loff_t *ppos )
{
int num,nWrite;
nWrite = 0;
while ( ( wp == rp ) && flag ) {
interruptible_sleep_on( &write_wq );
}
repeate_writing:
if ( rp > wp ) {
num = min( count,( int ) ( rp - wp ) );
}
else {
num = min( count,( int ) ( buffer BUF_LEN - wp ) );
}
copy_from_user( wp,buf,num );
wp = num;
count -= num;
nWrite = num;
if ( wp == ( buffer BUF_LEN ) ) {
wp = buffer;
}
if ( ( wp != rp ) && ( count > 0 ) ) {
goto repeate_writing;
}
flag = 1;
return nWrite;
}
我们把 process 丢到 write_wq 的动作放在 buf_write() 里。当 buffer 已满时,就直接将 process 丢到 write_wq 里.
while ( ( wp == rp ) && flag ) {
interruptible_sleep_on( &write_wq );
}
好了。现在程序已经做了一些修改。再重新 make 一次,利用 insmod 将 buf.o 载到 kernel 里就行了。接着,我们就来试验一下是不是真正做到 block IO.
# cd /dev
# ls -l ~/WWW-HOWTO
-rw-r--r-- 1 root root 23910 Apr 14 16:50 /root/WWW-HOWTO
# cat ~/WWW-HOWTO > buf
执行到这里,应该会被 block 住。现在,我们再开一个 shell 出来.
# cd /dev
# cat buf
..。( contents of WWW-HOWTO ) ..。skip ...
此时,WWW-HOWTO 的内容就会出现了。而且之前 block 住的 shell 也已经回来了。最后,试验结束,可以下
# rmmod buf
将 buf 这个 module 从 kernel 中移除。以上跟各位介绍的就是 wait_queue 的使用。希望能对各位有所助益。
我想对某些人来讲,会使用一个东西就够了。然而对某些人来讲,可能也很希望知道这项东西是如何做出来的。至少我就是这种人。在下面,我将为各位介绍 wait_queue 的 implementation。如果对其 implementation 没兴趣,以下这一段就可以略过不用看了。
wait_queue 是定义在 里,我们可以先看看它的数据结构是怎么样:
struct wait_queue {
struct task_struct * task;
struct wait_queue * next;
};
很简单是吧。这个结构里面只有二个字段,一个是 task_struct 的 pointer,另一个则是 wait_queue 的 pointer。很明显的,我们可以看出 wait_queue 其实就是一个 linked list,而且它还是一个 circular linked list。 其中 task_struct 就是用来指呼叫 sleep_on 等 function的 process。在 Linux 里,每一个 process 是由一个 task_struct 来描叙。task_struct 是一个很大的的结构,在此我们不会讨论。Linux 里有一个 global variable,叫 current,它会指到目前正在执行的 process 的 task_struct 结构。这也就是为什么当 process 呼叫 system call,切换到 kernel 时,kernel 会知道是那个 process 呼叫的。
好,我们现在来看看 interruptible_sleep_on() 和 sleep_on() 是如何做的。这两个 function 都是位于 /usr/src/linux/kernel/sched.c 里。
void interruptible_sleep_on(struct wait_queue **p)
{
SLEEP_ON_VAR
current->state = TASK_INTERRUPTIBLE;
SLEEP_ON_HEAD
schedule();
SLEEP_ON_TAIL
}
void sleep_on(struct wait_queue **p)
{
SLEEP_ON_VAR
current->state = TASK_UNINTERRUPTIBLE;
SLEEP_ON_HEAD
schedule();
SLEEP_ON_TAIL
}
各位有没有发现这两个 function 很类似。是的,它们唯一的差别就在于
current->state = ...
这一行而已。之前,我们有说过,interruptible_sleep_on() 可以被 signal 中断,所以,其 current->state 被设为 TASK_INTERRUPTIBLE。而 sleep_on() 没办法被中断,所以 current->state 设为 TASK_UNINTERRUPTIBLE。接下来,我们只看 interruptible_sleep_on() 就好了。毕竟它们两的差异只在那一行而已。
在 sched.c 里,SLEEP_ON_VAR 是一个 macro,其实它只是定义两个区域变量出来而已。
#defineSLEEP_ON_VAR\
unsigned long flags;\
struct wait_queue wait;
刚才我也说过,current 这个变量是指到目前正在执行的 process 的 task_struct 结构。所以 current->state = TASK_INTERRUPTIBLE 会设定在呼叫 interruptible_sleep_on() 的 process 身上。至于 SLEEP_ON_HEAD 做的事,则是将 current 的值放到 SLEEP_ON_VAR 宣告的 wait 变量里,并把 wait 放到 interruptible_sleep_on() 的参数所属的 wait_queue list 中。
#defineSLEEP_ON_HEAD\
wait.task = current;\
write_lock_irqsave(&waitqueue_lock,flags);\
__add_wait_queue(p,&wait);\
write_unlock(&waitqueue_lock);
wait 是在 SLEEP_ON_VAR 中宣告的区域变量。其 task 字段被设成呼叫 interruptible_sleep_on() 的 process。至于 waitqueue_lock 这个变量是一个 spin lock。 waitqueue_lock 是用来确保同一时间只能有一个 writer。但同一时间则可以有好几个 reader。也就是说 waitqueue_lock 是用来保证 critical section 的 mutual exclusive access。
unsigned long flags;
write_lock_irqsave(&waitqueue_lock,flags);
...critical section ...
write_unlock(&waitqueue_lock)
学过 OS 的人应该知道 critical section 的作用是什么,如有需要,请自行参考 OS 参考书。在 critical section 里只做一件事,就是将 wait 这个区域变量放到 p 这个 wait_queue list 中。 p 是 user 在呼叫 interruptible_sleep_on() 时传进来的,它的型别是 struct wait_queue **。在此, critical section 只呼叫 __add_wait_queue()。
extern inline void __add_wait_queue(struct wait_queue ** p,
struct wait_queue * wait)
{
wait->next = *p ? : WAIT_QUEUE_HEAD(p);
*p = wait;
}
__add_wait_queue() 是一个inline function,定义在 中。WAIT_QUEUE_HEAD()是个很有趣的 macro,待会我们再讨论。现在只要知道它会传回这个 wait_queue 的开头就可以了。所以,__add_wait_queue() 的意思就是要把 wait 放到 p 所属的 wait_queue list 的开头。但是,大家还记得吗? 在上面的例子里,一开始我们是把 write_wq 设为 NULL。也就是说 *p 是 NULL。所以,当 *p 是 NULL 时,
wait->next = WAIT_QUEUE_HEAD(p)
是什么意思呢?
所以,现在,我们来看一下 WAIT_QUEUE_HEAD() 是怎么样的一个 macro,它是定义在 里。
#define WAIT_QUEUE_HEAD(x) ((struct wait_queue *)((x)-1))
x 型别是 struct wait_queue **,因为是一个 pointer,所以大小是 4 byte。因此,若 x 为 100 的话,那 ((x)-1) 就变成 96。如下图所示。 WAIT_QUEUE_HEAD(x) 其实会传回 96,而且将其转型为 struct wait_queue*,各位可以看看。原本的 wait_queue* 只配制在 100-104 之间。现在 WAIT_QUEUE_HEAD(x) 却直接传回96,但是 96-100 这块位置根本没有被我们配置起来。更妙的事。由于 x 是一个 wait_queue list 的开头,我们始终不会用到 96-100 这块,我们只会直接使用到 100-104 这块内存。这也算是 wait_queue 一项比较奇怪的 implementation 方式吧。下面有三张图,第一张表示我们宣告了一个 wait_queue* 的变量,地址在 100。另外还有一个 wait_queue 的变量,名叫 wait。第二张图是我们呼叫 interruptible_sleep_on() 之后得到的结果。第三张则是我们又宣告一个 wait_queue,名叫 ano_wait,将 ano_wait 放到 wait_queue list 后的结果就第三张图所显示的。http:/linuxfab.cx/Columns/10/wqq.GIF
在 interruptible_sleep_on() 中,当呼叫完 SLEEP_ON_HEAD 之后,目前的 process 就已经被放到 wait_queue 中了。接下来会直接呼叫 schedule(),这个 function 是用来做 scheduling 用的。current 所指到的 process 会被放到 scheduling queue 中等待被挑出来执行。执行完 schedule() 之后,current 就没办法继续执行了。而当 current 以后被 wake up 时,就会从 schedule() 之后,也就是从 SLEEP_ON_TAIL 开始执行。SLEEP_ON_TAIL 做的事刚好跟 SLEEP_ON_HEAD 相反,它会将此 process 从 wait_queue 中移除。
#defineSLEEP_ON_TAIL\
write_lock_irq(&waitqueue_lock);\
__remove_wait_queue(p,&wait);\
write_unlock_irqrestore(&waitqueue_lock,flags);
跟 SLEEP_ON_HEAD 一样。SLEEP_ON_TAIL 也是利用 spin lock 包住一个 critical section。
extern inline void __remove_wait_queue(struct wait_queue ** p,struct
wait_queue * wait)
{
struct wait_queue * next = wait->next;
struct wait_queue * head = next;
struct wait_queue * tmp;
while ((tmp = head->next) != wait) {
head = tmp;
}
head->next = next;
}
__remove_wait_queue() 是一个 inline function,也是同样定义在 里。是用来将 wait 从 p 这个 wait_queue list 中移除掉。
现在,大家应该已经清楚了 interruptible_sleep_on() 和 sleep_on() 的做法,也应该比较清楚 wait_queue 是如何的做到 block IO。接下来,我们继续看 wake_up_interruptible() 和 wake_up() 是如何 implement 的。wake_up_interruptible() 和 wake_up() 其实是两个 macro,都定义在 里。
#define wake_up(x) __wake_up((x),TASK_UNINTERRUPTIBLE | \
TASK_INTERRUPTIBLE)
#define wake_up_interruptible(x) __wake_up((x),TASK_INTERRUPTIBLE)
从这里可以看出,两个 macro 几乎是一样的,差别只在于传给 __wake_up() 中的一个 flag 有所差异而已。其实,wake_up() 传给 __wake_up() 的是 TASK_UNINTERRUPTIBLE|TASK_INTERRUPTIBLE,意思是说它会将 wait_queue list 中 process->state 是 TASK_INTERRUPTIBLE 或 TASK_UNINTERRUPTIBLE 的所有 process 叫醒。而 wake_up_interruptible() 则只将 state是 TASK_INTERRUPTIBLE 的叫醒.
void __wake_up(struct wait_queue **q,unsigned int mode)
{
struct wait_queue *next;
read_lock(&waitqueue_lock);
if (q && (next = *q)) {
struct wait_queue *head;
head = WAIT_QUEUE_HEAD(q);
while (next != head) {
struct task_struct *p = next->task;
next = next->next;
if (p->state & mode)
wake_up_process(p);
}
}
read_unlock(&waitqueue_lock);
}
在 wake up 的过程中,我们不需要设定 write lock,但是仍要设定 read lock,这是为了避免有人在我们读取 wait_queue 时去写 wait_queue list 的内容,造成 inconsistent。在这段程序代码中,是去 transverse 整个 list,如果 process 的 state 跟 mode 有吻合,则呼叫 wake_up_process() 将它叫醒。
void wake_up_process(struct task_struct * p)
{
unsigned long flags;
spin_lock_irqsave(&runqueue_lock,flags);
p->state = TASK_RUNNING;
if (!p->next_run) {
add_to_runqueue(p);
reschedule_idle(p);
}
spin_unlock_irqrestore(&runqueue_lock,flags);
}
在此,runqueue_lock 也是一个 spin lock,kernel 依然在此设一个 critical section 以方便更改 run queue。Run queue 是用来放可以执行的 process 用的。在放入 run queue 之前,会先将 process 的 state 设为 TASK_RUNNING。
wait_queue 其实是一个蛮好用的东西。相信只要各位有机会去修改 kernel 的话,都应该有机会用到它才对。希望对大家有点帮助。
我想有件事要先提及的是 Linux 在 user space 跟在 kernel space 上的差异。我们知道 Linux 是 multi-tasking 的环境,同时可以有很多人执行很多的程序。这是从 user 的观点来看的。如果就 kernel 的观点来看,是没有所谓的 multi-tasking 的。在 kernel 里,只有 single-thread。也就是说,如果你的 kernel code 正在执行,那系统里只有那部分在执行。不会有另一部分的 kernel code 也在运作。当然,这是指 single processor 的情况下,如果是 SMP 的话,那我就不清楚了。我想很多人都在 Windows 3.1 下写过程序,在那种环境下写程序,每一个程序都必须适当的将 CPU 让给别的程序使用。如果有个程序里面有一个
while (1);
的话,那保证系统就停在那里了。这种的多任务叫做 non-preemptive。它多任务的特性是由各个程序相互合作而造成的。在 Linux 的 user space 下,则是所谓的 preemptive,各个 process 喜欢执行什么就执行什么,就算你在你的程序里加上 while(1); 这一行也不会影响系统的运作。反正时间到了,系统自动就会将你的程序停住,让别的程序去执行。这是在 user space 的情况下,在 kernel 这方面,就跟 Windows 3.1 程序是一样的。在 kernel 里,你必须适当的将 CPU 的执行权释放出来。如果你在 kernel里加入 while(1); 这一行。那系统就会跟 Windows 3.1 一样。卡在那里。当然啦,我是没试过这样去改 kernel,有兴趣的人可以去试试看,如果有不同的结果,请记得告诉我。
假设我们在 kernel 里产生一个 buffer,user 可以经由 read,write 等 system call 来读取或写资料到这个 buffer 里。如果有一个 user 写资料到 buffer 时,此时 buffer 已经满了。那请问你要如何去处理这种情形呢 ? 第一种,传给 user 一个错误讯息,说 buffer 已经满了,不能再写入。第二种,将 user 的要求 block 住,等有人将 buffer 内容读走,留出空位时,再让 user 写入资料。但问题来了,你要怎么将 user 的要求 block 住。难道你要用
while ( is_full );
write_to_buffer;
这样的程序代码吗? 想想看,如果你这样做会发生什么事? 第一,kernel会一直在这个 while 里执行。第二个,如果 kernel 一直在这个 while 里执行,表示它没有办法去 maintain系统的运作。那此时系统就相当于当掉了。在这里 is_full 是一个变量,当然,你可以让 is_full 是一个 function,在这个 function里会去做别的事让 kernel 可以运作,那系统就不会当。这是一个方式。但是,如果我们使用 wait_queue 的话,那程序看起来会比较漂亮,而且也比较让人了解,如下所示:
struct wait_queue *wq = NULL; /* global variable */
while ( is_full ) {
interruptible_sleep_on( &wq );
}
write_to_buffer();
interruptible_sleep_on( &wq ) 是用来将目前的 process,也就是要求写资料到 buffer 的 process放到 wq 这个 wait_queue 里。在 interruptible_sleep_on 里,则是最后会呼叫 schedule() 来做 schedule 的动作,也就是去找另一个 process 来执行以维持系统的运作。当执行完 interruptible_sleep_on 之后,要求 write 的 process 就会被 block 住。那什么时候会恢复执行呢 ? 这个 process 之所以会被 block 住是因为 buffer 的空间满了,无法写入。但是如果有人将 buffer 的资料读取掉,则 buffer 就有空间可以让人写入。所以,有关于叫醒 process 的动作应该是在 read buffer 这方面的程序代码做的。
extern struct wait_queue *wq;
if ( !is_empty ) {
read_from_buffer();
wake_up_interruptible( &wq );
}
....
以上的程序代码应该要放在 read buffer 这部分的程序代码里,当 buffer 有多余的空间时,我们就呼叫 wake_up_interruptible( &wq ) 来将挂在 wq 上的所有 process 叫醒。请记得,我是说将 wq 上的所有 process 叫醒,所以,如果如果有10个 process 挂在 wq 上的话,那这 10 个都会被叫醒。之后,至于谁先执行。则是要看 schedule 是怎么做的。就是因为这 10 个都会被叫醒。如果 A 先执行,而且万一很不凑巧的,A 又把 buffer 写满了,那其它 9 个 process 要怎么办呢? 所以在 write buffer 的部分,需要用一个 while 来检查 buffer 目前是否满了.如果是的话,那就继续挂在 wq 上面.
上面所谈的就是 wait_queue 的用法。很简单不是吗? 接下来,我会再介绍一下 wait_queue 提供那些 function 让我们使用。让我再重申一次。wait_queue 应设为 global variable,比方叫 wq,只要任何的 process 想将自己挂在上面,就可以直接叫呼叫 sleep_on 等 function。要将 wq 上的 process 叫醒。只要呼叫 wake_up 等 function 就可以了.
就我所知,wait_queue 提供4个 function 可以使用,两个是用来将 process 加到 wait_queue 的:
sleep_on( struct wait_queue **wq );
interruptible_sleep_on( struct wait_queue **wq );
另外两个则是将process从wait_queue上叫醒的。
wake_up( struct wait_queue **wq );
wake_up_interruptible( struct wait_queue **wq );
我现在来解释一下为什么会有两组。有 interruptible 的那一组是这样子的。当我们去 read 一个没有资料可供读取的 socket 时,process 会 block 在那里。如果我们此时按下 Ctrl C,那 read() 就会传回 EINTR。像这种的 block IO 就是使用 interruptible_sleep_on() 做到的。也就是说,如果你是用 interruptible_sleep_on() 来将 process 放到 wait_queue 时,如果有人送一个 signal 给这个 process,那它就会自动从 wait_queue 中醒来。但是如果你是用 sleep_on() 把 process 放到 wq 中的话,那不管你送任何的 signal 给它,它还是不会理你的。除非你是使用 wake_up() 将它叫醒。sleep 有两组。wake_up 也有两组。wake_up_interruptible() 会将 wq 中使用 interruptible_sleep_on() 的 process 叫醒。至于 wake_up() 则是会将 wq 中所有的 process 叫醒。包括使用 interruptible_sleep_on() 的 process。
在使用 wait_queue 之前有一点需要特别的小心,呼叫 interruptible_sleep_on() 以及 sleep_on() 的 function 必须要是 reentrant。简单的说,reentrant 的意思是说此 function不会改变任何的 global variable,或者是不会 depend on 任何的 global variable,或者是在呼叫 interruptible_sleep_on() 或 sleep_on() 之后不会 depend on 任何的 global variable。因为当此 function 呼叫 sleep_on() 时,目前的 process 会被暂停执行。可能另一个 process 又会呼叫此 function。若之前的 process 将某些 information 存在 global variable,等它恢复执行时要使用,结果第二行程进来了,又把这个 global variable 改掉了。等第一个 process 恢复执行时,放在 global variable 中的 information 都变了。产生的结果恐怕就不是我们所能想象了。其实,从 process 执行指令到此 function 中所呼叫的 function 都应该是要 reentrant 的。不然,很有可能还是会有上述的情形发生.
由于 wait_queue 是 kernel 所提供的,所以,这个例子必须要放到 kernel 里去执行。我使用的这个例子是一个简单的 driver。它会 maintain 一个 buffer,大小是 8192 bytes。提供 read跟 write 的功能。当 buffer 中没有资料时,read() 会马上传回,也就是不做 block IO。而当 write buffer 时,如果呼叫 write() 时,空间已满或写入的资料比 buffer 大时,就会被 block 住,直到有人将 buffer 里的资料读出来为止。在 write buffer 的程序代码中,我们使用 wait_queue 来做到 block IO 的功能。在这里,我会将此 driver 写成 module,方便加载 kernel。
第一步,这个 driver 是一个简单的 character device driver。所以,我们先在 /dev 下产生一个 character device。major number 我们找一个比较没人使用的,像是 54,minor number 就用 0。接着下一个命令.
mknod /dev/buf c 54 0
mknod 是用来产生 special file 的 command。/dev/buf 表示要产生叫 buf 的档案,位于 /dev 下。 c 表示它是一个 character device。54 为其 major number,0 则是它的 minor number。有关 character device driver 的写法。有机会我再跟各位介绍,由于这次是讲 wait_queue,所以,就不再多提 driver 方面的东西.
第二步,我们要写一个 module,底下是这个 module 的程序代码:
buf.c
#define MODULE
#include
#include
#include
#include
#include
#define BUF_LEN 8192
int flag; /* when rp = wp,flag = 0 for empty,flag = 1 for
non-empty */
char *wp,*rp;
char buffer[BUF_LEN];
EXPORT_NO_SYMBOLS; /* don't export anything */
static ssize_t buf_read( struct file *filp,char *buf,size_t count,
loff_t *ppos )
{
return count;
}
static ssize_t buf_write( struct file *filp,const char *buf,size_t count,
loff_t *ppos )
{
return count;
}
static int buf_open( struct inode *inode,struct file *filp )
{
MOD_INC_USE_COUNT;
return 0;
}
static int buf_release( struct inode *inode,struct file *filp )
{
MOD_DEC_USE_COUNT;
return 0;
}
static struct file_operations buf_fops = {
NULL, /* lseek */
buf_read,
buf_write,
NULL, /* readdir */
NULL, /* poll */
NULL, /* ioctl */
NULL, /* mmap */
buf_open, /* open */
NULL, /* flush */
buf_release, /* release */
NULL, /* fsync */
NULL, /* fasync */
NULL, /* check_media_change */
NULL, /* revalidate */
NULL /* lock */
};
static int buf_init()
{
int result;
flag = 0;
wp = rp = buf;
result = register_chrdev( 54,"buf",&buf_fops );
if ( result < 0 ) {
printk( "<5>buf: cannot get major 54\n" );
return result;
}
return 0;
}
static void buf_clean()
{
if ( unregister_chrdev( 54,"buf" ) ) {
printk( "<5>buf: unregister_chrdev error\n" );
}
}
int init_module( void )
{
return buf_init();
}
void cleanup_module( void )
{
buf_clean();
}
有关 module 的写法,请各位自行参考其它的文件,最重要的是要有 init_module()和 cleanup_module() 这两个 function。我在这两个 function 里分别做 initialize 和 finalize 的动作。现在分别解释一下。在 init_module() 里,只有呼叫 buf_init() 而己。其实,也可以将 buf_init() 的 code 写到 init_module() 里。只是我觉得这样比较好而已。
flag = 0;
wp = rp = buf;
result = register_chrdev( 54,"buf",&buf_fops );
if ( result < 0 ) {
printk( "<5>buf: cannot get major 54\n" );
return result;
}
return 0;
init_buf() 做的事就是去注册一个 character device driver。在注册一个 character device driver 之前,必须要先准备一个型别为 file_operations 结构的变量,file_operations 里包含了一些 function pointer。driver 的作者必须自己写这些 function。并将 function address 放到这个结构里。如此一来,当 user 去读取这个 device 时,kernel 才有办法去呼叫对应这个 driver 的 function。其实,简要来讲。character device driver 就是这么一个 file_operations 结构的变量。file_operations 定义在 这个档案里。它的 prototype 在 kernel 2.2.1 与以前的版本有些微的差异,这点是需要注意的地方。
register_chrdev() 看名字就大概知道是要注册 character device driver。第一个参数是此 device 的 major number。第二个是它的名字。名字你可以随便取。第三个的参数就是一个 file_operations 变量的地址。init_module() 必须要传回 0,module 才会被加载。
在 cleanup_module() 的部分,我们也是只呼叫 buf_clean() 而已。它做的事是 unregister 的动作。
if ( unregister_chrdev( 54,"buf" ) ) {
printk( "<5>buf: unregister_chrdev error\n" );
}
也就是将原本记录在 device driver table 上的资料洗掉。第一个参数是 major number。第二个则是此 driver 的名称,这个名字必须要跟 register_chrdev() 中所给的名字一样才行。
现在我们来看看此 driver 所提供的 file_operations 是那些。
static struct file_operations buf_fops = {
NULL, /* lseek */
buf_read,
buf_write,
NULL, /* readdir */
NULL, /* poll */
NULL, /* ioctl */
NULL, /* mmap */
buf_open, /* open */
NULL, /* flush */
buf_release, /* release */
NULL, /* fsync */
NULL, /* fasync */
NULL, /* check_media_change */
NULL, /* revalidate */
NULL /* lock */
};
在此,我们只打算 implement buf_read(),buf_write(),buf_open,和 buf_release()等 function 而已。当 user 对这个 device 呼叫 open() 的时候,buf_open() 会在最后被 kernel 呼叫。相同的,当呼叫 close(),read(),和 write() 时,buf_release(),buf_read(),和 buf_write() 也都会分别被呼叫。首先,我们先来看看 buf_open()。
static int buf_open( struct inode *inode,struct file *filp )
MOD_INC_USE_COUNT;
return 0;
}
buf_open() 做的事很简单。就是将此 module 的 use count 加一。这是为了避免当此 module 正被使用时不会被从 kernel 移除掉。相对应的,在 buf_release() 中,我们应该要将 use count 减一。就像开启档案一样。有 open(),就应该要有对应的 close() 才行。如果 module 的 use count 在不为 0 的话,那此 module 就无法从 kernel 中移除了。
static int buf_release( struct inode *inode,struct file *filp )
{
MOD_DEC_USE_COUNT;
return 0;
}
接下来,我们要看一下buf_read()和buf_write()。
static ssize_t buf_read( struct file *filp,char *buf,size_t count,
loff_t *ppos )
{
return count;
}
static ssize_t buf_write( struct file *filp,const char *buf,
size_t count,loff_t *ppos )
{
return count;
}
在此,我们都只是回传 user 要求读取或写入的字符数目而已。在此,我要说明一下这些参数的意义。filp 是一个 file 结构的 pointer。也就是指我们在 /dev 下所产生的 buf 档案的 file 结构。当我们呼叫 read() 或 write() 时,必须要给一个 buffer 以及要读写的长度。Buf 指的就是这个 buffer,而 count 指的就是长度。至于 ppos 是表示目前这个档案的 offset 在那里。这个值对普通档案是有用的。也就是跟 lseek() 有关系。由于在这里是一个 drvice。所以 ppos 在此并不会用到。有一点要小心的是,上面参数 buf 是一个地址,而且还是一个 user space 的地址,当 kernel 呼叫 buf_read() 时,程序在位于 kernel space。所以你不能直接读写资料到 buf 里。必须先切换 FS 这个 register 才行。
Makefile
P = buf
OBJ = buf.o
INCLUDE = -I/usr/src/linux/include/linux
CFLAGS = -D__KERNEL__ -DMODVERSIONS -DEXPORT_SYMTAB -O $(INCLUDE) \
-include /usr/src/linux/include/linux/modversions.h
CC = gcc
$(P): $(OBJ)
ld -r $(OBJ) -o $(P).o
.c.o:
$(CC) -c $(CFLAGS) $<
clean:
rm -f *.o *~ $(P)
加入上面这个 Makefile,打入 make 之后,就会产生一个 buf.o 的档案。利用 insmod 将 buf.o 载到 kernel 里。相信大家应该都用过 /dev/zero 这个 device。去读取这个 device,只会得到空的内容。写资料到这个 device 里也只会石沈大海。现在你可以去比较 buf 和 zero 这两个 device。两者的行为应该很类似才是。
第三步,我们在第二步中 implement 一个像 zero 的 device driver。我们现在要经由修改它来使用 wait_queue。首先,我们先加入一个 global variable,write_wq,并把它设为 NULL。
struct wait_queue *write_wq = NULL;
然后,在 buf_read() 里,我们要改写成这个样子。
static ssize_t buf_read( struct file *filp,char *buf,size_t count,
loff_t *ppos )
{
int num,nRead;
nRead = 0;
while ( ( wp == rp ) && !flag ) { /* buffer is empty */
return 0;
}
repeate_reading:
if ( rp < wp ) {
num = min( count,( int ) ( wp-rp ) );
}
else {
num = min( count,( int ) ( buffer BUF_LEN-rp ) );
}
copy_to_user( buf,rp,num );
rp = num;
count -= num;
nRead = num;
if ( rp == ( buffer BUF_LEN ) )
rp = buffer;
if ( ( rp != wp ) && ( count > 0 ) )
goto repeate_reading;
flag = 0;
wake_up_interruptible( &write_wq );
return nRead;
}
在前头我有提到,buf 的地址是属于 user space 的。在 kernel space 中,你不能像普通写到 buffer 里一样直接将资料写到 buf 里,或直接从 buf 里读资料。Linux 里使用 FS 这个 register 来当作 kernel space 和 user space 的切换。所以,如果你想手动的话,可以这样做:
mm_segment_t fs;
fs = get_fs();
set_fs( USER_DS );
write_data_to_buf( buf );
set_fs( fs );
也就是先切换到 user space,再写资料到 buf 里。之后记得要切换回来 kernel space。这种自己动手的方法比较麻烦,所以 Linux 提供了几个 function,可以让我们直接在不同的 space 之间做资料的搬移。诚如各位所见,copy_to_user() 就是其中一个。
copy_to_user( to,from,n );
copy_from_user( to,from,n );
顾名思义,copy_to_user() 就是将资料 copy 到 user space 的 buffer 里,也就是从 to 写到 from,n 为要 copy 的 byte 数。相同的,copy_from_user() 就是将资料从 user space 的 from copy 到位于 kernel 的 to 里,长度是 n bytes。在以前的 kernel 里,这两个 function 的前身是 memcpy_tofs() 和 memcpy_fromfs(),不知道为什么到了 kernel 2.2.1之后,名字就被改掉了。至于它们的程序代码有没有更改就不太清楚了。至于到那一版才改的。我没有仔细去查,只知道在 2.0.36 时还没改,到了 2.2.1 就改了。这两个 function 是 macro,都定义在 里。要使用前记得先 include 进来。
相信 buf_read() 的程序代码应当不难了解才对。不知道各位有没有看到,在buf_read() 的后面有一行的程序,就是
wake_up_interruptible( &write_wq );
write_wq 是我们用来放那些想要写资料到 buffer,但 buffer 已满的 process。这一行的程序会将挂在此 queue 上的 process 叫醒。当 queue 是空的时,也就是当 write_wq 为 NULL 时,wake_up_interruptible() 并不会造成任何的错误。接下来,我们来看看更改后的 buf_write()。
static ssize_t buf_write( struct file *filp,const char *buf,size_t count,loff_t *ppos )
{
int num,nWrite;
nWrite = 0;
while ( ( wp == rp ) && flag ) {
interruptible_sleep_on( &write_wq );
}
repeate_writing:
if ( rp > wp ) {
num = min( count,( int ) ( rp - wp ) );
}
else {
num = min( count,( int ) ( buffer BUF_LEN - wp ) );
}
copy_from_user( wp,buf,num );
wp = num;
count -= num;
nWrite = num;
if ( wp == ( buffer BUF_LEN ) ) {
wp = buffer;
}
if ( ( wp != rp ) && ( count > 0 ) ) {
goto repeate_writing;
}
flag = 1;
return nWrite;
}
我们把 process 丢到 write_wq 的动作放在 buf_write() 里。当 buffer 已满时,就直接将 process 丢到 write_wq 里.
while ( ( wp == rp ) && flag ) {
interruptible_sleep_on( &write_wq );
}
好了。现在程序已经做了一些修改。再重新 make 一次,利用 insmod 将 buf.o 载到 kernel 里就行了。接着,我们就来试验一下是不是真正做到 block IO.
# cd /dev
# ls -l ~/WWW-HOWTO
-rw-r--r-- 1 root root 23910 Apr 14 16:50 /root/WWW-HOWTO
# cat ~/WWW-HOWTO > buf
执行到这里,应该会被 block 住。现在,我们再开一个 shell 出来.
# cd /dev
# cat buf
..。( contents of WWW-HOWTO ) ..。skip ...
此时,WWW-HOWTO 的内容就会出现了。而且之前 block 住的 shell 也已经回来了。最后,试验结束,可以下
# rmmod buf
将 buf 这个 module 从 kernel 中移除。以上跟各位介绍的就是 wait_queue 的使用。希望能对各位有所助益。
我想对某些人来讲,会使用一个东西就够了。然而对某些人来讲,可能也很希望知道这项东西是如何做出来的。至少我就是这种人。在下面,我将为各位介绍 wait_queue 的 implementation。如果对其 implementation 没兴趣,以下这一段就可以略过不用看了。
wait_queue 是定义在 里,我们可以先看看它的数据结构是怎么样:
struct wait_queue {
struct task_struct * task;
struct wait_queue * next;
};
很简单是吧。这个结构里面只有二个字段,一个是 task_struct 的 pointer,另一个则是 wait_queue 的 pointer。很明显的,我们可以看出 wait_queue 其实就是一个 linked list,而且它还是一个 circular linked list。 其中 task_struct 就是用来指呼叫 sleep_on 等 function的 process。在 Linux 里,每一个 process 是由一个 task_struct 来描叙。task_struct 是一个很大的的结构,在此我们不会讨论。Linux 里有一个 global variable,叫 current,它会指到目前正在执行的 process 的 task_struct 结构。这也就是为什么当 process 呼叫 system call,切换到 kernel 时,kernel 会知道是那个 process 呼叫的。
好,我们现在来看看 interruptible_sleep_on() 和 sleep_on() 是如何做的。这两个 function 都是位于 /usr/src/linux/kernel/sched.c 里。
void interruptible_sleep_on(struct wait_queue **p)
{
SLEEP_ON_VAR
current->state = TASK_INTERRUPTIBLE;
SLEEP_ON_HEAD
schedule();
SLEEP_ON_TAIL
}
void sleep_on(struct wait_queue **p)
{
SLEEP_ON_VAR
current->state = TASK_UNINTERRUPTIBLE;
SLEEP_ON_HEAD
schedule();
SLEEP_ON_TAIL
}
各位有没有发现这两个 function 很类似。是的,它们唯一的差别就在于
current->state = ...
这一行而已。之前,我们有说过,interruptible_sleep_on() 可以被 signal 中断,所以,其 current->state 被设为 TASK_INTERRUPTIBLE。而 sleep_on() 没办法被中断,所以 current->state 设为 TASK_UNINTERRUPTIBLE。接下来,我们只看 interruptible_sleep_on() 就好了。毕竟它们两的差异只在那一行而已。
在 sched.c 里,SLEEP_ON_VAR 是一个 macro,其实它只是定义两个区域变量出来而已。
#defineSLEEP_ON_VAR\
unsigned long flags;\
struct wait_queue wait;
刚才我也说过,current 这个变量是指到目前正在执行的 process 的 task_struct 结构。所以 current->state = TASK_INTERRUPTIBLE 会设定在呼叫 interruptible_sleep_on() 的 process 身上。至于 SLEEP_ON_HEAD 做的事,则是将 current 的值放到 SLEEP_ON_VAR 宣告的 wait 变量里,并把 wait 放到 interruptible_sleep_on() 的参数所属的 wait_queue list 中。
#defineSLEEP_ON_HEAD\
wait.task = current;\
write_lock_irqsave(&waitqueue_lock,flags);\
__add_wait_queue(p,&wait);\
write_unlock(&waitqueue_lock);
wait 是在 SLEEP_ON_VAR 中宣告的区域变量。其 task 字段被设成呼叫 interruptible_sleep_on() 的 process。至于 waitqueue_lock 这个变量是一个 spin lock。 waitqueue_lock 是用来确保同一时间只能有一个 writer。但同一时间则可以有好几个 reader。也就是说 waitqueue_lock 是用来保证 critical section 的 mutual exclusive access。
unsigned long flags;
write_lock_irqsave(&waitqueue_lock,flags);
...critical section ...
write_unlock(&waitqueue_lock)
学过 OS 的人应该知道 critical section 的作用是什么,如有需要,请自行参考 OS 参考书。在 critical section 里只做一件事,就是将 wait 这个区域变量放到 p 这个 wait_queue list 中。 p 是 user 在呼叫 interruptible_sleep_on() 时传进来的,它的型别是 struct wait_queue **。在此, critical section 只呼叫 __add_wait_queue()。
extern inline void __add_wait_queue(struct wait_queue ** p,
struct wait_queue * wait)
{
wait->next = *p ? : WAIT_QUEUE_HEAD(p);
*p = wait;
}
__add_wait_queue() 是一个inline function,定义在 中。WAIT_QUEUE_HEAD()是个很有趣的 macro,待会我们再讨论。现在只要知道它会传回这个 wait_queue 的开头就可以了。所以,__add_wait_queue() 的意思就是要把 wait 放到 p 所属的 wait_queue list 的开头。但是,大家还记得吗? 在上面的例子里,一开始我们是把 write_wq 设为 NULL。也就是说 *p 是 NULL。所以,当 *p 是 NULL 时,
wait->next = WAIT_QUEUE_HEAD(p)
是什么意思呢?
所以,现在,我们来看一下 WAIT_QUEUE_HEAD() 是怎么样的一个 macro,它是定义在 里。
#define WAIT_QUEUE_HEAD(x) ((struct wait_queue *)((x)-1))
x 型别是 struct wait_queue **,因为是一个 pointer,所以大小是 4 byte。因此,若 x 为 100 的话,那 ((x)-1) 就变成 96。如下图所示。 WAIT_QUEUE_HEAD(x) 其实会传回 96,而且将其转型为 struct wait_queue*,各位可以看看。原本的 wait_queue* 只配制在 100-104 之间。现在 WAIT_QUEUE_HEAD(x) 却直接传回96,但是 96-100 这块位置根本没有被我们配置起来。更妙的事。由于 x 是一个 wait_queue list 的开头,我们始终不会用到 96-100 这块,我们只会直接使用到 100-104 这块内存。这也算是 wait_queue 一项比较奇怪的 implementation 方式吧。下面有三张图,第一张表示我们宣告了一个 wait_queue* 的变量,地址在 100。另外还有一个 wait_queue 的变量,名叫 wait。第二张图是我们呼叫 interruptible_sleep_on() 之后得到的结果。第三张则是我们又宣告一个 wait_queue,名叫 ano_wait,将 ano_wait 放到 wait_queue list 后的结果就第三张图所显示的。http:/linuxfab.cx/Columns/10/wqq.GIF
在 interruptible_sleep_on() 中,当呼叫完 SLEEP_ON_HEAD 之后,目前的 process 就已经被放到 wait_queue 中了。接下来会直接呼叫 schedule(),这个 function 是用来做 scheduling 用的。current 所指到的 process 会被放到 scheduling queue 中等待被挑出来执行。执行完 schedule() 之后,current 就没办法继续执行了。而当 current 以后被 wake up 时,就会从 schedule() 之后,也就是从 SLEEP_ON_TAIL 开始执行。SLEEP_ON_TAIL 做的事刚好跟 SLEEP_ON_HEAD 相反,它会将此 process 从 wait_queue 中移除。
#defineSLEEP_ON_TAIL\
write_lock_irq(&waitqueue_lock);\
__remove_wait_queue(p,&wait);\
write_unlock_irqrestore(&waitqueue_lock,flags);
跟 SLEEP_ON_HEAD 一样。SLEEP_ON_TAIL 也是利用 spin lock 包住一个 critical section。
extern inline void __remove_wait_queue(struct wait_queue ** p,struct
wait_queue * wait)
{
struct wait_queue * next = wait->next;
struct wait_queue * head = next;
struct wait_queue * tmp;
while ((tmp = head->next) != wait) {
head = tmp;
}
head->next = next;
}
__remove_wait_queue() 是一个 inline function,也是同样定义在 里。是用来将 wait 从 p 这个 wait_queue list 中移除掉。
现在,大家应该已经清楚了 interruptible_sleep_on() 和 sleep_on() 的做法,也应该比较清楚 wait_queue 是如何的做到 block IO。接下来,我们继续看 wake_up_interruptible() 和 wake_up() 是如何 implement 的。wake_up_interruptible() 和 wake_up() 其实是两个 macro,都定义在 里。
#define wake_up(x) __wake_up((x),TASK_UNINTERRUPTIBLE | \
TASK_INTERRUPTIBLE)
#define wake_up_interruptible(x) __wake_up((x),TASK_INTERRUPTIBLE)
从这里可以看出,两个 macro 几乎是一样的,差别只在于传给 __wake_up() 中的一个 flag 有所差异而已。其实,wake_up() 传给 __wake_up() 的是 TASK_UNINTERRUPTIBLE|TASK_INTERRUPTIBLE,意思是说它会将 wait_queue list 中 process->state 是 TASK_INTERRUPTIBLE 或 TASK_UNINTERRUPTIBLE 的所有 process 叫醒。而 wake_up_interruptible() 则只将 state是 TASK_INTERRUPTIBLE 的叫醒.
void __wake_up(struct wait_queue **q,unsigned int mode)
{
struct wait_queue *next;
read_lock(&waitqueue_lock);
if (q && (next = *q)) {
struct wait_queue *head;
head = WAIT_QUEUE_HEAD(q);
while (next != head) {
struct task_struct *p = next->task;
next = next->next;
if (p->state & mode)
wake_up_process(p);
}
}
read_unlock(&waitqueue_lock);
}
在 wake up 的过程中,我们不需要设定 write lock,但是仍要设定 read lock,这是为了避免有人在我们读取 wait_queue 时去写 wait_queue list 的内容,造成 inconsistent。在这段程序代码中,是去 transverse 整个 list,如果 process 的 state 跟 mode 有吻合,则呼叫 wake_up_process() 将它叫醒。
void wake_up_process(struct task_struct * p)
{
unsigned long flags;
spin_lock_irqsave(&runqueue_lock,flags);
p->state = TASK_RUNNING;
if (!p->next_run) {
add_to_runqueue(p);
reschedule_idle(p);
}
spin_unlock_irqrestore(&runqueue_lock,flags);
}
在此,runqueue_lock 也是一个 spin lock,kernel 依然在此设一个 critical section 以方便更改 run queue。Run queue 是用来放可以执行的 process 用的。在放入 run queue 之前,会先将 process 的 state 设为 TASK_RUNNING。
wait_queue 其实是一个蛮好用的东西。相信只要各位有机会去修改 kernel 的话,都应该有机会用到它才对。希望对大家有点帮助。
作者:jackxiang@向东博客 专注WEB应用 构架之美 --- 构架之美,在于尽态极妍 | 应用之美,在于药到病除
地址:https://jackxiang.com/post/2046/
版权所有。转载时必须以链接形式注明作者和原始出处及本声明!
评论列表