异步IO、APC、IO完成端口、线程池与高性能服务器之一 异步IO
背景:轮询 PIO DMA 中断
早期IO设备的速度与CPU相比,还不是太悬殊。CPU定时轮询一遍IO设备,看看有无处理要求,有则加以处理,完成后返回继续工作。至今,软盘驱动器还保留着这种轮询工作方式。
随着CPU性能的迅速提高,这种效率低下的工作方式浪费了大量的CPU时间。因此,中断工作方式开始成为普遍采用的技术。这种技术使得IO设备在需要得到 服务时,能够产生一个硬件中断,迫使CPU放弃目前的处理任务,进入特定的中断服务过程,中断服务完成后,再继续原先的处理。这样一来,IO设备和CPU 可以同时进行处理,从而避免了CPU等待IO完成。
早期数据的传输方式主要是PIO(程控IO)方式。通过对IO地址编程方式的方式来传 输数据。比如串行口,软件每次往串行口上写一个字节数据,串口设备完成传输任务后,将会产生一个中断,然后软件再次重复直到全部数据发送完成。性能更好的 硬件设备提供一个FIFO(先进先出缓冲部件),可以让软件一次传输更多的字节。
显然,使用PIO方式对于高速IO设备来说,还是占用了 太多的CPU时间(因为需要通过CPU编程控制传输)。而DMA(直接内存访问)方式能够极大地减少CPU处理时间。CPU仅需告诉DMA控制器数据块的 起始地址和大小,以后DMA控制器就可以自行在内存和设备之间传输数据,其间CPU可以处理其他任务。数据传输完毕后将会产生一个中断。
同步文件IO和异步文件IO
下面摘抄于MSDN《synchronous file I/O and asynchronous file I/O》。
有两种类型的文件IO同步:同步文件IO和异步文件IO。异步文件IO也就是重叠IO。
在同步文件IO中,线程启动一个IO操作然后就立即进入等待状态,直到IO操作完成后才醒来继续执行。而异步文件IO方式中,线程发送一个IO请求到内核,然后继续处理其他的事情,内核完成IO请求后,将会通知线程IO操作完成了。
如果IO请求需要大量时间执行的话,异步文件IO方式可以显著提高效率,因为在线程等待的这段时间内,CPU将会调度其他线程进行执行,如果没有其他线程 需要执行的话,这段时间将会浪费掉(可能会调度操作系统的零页线程)。如果IO请求操作很快,用异步IO方式反而还低效,还不如用同步IO方式。
同步IO在同一时刻只允许一个IO操作,也就是说对于同一个文件句柄的IO操作是序列化的,即使使用两个线程也不能同时对同一个文件句柄同时发出读写操作。重叠IO允许一个或多个线程同时发出IO请求。
异步IO在请求完成时,通过将文件句柄设为有信号状态来通知应用程序,或者应用程序通过GetOverlappedResult察看IO请求是否完成,也可以通过一个事件对象来通知应用程序。
异步IO、APC、IO完成端口、线程池与高性能服务器之二 APC
Alertable IO(告警IO)提供了更有效的异步通知形式。ReadFileEx / WriteFileEx在发出IO请求的同时,提供一个回调函数(APC过程),当IO请求完成后,一旦线程进入可告警状态,回调函数将会执行。
以下五个函数能够使线程进入告警状态:
SleepEx
WaitForSingleObjectEx
WaitForMultipleObjectsEx
SignalObjectAndWait
MsgWaitForMultipleObjectsEx
线程进入告警状态时,内核将会检查线程的APC队列,如果队列中有APC,将会按FIFO方式依次执行。如果队列为空,线程将会挂起等待事件对象。以后的 某个时刻,一旦APC进入队列,线程将会被唤醒执行APC,同时等待函数返回WAIT_IO_COMPLETION。
QueueUserAPC可以用来人为投递APC,只要目标线程处于告警状态时,APC就能够得到执行。
使用告警IO的主要缺点是发出IO请求的线程也必须是处理结果的线程,如果一个线程退出时还有未完成的IO请求,那么应用程序将永远丢失IO完成通知。然而以后我们将会看到IO完成端口没有这个限制。
下面的代码演示了QueueUserAPC的用法。
/************************************************************************/
/* APC Test. */
/************************************************************************/
DWORD WINAPI WorkThread(PVOID pParam)
{
HANDLE Event = (HANDLE)pParam;
for(;;)
{
DWORD dwRet = WaitForSingleObjectEx(Event, INFINITE, TRUE);
if(dwRet == WAIT_OBJECT_0)
break;
else if(dwRet == WAIT_IO_COMPLETION)
printf("WAIT_IO_COMPLETION\n");
}
return 0;
}
VOID CALLBACK APCProc(DWORD dwParam)
{
printf("%s", (PVOID)dwParam);
}
void TestAPC(BOOL bFast)
{
HANDLE QuitEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
HANDLE hThread = CreateThread(NULL,
0,
WorkThread,
(PVOID)QuitEvent,
0,
NULL);
Sleep(100); // Wait for WorkThread initialized.
for(int i=5; i>0; i--)
{
QueueUserAPC(APCProc, hThread, (DWORD)(PVOID)"APC here\n");
if(!bFast)
Sleep(1000);
}
SetEvent(QuitEvent);
WaitForSingleObject(hThread, INFINITE);
CloseHandle(hThread);
}
异步IO、APC、IO完成端口、线程池与高性能服务器之三 IO完成端口
IO完成端口
下面摘抄于MSDN《I/O Completion Ports》,smallfool翻译,原文请参考CSDN文档中心文章《I/O Completion Ports》, http://dev.csdn.net/Develop/article/29%5C29240.shtm 。
I/O 完成端口是一种机制,通过这个机制,应用程序在启动时会首先创建一个线程池,然后该应用程序使用线程池处理异步I/O请求。这些线程被创建的唯一目的就是 用于处理I/O请求。对于处理大量并发异步I/O请求的应用程序来说,相比于在I/O请求发生时创建线程来说,使用完成端口(s)它就可以做的更快且更有 效率。
CreateIoCompletionPort函数会使一个I/O完成端口与一个或多个文件句柄发生关联。当与一个完成端口相关的文件句柄 上启动的异步I/O操作完成时,一个I/O完成包就会进入到该完成端口的队列中。对于多个文件句柄来说,这种机制可以用来把多文件句柄的同步点放在单个对 象中。(言下之意,如果我们需要对每个句柄文件进行同步,一般而言我们需要多个对象(如:Event来同步),而我们使用IO Complete Port 来实现异步操作,我们可以同多个文件相关联,每当一个文件中的异步操作完成,就会把一个complete package放到队列中,这样我们就可以使用这个来完成所有文件句柄的同步)
调用GetQueuedCompletionStatus函数,某 个线程就会等待一个完成包进入到完成端口的队列中,而不是直接等待异步I/O请求完成。线程(们)就会阻塞于它们的运行在完成端口(按照后进先出队列顺序 的被释放)。这就意味着当一个完成包进入到完成端口的队列中时,系统会释放最近被阻塞在该完成端口的线程。
调用GetQueuedCompletionStatus,线程就会将会与某个指定的完成端口建立联系,一直延续其该线程的存在周期,或被指定了不同的完成端口,或者释放了与完成端口的联系。一个线程只能与最多不超过一个的完成端口发生联系。
完 成端口最重要的特性就是并发量。完成端口的并发量可以在创建该完成端口时指定。该并发量限制了与该完成端口相关联的可运行线程的数目。当与该完成端口相关 联的可运行线程的总数目达到了该并发量,系统就会阻塞任何与该完成端口相关联的后续线程的执行,直到与该完成端口相关联的可运行线程数目下降到小于该并发 量为止。最有效的假想是发生在有完成包在队列中等待,而没有等待被满足,因为此时完成端口达到了其并发量的极限。此时,一个正在运行中的线程调用 GetQueuedCompletionStatus时,它就会立刻从队列中取走该完成包。这样就不存在着环境的切换,因为该处于运行中的线程就会连续不 断地从队列中取走完成包,而其他的线程就不能运行了。
对于并发量最好的挑选值就是您计算机中CPU的数目。如果您的事务处理需要一个漫长的计算时间,一个比较大的并发量可以允许更多线程来运行。虽然完成每个事务处理需要花费更长的时间,但更多的事务可以同时被处理。对于应用程序来说,很容易通过测试并发量来获得最好的效果。
PostQueuedCompletionStatus函数允许应用程序可以针对自定义的专用I/O完成包进行排队,而无需启动一个异步I/O操作。这点对于通知外部事件的工作者线程来说很有用。
在没有更多的引用针对某个完成端口时,需要释放该完成端口。该完成端口句柄以及与该完成端口相关联的所有文件句柄都需要被释放。调用CloseHandle可以释放完成端口的句柄。
下面的代码利用IO完成端口做了一个简单的线程池。
/************************************************************************/
/* Test IOCompletePort. */
/************************************************************************/
DWORD WINAPI IOCPWorkThread(PVOID pParam)
{
HANDLE CompletePort = (HANDLE)pParam;
PVOID UserParam;
WORK_ITEM_PROC UserProc;
LPOVERLAPPED pOverlapped;
for(;;)
{
BOOL bRet = GetQueuedCompletionStatus(
CompletePort,
(LPDWORD)&UserParam,
(LPDWORD)&UserProc,
&pOverlapped,
INFINITE);
_ASSERT(bRet);
if(UserProc == NULL) // Quit signal.
break;
// execute user's proc.
UserProc(UserParam);
}
return 0;
}
void TestIOCompletePort(BOOL bWaitMode, LONG ThreadNum)
{
HANDLE CompletePort;
OVERLAPPED Overlapped = { 0, 0, 0, 0, NULL };
CompletePort = CreateIoCompletionPort(
INVALID_HANDLE_VALUE,
NULL,
NULL,
0);
// Create threads.
for(int i=0; i<ThreadNum; i++)
{
HANDLE hThread = CreateThread(NULL,
0,
IOCPWorkThread,
CompletePort,
0,
NULL);
CloseHandle(hThread);
}
CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
BeginTime = GetTickCount();
ItemCount = 20;
for(i=0; i<20; i++)
{
PostQueuedCompletionStatus(
CompletePort,
(DWORD)bWaitMode,
(DWORD)UserProc1,
&Overlapped);
}
WaitForSingleObject(CompleteEvent, INFINITE);
CloseHandle(CompleteEvent);
// Destroy all threads.
for(i=0; i<ThreadNum; i++)
{
PostQueuedCompletionStatus(
CompletePort,
NULL,
NULL,
&Overlapped);
}
Sleep(1000); // wait all thread exit.
CloseHandle(CompletePort);
}
异步IO、APC、IO完成端口、线程池与高性能服务器之四 线程池
线程池
下面摘抄于MSDN《Thread Pooling》。
有 许多应用程序创建的线程花费了大量时间在睡眠状态来等待事件的发生。还有一些线程进入睡眠状态后定期被唤醒以轮询工作方式来改变或者更新状态信息。线程池 可以让你更有效地使用线程,它为你的应用程序提供一个由系统管理的工作者线程池。至少会有一个线程来监听放到线程池的所有等待操作,当等待操作完成后,线 程池中将会有一个工作者线程来执行相应的回调函数。
你也可以把没有等待操作的工作项目放到线程池中,用QueueUserWorkItem函数来完成这个工作,把要执行的工作项目函数通过一个参数传递给线程池。工作项目被放到线程池中后,就不能再取消了。
Timer-queue timers和Registered wait operations也使用线程池来实现。他们的回调函数也放在线程池中。你也可以用BindIOCompletionCallback函数来投递一个异 步IO操作,在IO完成端口上,回调函数也是由线程池线程来执行。
当第一次调用QueueUserWorkItem函数或者 BindIOCompletionCallback函数的时候,线程池被自动创建,或者Timer-queue timers或者Registered wait operations放入回调函数的时候,线程池也可以被创建。线程池可以创建的线程数量不限,仅受限于可用的内存,每一个线程使用默认的初始堆栈大小, 运行在默认的优先级上。
线程池中有两种类型的线程:IO线程和非IO线程。IO线程等待在可告警状态,工作项目作为APC放到IO线程中。如果你的工作项目需要线程执行在可警告状态,你应该将它放到IO线程。
非IO工作者线程等待在IO完成端口上,使用非IO线程比IO线程效率更高,也就是说,只要有可能的话,尽量使用非IO线程。IO线程和非IO线程在异步IO操作没有完成之前都不会退出。然而,不要在非IO线程中发出需要很长时间才能完成的异步IO请求。
正 确使用线程池的方法是,工作项目函数以及它将会调用到的所有函数都必须是线程池安全的。安全的函数不应该假设线程是一次性线程的或者是永久线程。一般来 说,应该避免使用线程本地存储和发出需要永久线程的异步IO调用,比如说RegNotifyChangeKeyValue函数。如果需要在永久线程中执行 这样的函数的话,可以给QueueUserWorkItem传递一个选项WT_EXECUTEINPERSISTENTTHREAD。
注意,线程池不能兼容COM的单线程套间(STA)模型。
为了更深入地讲解操作系统实现的线程池的优越性,我们首先尝试着自己实现一个简单的线程池模型。
代码如下:
/************************************************************************/
/* Test Our own thread pool. */
/************************************************************************/
typedef struct _THREAD_POOL
{
HANDLE QuitEvent;
HANDLE WorkItemSemaphore;
LONG WorkItemCount;
LIST_ENTRY WorkItemHeader;
CRITICAL_SECTION WorkItemLock;
LONG ThreadNum;
HANDLE *ThreadsArray;
}THREAD_POOL, *PTHREAD_POOL;
typedef VOID (*WORK_ITEM_PROC)(PVOID Param);
typedef struct _WORK_ITEM
{
LIST_ENTRY List;
WORK_ITEM_PROC UserProc;
PVOID UserParam;
}WORK_ITEM, *PWORK_ITEM;
DWORD WINAPI WorkerThread(PVOID pParam)
{
PTHREAD_POOL pThreadPool = (PTHREAD_POOL)pParam;
HANDLE Events[2];
Events[0] = pThreadPool->QuitEvent;
Events[1] = pThreadPool->WorkItemSemaphore;
for(;;)
{
DWORD dwRet = WaitForMultipleObjects(2, Events, FALSE, INFINITE);
if(dwRet == WAIT_OBJECT_0)
break;
//
// execute user's proc.
//
else if(dwRet == WAIT_OBJECT_0 +1)
{
PWORK_ITEM pWorkItem;
PLIST_ENTRY pList;
EnterCriticalSection(&pThreadPool->WorkItemLock);
_ASSERT(!IsListEmpty(&pThreadPool->WorkItemHeader));
pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
LeaveCriticalSection(&pThreadPool->WorkItemLock);
pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);
pWorkItem->UserProc(pWorkItem->UserParam);
InterlockedDecrement(&pThreadPool->WorkItemCount);
free(pWorkItem);
}
else
{
_ASSERT(0);
break;
}
}
return 0;
}
BOOL InitializeThreadPool(PTHREAD_POOL pThreadPool, LONG ThreadNum)
{
pThreadPool->QuitEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
pThreadPool->WorkItemSemaphore = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
pThreadPool->WorkItemCount = 0;
InitializeListHead(&pThreadPool->WorkItemHeader);
InitializeCriticalSection(&pThreadPool->WorkItemLock);
pThreadPool->ThreadNum = ThreadNum;
pThreadPool->ThreadsArray = (HANDLE*)malloc(sizeof(HANDLE) * ThreadNum);
for(int i=0; i<ThreadNum; i++)
{
pThreadPool->ThreadsArray[i] = CreateThread(NULL, 0, WorkerThread, pThreadPool, 0, NULL);
}
return TRUE;
}
VOID DestroyThreadPool(PTHREAD_POOL pThreadPool)
{
SetEvent(pThreadPool->QuitEvent);
for(int i=0; i<pThreadPool->ThreadNum; i++)
{
WaitForSingleObject(pThreadPool->ThreadsArray[i], INFINITE);
CloseHandle(pThreadPool->ThreadsArray[i]);
}
free(pThreadPool->ThreadsArray);
CloseHandle(pThreadPool->QuitEvent);
CloseHandle(pThreadPool->WorkItemSemaphore);
DeleteCriticalSection(&pThreadPool->WorkItemLock);
while(!IsListEmpty(&pThreadPool->WorkItemHeader))
{
PWORK_ITEM pWorkItem;
PLIST_ENTRY pList;
pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);
free(pWorkItem);
}
}
BOOL PostWorkItem(PTHREAD_POOL pThreadPool, WORK_ITEM_PROC UserProc, PVOID UserParam)
{
PWORK_ITEM pWorkItem = (PWORK_ITEM)malloc(sizeof(WORK_ITEM));
if(pWorkItem == NULL)
return FALSE;
pWorkItem->UserProc = UserProc;
pWorkItem->UserParam = UserParam;
EnterCriticalSection(&pThreadPool->WorkItemLock);
InsertTailList(&pThreadPool->WorkItemHeader, &pWorkItem->List);
LeaveCriticalSection(&pThreadPool->WorkItemLock);
InterlockedIncrement(&pThreadPool->WorkItemCount);
ReleaseSemaphore(pThreadPool->WorkItemSemaphore, 1, NULL);
return TRUE;
}
VOID UserProc1(PVOID dwParam)
{
WorkItem(dwParam);
}
void TestSimpleThreadPool(BOOL bWaitMode, LONG ThreadNum)
{
THREAD_POOL ThreadPool;
InitializeThreadPool(&ThreadPool, ThreadNum);
CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
BeginTime = GetTickCount();
ItemCount = 20;
for(int i=0; i<20; i++)
{
PostWorkItem(&ThreadPool, UserProc1, (PVOID)bWaitMode);
}
WaitForSingleObject(CompleteEvent, INFINITE);
CloseHandle(CompleteEvent);
DestroyThreadPool(&ThreadPool);
}
我们把工作项目放到一个队列中,用一个信号量通知线程池,线程池中任意一个线程取出工作项目来执行,执行完毕之后,线程返回线程池,继续等待新的工作项目。
线程池中线程的数量是固定的,预先创建好的,永久的线程,直到销毁线程池的时候,这些线程才会被销毁。
线程池中线程获得工作项目的机会是均等的,随机的,并没有特别的方式保证哪一个线程具有特殊的优先获得工作项目的机会。
而且,同一时刻可以并发运行的线程数目没有任何限定。事实上,在我们的执行计算任务的演示代码中,所有的线程都并发执行。
下面,我们再来看一下,完成同样的任务,系统提供的线程池是如何运作的。
/************************************************************************/
/* QueueWorkItem Test. */
/************************************************************************/
DWORD BeginTime;
LONG ItemCount;
HANDLE CompleteEvent;
int compute()
{
srand(BeginTime);
for(int i=0; i<20 *1000 * 1000; i++)
rand();
return rand();
}
DWORD WINAPI WorkItem(LPVOID lpParameter)
{
BOOL bWaitMode = (BOOL)lpParameter;
if(bWaitMode)
Sleep(1000);
else
compute();
if(InterlockedDecrement(&ItemCount) == 0)
{
printf("Time total %d second.\n", GetTickCount() - BeginTime);
SetEvent(CompleteEvent);
}
return 0;
}
void TestWorkItem(BOOL bWaitMode, DWORD Flag)
{
CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
BeginTime = GetTickCount();
ItemCount = 20;
for(int i=0; i<20; i++)
{
QueueUserWorkItem(WorkItem, (PVOID)bWaitMode, Flag);
}
WaitForSingleObject(CompleteEvent, INFINITE);
CloseHandle(CompleteEvent);
}
很简单,是吧?我们仅需要关注于我们的回调函数即可。但是与我们的简单模拟来比,系统提供的线程池有着更多的优点。
首先,线程池中线程的数目是动态调整的,其次,线程池利用IO完成端口的特性,它可以限制并发运行的线程数目,默认情况下,将会限制为CPU的数目,这可以减少线程切换。它挑选最近执行过的线程再次投入执行,从而避免了不必要的线程切换。
系统提供的线程池背后的策略,我们下一节继续再谈。
异步IO、APC、IO完成端口、线程池与高性能服务器之五 服务器的性能指标与实现高性能的途径
服务器的性能指标
作为一个网络服务器程序,性能永远是第一位的指标。性能可以这样定义:在给定的硬件条件和时间里,能够处理的任务量。能够最大限度地利用硬件性能的服务器设计才是良好的设计。
设计良好的服务器还应该考虑平均服务,对于每一个客户端,服务器应该给予每个客户端平均的服务,不能让某一个客户端长时间得不到服务而发生“饥饿”的状况。
可伸缩性,也就是说,随着硬件能力的提高,服务器的性能能够随之呈线性增长。
实现高性能的途径
一个实际的服务器的计算是很复杂的,往往是混合了IO计算和CPU计算。IO计算指计算任务中以IO为主的计算模型,比如文件服务器、邮件服务器等,混合 了大量的网络IO和文件IO;CPU计算指计算任务中没有或很少有IO,比如加密/解密,编码/解码,数学计算等等。
在CPU计算中,单 线程和多线程模型效果是相当的。《Win32多线程的性能》中说“在一个单处理器的计算机中,基于 CPU 的任务的并发执行速度不可能比串行执行速度快,但是我们可以看到,在 Windows NT 下线程创建和切换的额外开销非常小;对于非常短的计算,并发执行仅仅比串行执行慢 10%,而随着计算长度的增加,这两个时间就非常接近了。”
可见,对于纯粹的CPU计算来说,如果只有一个CPU,多线程模型是不合适的。考虑一个执行密集的CPU计算的服务,如果有几十个这样的线程并发执行,过于频繁地任务切换导致了不必要的性能损失。
在编程实现上,单线程模型计算模型对于服务器程序设计是很不方便的。因此,对于CPU计算采用线程池工作模型是比较恰当的。 QueueUserWorkItem函数非常适合于将一个CPU计算放入线程池。线程池实现将会努力减少这种不必要的线程切换,而且控制并发线程的数目为 CPU的数目。
我们真正需要关心的是IO计算,一般的网络服务器程序往往伴随着大量的IO计算。提高性能的途径在于要避免等待IO 的结束,造成CPU空闲,要尽量利用硬件能力,让一个或多个IO设备与CPU并发执行。前面介绍的异步IO,APC,IO完成端口都可以达到这个目的。
对于网络服务器来说,如果客户端并发请求数目比较少的话,用简单的多线程模型就可以应付了。如果一个线程因为等待IO操作完成而被挂起,操作系统将会调度 另外一个就绪的线程投入运行,从而形成并发执行。经典的网络服务器逻辑大多采用多线程/多进程方式,在一个客户端发起到服务器的连接时,服务器将会创建一 个线程,让这个新的线程来处理后续事务。这种以一个专门的线程/进程来代表一个客户端对象的编程方法非常直观,易于理解。
对于大型网络服 务器程序来说,这种方式存在着局限性。首先,创建线程/进程和销毁线程/进程的代价非常高昂,尤其是在服务器采用TCP“短连接”方式或UDP方式通讯的 情况下,例如,HTTP协议中,客户端发起一个连接后,发送一个请求,服务器回应了这个请求后,连接也就被关闭了。如果采用经典方式设计HTTP服务器, 那么过于频繁地创建线程/销毁线程对性能造成的影响是很恶劣的。
其次,即使一个协议中采取TCP“长连接”,客户端连上服务器后就一直保 持此连接,经典的设计方式也是有弊病的。如果客户端并发请求量很高,同一时刻有很多客户端等待服务器响应的情况下,将会有过多的线程并发执行,频繁的线程 切换将用掉一部分计算能力。实际上,如果并发线程数目过多的话,往往会过早地耗尽物理内存,绝大部分时间耗费在线程切换上,因为线程切换的同时也将引起内 存调页。最终导致服务器性能急剧下降,
对于一个需要应付同时有大量客户端并发请求的网络服务器来说,线程池是唯一的解决方案。线程池不光能够避免频繁地创建线程和销毁线程,而且能够用数目很少的线程就可以处理大量客户端并发请求。
值得注意的是,对于一个压力不大的网络服务器程序设计,我们并不推荐以上任何技巧。在简单的设计就能够完成任务的情况下,把事情弄得很复杂是很不明智,很愚蠢的行为。
异步IO、APC、IO完成端口、线程池与高性能服务器之二 APC
Alertable IO(告警IO)提供了更有效的异步通知形式。ReadFileEx / WriteFileEx在发出IO请求的同时,提供一个回调函数(APC过程),当IO请求完成后,一旦线程进入可告警状态,回调函数将会执行。
以下五个函数能够使线程进入告警状态:
SleepEx
WaitForSingleObjectEx
WaitForMultipleObjectsEx
SignalObjectAndWait
MsgWaitForMultipleObjectsEx
线程进入告警状态时,内核将会检查线程的APC队列,如果队列中有APC,将会按FIFO方式依次执行。如果队列为空,线程将会挂起等待事件对象。以后的 某个时刻,一旦APC进入队列,线程将会被唤醒执行APC,同时等待函数返回WAIT_IO_COMPLETION。
QueueUserAPC可以用来人为投递APC,只要目标线程处于告警状态时,APC就能够得到执行。
使用告警IO的主要缺点是发出IO请求的线程也必须是处理结果的线程,如果一个线程退出时还有未完成的IO请求,那么应用程序将永远丢失IO完成通知。然而以后我们将会看到IO完成端口没有这个限制。
下面的代码演示了QueueUserAPC的用法。
/************************************************************************/
/* APC Test. */
/************************************************************************/
DWORD WINAPI WorkThread(PVOID pParam)
{
HANDLE Event = (HANDLE)pParam;
for(;;)
{
DWORD dwRet = WaitForSingleObjectEx(Event, INFINITE, TRUE);
if(dwRet == WAIT_OBJECT_0)
break;
else if(dwRet == WAIT_IO_COMPLETION)
printf("WAIT_IO_COMPLETION\n");
}
return 0;
}
VOID CALLBACK APCProc(DWORD dwParam)
{
printf("%s", (PVOID)dwParam);
}
void TestAPC(BOOL bFast)
{
HANDLE QuitEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
HANDLE hThread = CreateThread(NULL,
0,
WorkThread,
(PVOID)QuitEvent,
0,
NULL);
Sleep(100); // Wait for WorkThread initialized.
for(int i=5; i>0; i--)
{
QueueUserAPC(APCProc, hThread, (DWORD)(PVOID)"APC here\n");
if(!bFast)
Sleep(1000);
}
SetEvent(QuitEvent);
WaitForSingleObject(hThread, INFINITE);
CloseHandle(hThread);
}
异步IO、APC、IO完成端口、线程池与高性能服务器之三 IO完成端口
IO完成端口
下面摘抄于MSDN《I/O Completion Ports》,smallfool翻译,原文请参考CSDN文档中心文章《I/O Completion Ports》, http://dev.csdn.net/Develop/article/29%5C29240.shtm 。
I/O 完成端口是一种机制,通过这个机制,应用程序在启动时会首先创建一个线程池,然后该应用程序使用线程池处理异步I/O请求。这些线程被创建的唯一目的就是 用于处理I/O请求。对于处理大量并发异步I/O请求的应用程序来说,相比于在I/O请求发生时创建线程来说,使用完成端口(s)它就可以做的更快且更有 效率。
CreateIoCompletionPort函数会使一个I/O完成端口与一个或多个文件句柄发生关联。当与一个完成端口相关的文件句柄 上启动的异步I/O操作完成时,一个I/O完成包就会进入到该完成端口的队列中。对于多个文件句柄来说,这种机制可以用来把多文件句柄的同步点放在单个对 象中。(言下之意,如果我们需要对每个句柄文件进行同步,一般而言我们需要多个对象(如:Event来同步),而我们使用IO Complete Port 来实现异步操作,我们可以同多个文件相关联,每当一个文件中的异步操作完成,就会把一个complete package放到队列中,这样我们就可以使用这个来完成所有文件句柄的同步)
调用GetQueuedCompletionStatus函数,某 个线程就会等待一个完成包进入到完成端口的队列中,而不是直接等待异步I/O请求完成。线程(们)就会阻塞于它们的运行在完成端口(按照后进先出队列顺序 的被释放)。这就意味着当一个完成包进入到完成端口的队列中时,系统会释放最近被阻塞在该完成端口的线程。
调用GetQueuedCompletionStatus,线程就会将会与某个指定的完成端口建立联系,一直延续其该线程的存在周期,或被指定了不同的完成端口,或者释放了与完成端口的联系。一个线程只能与最多不超过一个的完成端口发生联系。
完 成端口最重要的特性就是并发量。完成端口的并发量可以在创建该完成端口时指定。该并发量限制了与该完成端口相关联的可运行线程的数目。当与该完成端口相关 联的可运行线程的总数目达到了该并发量,系统就会阻塞任何与该完成端口相关联的后续线程的执行,直到与该完成端口相关联的可运行线程数目下降到小于该并发 量为止。最有效的假想是发生在有完成包在队列中等待,而没有等待被满足,因为此时完成端口达到了其并发量的极限。此时,一个正在运行中的线程调用 GetQueuedCompletionStatus时,它就会立刻从队列中取走该完成包。这样就不存在着环境的切换,因为该处于运行中的线程就会连续不 断地从队列中取走完成包,而其他的线程就不能运行了。
对于并发量最好的挑选值就是您计算机中CPU的数目。如果您的事务处理需要一个漫长的计算时间,一个比较大的并发量可以允许更多线程来运行。虽然完成每个事务处理需要花费更长的时间,但更多的事务可以同时被处理。对于应用程序来说,很容易通过测试并发量来获得最好的效果。
PostQueuedCompletionStatus函数允许应用程序可以针对自定义的专用I/O完成包进行排队,而无需启动一个异步I/O操作。这点对于通知外部事件的工作者线程来说很有用。
在没有更多的引用针对某个完成端口时,需要释放该完成端口。该完成端口句柄以及与该完成端口相关联的所有文件句柄都需要被释放。调用CloseHandle可以释放完成端口的句柄。
下面的代码利用IO完成端口做了一个简单的线程池。
/************************************************************************/
/* Test IOCompletePort. */
/************************************************************************/
DWORD WINAPI IOCPWorkThread(PVOID pParam)
{
HANDLE CompletePort = (HANDLE)pParam;
PVOID UserParam;
WORK_ITEM_PROC UserProc;
LPOVERLAPPED pOverlapped;
for(;;)
{
BOOL bRet = GetQueuedCompletionStatus(
CompletePort,
(LPDWORD)&UserParam,
(LPDWORD)&UserProc,
&pOverlapped,
INFINITE);
_ASSERT(bRet);
if(UserProc == NULL) // Quit signal.
break;
// execute user's proc.
UserProc(UserParam);
}
return 0;
}
void TestIOCompletePort(BOOL bWaitMode, LONG ThreadNum)
{
HANDLE CompletePort;
OVERLAPPED Overlapped = {0, 0, 0, 0, NULL};
CompletePort = CreateIoCompletionPort(
INVALID_HANDLE_VALUE,
NULL,
NULL,
0);
// Create threads.
for(int i=0; i<ThreadNum; i++)
{
HANDLE hThread = CreateThread(NULL,
0,
IOCPWorkThread,
CompletePort,
0,
NULL);
CloseHandle(hThread);
}
CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
BeginTime = GetTickCount();
ItemCount = 20;
for(i=0; i<20; i++)
{
PostQueuedCompletionStatus(
CompletePort,
(DWORD)bWaitMode,
(DWORD)UserProc1,
&Overlapped);
}
WaitForSingleObject(CompleteEvent, INFINITE);
CloseHandle(CompleteEvent);
// Destroy all threads.
for(i=0; i<ThreadNum; i++)
{
PostQueuedCompletionStatus(
CompletePort,
NULL,
NULL,
&Overlapped);
}
Sleep(1000); // wait all thread exit.
CloseHandle(CompletePort);
}
(转载)异步IO、APC、IO完成端口、线程池与高性能服务器 (四)
转载: http://www.vchelp.net/
原作者姓名 Fang(fangguicheng@21cn.com)
正文
异步IO、APC、IO完成端口、线程池与高性能服务器之四 线程池
线程池
下面摘抄于MSDN《Thread Pooling》。
有 许多应用程序创建的线程花费了大量时间在睡眠状态来等待事件的发生。还有一些线程进入睡眠状态后定期被唤醒以轮询工作方式来改变或者更新状态信息。线程池 可以让你更有效地使用线程,它为你的应用程序提供一个由系统管理的工作者线程池。至少会有一个线程来监听放到线程池的所有等待操作,当等待操作完成后,线 程池中将会有一个工作者线程来执行相应的回调函数。
你也可以把没有等待操作的工作项目放到线程池中,用QueueUserWorkItem函数来完成这个工作,把要执行的工作项目函数通过一个参数传递给线程池。工作项目被放到线程池中后,就不能再取消了。
Timer-queue timers和Registered wait operations也使用线程池来实现。他们的回调函数也放在线程池中。你也可以用BindIOCompletionCallback函数来投递一个异 步IO操作,在IO完成端口上,回调函数也是由线程池线程来执行。
当第一次调用QueueUserWorkItem函数或者 BindIOCompletionCallback函数的时候,线程池被自动创建,或者Timer-queue timers或者Registered wait operations放入回调函数的时候,线程池也可以被创建。线程池可以创建的线程数量不限,仅受限于可用的内存,每一个线程使用默认的初始堆栈大小, 运行在默认的优先级上。
线程池中有两种类型的线程:IO线程和非IO线程。IO线程等待在可告警状态,工作项目作为APC放到IO线程中。如果你的工作项目需要线程执行在可警告状态,你应该将它放到IO线程。
非IO工作者线程等待在IO完成端口上,使用非IO线程比IO线程效率更高,也就是说,只要有可能的话,尽量使用非IO线程。IO线程和非IO线程在异步IO操作没有完成之前都不会退出。然而,不要在非IO线程中发出需要很长时间才能完成的异步IO请求。
正 确使用线程池的方法是,工作项目函数以及它将会调用到的所有函数都必须是线程池安全的。安全的函数不应该假设线程是一次性线程的或者是永久线程。一般来 说,应该避免使用线程本地存储和发出需要永久线程的异步IO调用,比如说RegNotifyChangeKeyValue函数。如果需要在永久线程中执行 这样的函数的话,可以给QueueUserWorkItem传递一个选项WT_EXECUTEINPERSISTENTTHREAD。
注意,线程池不能兼容COM的单线程套间(STA)模型。
为了更深入地讲解操作系统实现的线程池的优越性,我们首先尝试着自己实现一个简单的线程池模型。
代码如下:
/************************************************************************/
/* Test Our own thread pool. */
/************************************************************************/
typedef struct _THREAD_POOL
{
HANDLE QuitEvent;
HANDLE WorkItemSemaphore;
LONG WorkItemCount;
LIST_ENTRY WorkItemHeader;
CRITICAL_SECTION WorkItemLock;
LONG ThreadNum;
HANDLE *ThreadsArray;
}THREAD_POOL, *PTHREAD_POOL;
typedef VOID (*WORK_ITEM_PROC)(PVOID Param);
typedef struct _WORK_ITEM
{
LIST_ENTRY List;
WORK_ITEM_PROC UserProc;
PVOID UserParam;
}WORK_ITEM, *PWORK_ITEM;
DWORD WINAPI WorkerThread(PVOID pParam)
{
PTHREAD_POOL pThreadPool = (PTHREAD_POOL)pParam;
HANDLE Events[2];
Events[0] = pThreadPool->QuitEvent;
Events[1] = pThreadPool->WorkItemSemaphore;
for(;;)
{
DWORD dwRet = WaitForMultipleObjects(2, Events, FALSE, INFINITE);
if(dwRet == WAIT_OBJECT_0)
break;
//
// execute user's proc.
//
else if(dwRet == WAIT_OBJECT_0 +1)
{
PWORK_ITEM pWorkItem;
PLIST_ENTRY pList;
EnterCriticalSection(&pThreadPool->WorkItemLock);
_ASSERT(!IsListEmpty(&pThreadPool->WorkItemHeader));
pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
LeaveCriticalSection(&pThreadPool->WorkItemLock);
pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);
pWorkItem->UserProc(pWorkItem->UserParam);
InterlockedDecrement(&pThreadPool->WorkItemCount);
free(pWorkItem);
}
else
{
_ASSERT(0);
break;
}
}
return 0;
}
BOOL InitializeThreadPool(PTHREAD_POOL pThreadPool, LONG ThreadNum)
{
pThreadPool->QuitEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
pThreadPool->WorkItemSemaphore = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
pThreadPool->WorkItemCount = 0;
InitializeListHead(&pThreadPool->WorkItemHeader);
InitializeCriticalSection(&pThreadPool->WorkItemLock);
pThreadPool->ThreadNum = ThreadNum;
pThreadPool->ThreadsArray = (HANDLE*)malloc(sizeof(HANDLE) * ThreadNum);
for(int i=0; i<ThreadNum; i++)
{
pThreadPool->ThreadsArray[i] = CreateThread(NULL, 0, WorkerThread, pThreadPool, 0, NULL);
}
return TRUE;
}
VOID DestroyThreadPool(PTHREAD_POOL pThreadPool)
{
SetEvent(pThreadPool->QuitEvent);
for(int i=0; i<pThreadPool->ThreadNum; i++)
{
WaitForSingleObject(pThreadPool->ThreadsArray[i], INFINITE);
CloseHandle(pThreadPool->ThreadsArray[i]);
}
free(pThreadPool->ThreadsArray);
CloseHandle(pThreadPool->QuitEvent);
CloseHandle(pThreadPool->WorkItemSemaphore);
DeleteCriticalSection(&pThreadPool->WorkItemLock);
while(!IsListEmpty(&pThreadPool->WorkItemHeader))
{
PWORK_ITEM pWorkItem;
PLIST_ENTRY pList;
pList = RemoveHeadList(&pThreadPool->WorkItemHeader);
pWorkItem = CONTAINING_RECORD(pList, WORK_ITEM, List);
free(pWorkItem);
}
}
BOOL PostWorkItem(PTHREAD_POOL pThreadPool, WORK_ITEM_PROC UserProc, PVOID UserParam)
{
PWORK_ITEM pWorkItem = (PWORK_ITEM)malloc(sizeof(WORK_ITEM));
if(pWorkItem == NULL)
return FALSE;
pWorkItem->UserProc = UserProc;
pWorkItem->UserParam = UserParam;
EnterCriticalSection(&pThreadPool->WorkItemLock);
InsertTailList(&pThreadPool->WorkItemHeader, &pWorkItem->List);
LeaveCriticalSection(&pThreadPool->WorkItemLock);
InterlockedIncrement(&pThreadPool->WorkItemCount);
ReleaseSemaphore(pThreadPool->WorkItemSemaphore, 1, NULL);
return TRUE;
}
VOID UserProc1(PVOID dwParam)
{
WorkItem(dwParam);
}
void TestSimpleThreadPool(BOOL bWaitMode, LONG ThreadNum)
{
THREAD_POOL ThreadPool;
InitializeThreadPool(&ThreadPool, ThreadNum);
CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
BeginTime = GetTickCount();
ItemCount = 20;
for(int i=0; i<20; i++)
{
PostWorkItem(&ThreadPool, UserProc1, (PVOID)bWaitMode);
}
WaitForSingleObject(CompleteEvent, INFINITE);
CloseHandle(CompleteEvent);
DestroyThreadPool(&ThreadPool);
}
我们把工作项目放到一个队列中,用一个信号量通知线程池,线程池中任意一个线程取出工作项目来执行,执行完毕之后,线程返回线程池,继续等待新的工作项目。
线程池中线程的数量是固定的,预先创建好的,永久的线程,直到销毁线程池的时候,这些线程才会被销毁。
线程池中线程获得工作项目的机会是均等的,随机的,并没有特别的方式保证哪一个线程具有特殊的优先获得工作项目的机会。
而且,同一时刻可以并发运行的线程数目没有任何限定。事实上,在我们的执行计算任务的演示代码中,所有的线程都并发执行。
下面,我们再来看一下,完成同样的任务,系统提供的线程池是如何运作的。
/************************************************************************/
/* QueueWorkItem Test. */
/************************************************************************/
DWORD BeginTime;
LONG ItemCount;
HANDLE CompleteEvent;
int compute()
{
srand(BeginTime);
for(int i=0; i<20 *1000 * 1000; i++)
rand();
return rand();
}
DWORD WINAPI WorkItem(LPVOID lpParameter)
{
BOOL bWaitMode = (BOOL)lpParameter;
if(bWaitMode)
Sleep(1000);
else
compute();
if(InterlockedDecrement(&ItemCount) == 0)
{
printf("Time total %d second.\n", GetTickCount() - BeginTime);
SetEvent(CompleteEvent);
}
return 0;
}
void TestWorkItem(BOOL bWaitMode, DWORD Flag)
{
CompleteEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
BeginTime = GetTickCount();
ItemCount = 20;
for(int i=0; i<20; i++)
{
QueueUserWorkItem(WorkItem, (PVOID)bWaitMode, Flag);
}
WaitForSingleObject(CompleteEvent, INFINITE);
CloseHandle(CompleteEvent);
}
很简单,是吧?我们仅需要关注于我们的回调函数即可。但是与我们的简单模拟来比,系统提供的线程池有着更多的优点。
首先,线程池中线程的数目是动态调整的,其次,线程池利用IO完成端口的特性,它可以限制并发运行的线程数目,默认情况下,将会限制为CPU的数目,这可以减少线程切换。它挑选最近执行过的线程再次投入执行,从而避免了不必要的线程切换。
系统提供的线程池背后的策略,我们下一节继续再谈。
(转载)异步IO、APC、IO完成端口、线程池与高性能服务器 (五)
转载: http://www.vchelp.net/
原作者姓名 Fang(fangguicheng@21cn.com)
正文
异步IO、APC、IO完成端口、线程池与高性能服务器之五 服务器的性能指标与实现高性能的途径
服务器的性能指标
作为一个网络服务器程序,性能永远是第一位的指标。性能可以这样定义:在给定的硬件条件和时间里,能够处理的任务量。能够最大限度地利用硬件性能的服务器设计才是良好的设计。
设计良好的服务器还应该考虑平均服务,对于每一个客户端,服务器应该给予每个客户端平均的服务,不能让某一个客户端长时间得不到服务而发生“饥饿”的状况。
可伸缩性,也就是说,随着硬件能力的提高,服务器的性能能够随之呈线性增长。
实现高性能的途径
一个实际的服务器的计算是很复杂的,往往是混合了IO计算和CPU计算。IO计算指计算任务中以IO为主的计算模型,比如文件服务器、邮件服务器等,混合 了大量的网络IO和文件IO;CPU计算指计算任务中没有或很少有IO,比如加密/解密,编码/解码,数学计算等等。
在CPU计算中,单 线程和多线程模型效果是相当的。《Win32多线程的性能》中说“在一个单处理器的计算机中,基于 CPU 的任务的并发执行速度不可能比串行执行速度快,但是我们可以看到,在 Windows NT 下线程创建和切换的额外开销非常小;对于非常短的计算,并发执行仅仅比串行执行慢 10%,而随着计算长度的增加,这两个时间就非常接近了。”
可见,对于纯粹的CPU计算来说,如果只有一个CPU,多线程模型是不合适的。考虑一个执行密集的CPU计算的服务,如果有几十个这样的线程并发执行,过于频繁地任务切换导致了不必要的性能损失。
在编程实现上,单线程模型计算模型对于服务器程序设计是很不方便的。因此,对于CPU计算采用线程池工作模型是比较恰当的。 QueueUserWorkItem函数非常适合于将一个CPU计算放入线程池。线程池实现将会努力减少这种不必要的线程切换,而且控制并发线程的数目为 CPU的数目。
我们真正需要关心的是IO计算,一般的网络服务器程序往往伴随着大量的IO计算。提高性能的途径在于要避免等待IO 的结束,造成CPU空闲,要尽量利用硬件能力,让一个或多个IO设备与CPU并发执行。前面介绍的异步IO,APC,IO完成端口都可以达到这个目的。
对于网络服务器来说,如果客户端并发请求数目比较少的话,用简单的多线程模型就可以应付了。如果一个线程因为等待IO操作完成而被挂起,操作系统将会调度 另外一个就绪的线程投入运行,从而形成并发执行。经典的网络服务器逻辑大多采用多线程/多进程方式,在一个客户端发起到服务器的连接时,服务器将会创建一 个线程,让这个新的线程来处理后续事务。这种以一个专门的线程/进程来代表一个客户端对象的编程方法非常直观,易于理解。
对于大型网络服 务器程序来说,这种方式存在着局限性。首先,创建线程/进程和销毁线程/进程的代价非常高昂,尤其是在服务器采用TCP“短连接”方式或UDP方式通讯的 情况下,例如,HTTP协议中,客户端发起一个连接后,发送一个请求,服务器回应了这个请求后,连接也就被关闭了。如果采用经典方式设计HTTP服务器, 那么过于频繁地创建线程/销毁线程对性能造成的影响是很恶劣的。
其次,即使一个协议中采取TCP“长连接”,客户端连上服务器后就一直保 持此连接,经典的设计方式也是有弊病的。如果客户端并发请求量很高,同一时刻有很多客户端等待服务器响应的情况下,将会有过多的线程并发执行,频繁的线程 切换将用掉一部分计算能力。实际上,如果并发线程数目过多的话,往往会过早地耗尽物理内存,绝大部分时间耗费在线程切换上,因为线程切换的同时也将引起内 存调页。最终导致服务器性能急剧下降,
对于一个需要应付同时有大量客户端并发请求的网络服务器来说,线程池是唯一的解决方案。线程池不光能够避免频繁地创建线程和销毁线程,而且能够用数目很少的线程就可以处理大量客户端并发请求。
值得注意的是,对于一个压力不大的网络服务器程序设计,我们并不推荐以上任何技巧。在简单的设计就能够完成任务的情况下,把事情弄得很复杂是很不明智,很愚蠢的行为。
正文完
socket编程之登峰造极------完成端口
socket编程之登峰造极------完成端口
主 题: socket编程之登峰造极------完成端口 << span="" />网络编程技术>>
“完成端口”模型是迄今为止最为复杂的—种I/O模型。然而。假若—个应用程序同时需要管理为数众多的套接字,那么采用这种模型。往往可以达到最佳的系统性能,然而不幸的是,该模型只适用于以下操作系统(微软的):Windows NT和Windows 2000操作系统。因其设计的复杂性,只有在你的应用程序需要同时管理数百乃至上千个套接字的时候、而且希望随着系统内安装的CPU数量的增多、应用程序的性能也可以线性提升,才应考虑采用“完成端口”模型。要记住的一个基本准则是,假如要为Windows NT或windows 2000开发高性能的服务器应用,同时希望为大量套接字I/O请求提供服务(Web服务器便是这方面的典型例子),那么I/O完成端口模型便是最佳选择.
从本质上说,完成端口模型要求我们创建一个Win32完成端口对象,通过指定数量的线程对重叠I/O请求进行管理。以便为已经完成的重叠I/O请求提供服务。要注意的是。所谓“完成端口”,实际是Win32、Windows NT以及windows 2000采用的一种I/O构造机制,除套接字句柄之外,实际上还可接受其他东西。然而,本节只打算讲述如何使用套接字句柄,来发挥完成端口模型的巨大威力。使用这种模型之前,首先要创建一个I/O完成端口对象,用它面向任意数量的套接字句柄。管理多个I/O请求。要做到这—点,需要调用CreateIoCompletionPort函数。该函数定义如下:
HANDLE CreateIoCompletionPort(
HANDLE FileHandle,
HANDLE ExistingCompletionPort,
DWORD CompletionKey,
DWORD NumberOfConcurrentThreads
);
在我们深入探讨其中的各个参数之前,首先要注意意该函数实际用于两个明显有别的目的:
■用于创建—个完成端口对象。
■将一个句柄同完成端口关联到一起。
最开始创建—个完成端口的时候,唯一感兴趣的参数便是NumberOfConcurrentThreads 并发线程的数量);前面三个参数都会被忽略。NumberOfConcurrentThreads 参数的特殊之处在于.它定义了在一个完成端口上,同时允许执行的线程数量。理想情况下我们希望每个处理器各自负责—个线程的运行,为完成端口提供服务,避免过于频繁的线程“场景”切换。若将该参数设为0,说明系统内安装了多少个处理器,便允许同时运行多少个线程!可用下述代码创建一个I/O完成端口:
CompetionPort=CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0)
该语加的作用是返问一个句柄.在为完成端口分配了—个套接字句柄后,用来对那个端
口进行标定(引用)。
1.工作者线程与完成端口
成功创建一个完成端口后,便可开始将套接字句柄与对象关联到一起。但在关联套接字之前、首先必须创建—个或多个“工作者线程”,以便在I/O请求投递给完成端口对象后。为完成端口提供服务。在这个时候,大家或许会觉得奇怪、到底应创建多少个线程。以便为完成端口提供服务呢?这实际正是完成端口模型显得颇为“复杂”的—个方面, 因为服务I/O请求所需的数量取决于应用程序的总体设计情况。在此要记住的—个重点在于,在我们调用CreateIoComletionPort时指定的并发线程数量,与打算创建的工作者线程数量相比,它们代表的并非同—件事情。早些时候,我们曾建议大家用CreateIoCompletionPort函数为每个处理器都指定一个线程(处理器的数量有多少,便指定多少线程)以避免由于频繁的线程“场景”交换活动,从而影响系统的整体性能。CreateIoCompletionPort函数的NumberofConcurrentThreads参数明确指示系统: 在一个完成端口上,一次只允许n个工作者线程运行。假如在完成端门上创建的工作者线程数量超出n个.那么在同一时刻,最多只允许n个线程运行。但实际上,在—段较短的时间内,系统有可能超过这个值。但很快便会把它减少至事先在CreateIoCompletionPort函数中设定的值。那么,为何实际创建的工作者线程数最有时要比CreateIoCompletionPort函数设定的多—些呢?这样做有必要吗?如先前所述。这主要取决于应用程序的总体设计情况,假设我们的工作者线程调用了一个函数,比如Sleep()或者WaitForSingleobject(),但却进入了暂停(锁定或挂起)状态、那么允许另—个线程代替它的位置。换行之,我们希望随时都能执行尽可能多的线程;当然,最大的线程数量是事先在CreateIoCompletonPort调用里设定好的。这样—来。假如事先预料到自己的线程有可能暂时处于停顿状态,那么最好能够创建比CreateIoCompletionPort的NumberofConcurrentThreads参数的值多的线程.以便到时候充分发挥系统的潜力。—旦在完成端口上拥有足够多的工作者线程来为I/O请求提供服务,便可着手将套接字句柄同完成端口关联到一起。这要求我们在—个现有的完成端口上调用CreateIoCompletionPort函数,同时为前三个参数: FileHandle,ExistingCompletionPort和CompletionKey——提供套接字的信息。其中,FileHandle参数指定—个要同完成端口关联在—一起的套接字句柄。
ExistingCompletionPort参数指定的是一个现有的完成端口。CompletionKey(完成键)参数则指定要与某个特定套接字句柄关联在—起的“单句柄数据”,在这个参数中,应用程序可保存与—个套接字对应的任意类型的信息。之所以把它叫作“单句柄数据”,是由于它只对应着与那个套接字句柄关联在—起的数据。可将其作为指向一个数据结构的指针、来保存套接字句柄;在那个结构中,同时包含了套接字的句柄,以及与那个套接字有关的其他信息。就象本章稍后还会讲述的那样,为完成端口提供服务的线程例程可通过这个参数。取得与其套字句柄有关的信息。
根据我们到目前为止学到的东西。首先来构建—个基本的应用程序框架。
程序清单8—9向人家阐述了如何使用完成端口模型。来开发—个回应(或“反射’)服务器应用。
在这个程序中。我们基本上按下述步骤行事:
1) 创建一个完成端口。第四个参数保持为0,指定在完成端口上,每个处理器一次只允许执行一个工作者线程。
2) 判断系统内到底安装了多少个处理器。
3) 创建工作者线程,根据步骤2)得到的处理器信息,在完成端口上,为已完成的I/O请求提供服务。在这个简单的例子中,我们为每个处理器都只创建—个工作者线程。这是出于事先已经预计到,到时候不会有任何线程进入“挂起”状态,造成由于线程数量的不足,而使处理器空闲的局面(没有足够的线程可供执行)。调用CreateThread函数时,必须同时提供—个工作者线程,由线程在创建好执行。本节稍后还会详细讨论线程的职责。
4) 准备好—个监听套接字。在端口5150上监听进入的连接请求。
5) 使用accept函数,接受进入的连接请求。
6) 创建—个数据结构,用于容纳“单句柄数据”。 同时在结构中存入接受的套接字句柄。
7) 调用CreateIoCompletionPort将自accept返回的新套接字句柄向完成端口关联到 一起,通过完成键(CompletionKey)参数,将但句柄数据结构传递给CreateIoCompletionPort。
8) 开始在已接受的连接上进行I/O操作。在此,我们希望通过重叠I/O机制,在新建的套接字上投递一个或多个异步WSARecv或WSASend请求。这些I/O请求完成后,一个工作者线程会为I/O请求提供服务,同时继续处理未来的I/O请求,稍后便会在步骤3)指定的工作者例程中。体验到这一点。
9) 重复步骤5)—8)。直到服务器终止。
程序清单8。9 完成端口的建立
StartWinsock()
//步骤一,创建一个完成端口
CompletionPort=CreateIoCompletionPort(INVALI_HANDLE_VALUE,NULL,0,0);
//步骤二判断有多少个处理器
GetSystemInfo(&SystemInfo);
//步骤三:根据处理器的数量创建工作线程,本例当中,工作线程的数目和处理器数目是相同的
for(i = 0; i < SystemInfo.dwNumberOfProcessers,i++){
HANDLE ThreadHandle;
//创建工作者线程,并把完成端口作为参数传给线程
ThreadHandle=CreateThread(NULL,0,
ServerWorkerThread,CompletionPort,
0, &ThreadID);
//关闭线程句柄(仅仅关闭句柄,并非关闭线程本身)
CloseHandle(ThreadHandle);
}
//步骤四:创建监听套接字
Listen=WSASocket(AF_INET,S0CK_STREAM,0,NULL,
WSA_FLAG_OVERLAPPED);
InternetAddr.sin_famlly=AF_INET;
InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
InternetAddr.sln_port = htons(5150);
bind(Listen,(PSOCKADDR)&InternetAddr,sizeof(InternetAddr));
//准备监听套接字
listen(Listen,5);
while(TRUE){
//步骤五,接入Socket,并和完成端口关联
Accept = WSAAccept(Listen,NULL,NULL,NULL,0);
//步骤六 创建一个perhandle结构,并和端口关联
PerHandleData=(LPPER_HANDLE_DATA)GlobalAlloc(GPTR,sizeof(PER_HANDLE_DATA));
printf("Socket number %d connected\n",Accept);
PerHandleData->Socket=Accept;
//步骤七,接入套接字和完成端口关联
CreateIoCompletionPort((HANDLE)Accept,
CompletionPort,(DWORD)PerHandleData,0);
//步骤八
//开始进行I/O操作,用重叠I/O发送一些WSASend()和WSARecv()
WSARecv(...)
作者:jackxiang@向东博客 专注WEB应用 构架之美 --- 构架之美,在于尽态极妍 | 应用之美,在于药到病除
地址:https://jackxiang.com/post/3879/
版权所有。转载时必须以链接形式注明作者和原始出处及本声明!
评论列表