Loading AI tools
来自维基百科,自由的百科全书
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,对于计算密集型任务,线程数上限一般取CPU逻辑核心数+2,线程数过多会导致额外的线程切换开销。
此条目需要扩展。 (2010年4月8日) |
任务调度以执行线程的常见方法是使用同步队列,称作任务队列。池中的线程等待队列中的任务,并把执行完的任务放入完成队列中。
线程池模式一般分为两种:HS/HA半同步/半异步模式、L/F领导者与跟随者模式。
线程池的伸缩性对性能有较大的影响。
Windows操作系统的API提供了一套线程池的实现接口。[1]可以方便地创建、使用线程池。Windows线程池API被设计为一组协同对象, 其中有些对象表示工作单位、计时器、异步I/O 等等。使用下述用户模式的对象来管理线程池及相关的数据:
SubmitThreadpoolWork
投寄工作对象到任务队列。如果不创建新的工作对象,则不能更改函数和上下文。CreateThreadpoolTimer
)并设定(SetThreadpoolTimer
)定时器对象。当定时器对象到期时,工作线程会执行定时器对象的回调函数。CreateThreadpoolWait
)并设定(SetThreadpoolWait
)等待对象。线程池内部的一个线程调用WaitForMultipleObjects并传入由SetThreadpoolWait函数注册的句柄,不断地组成一个句柄组,同时将Wait*函数的bWaitAll设为FALSE,这样当任何一个句柄被触发,线程池就会被唤醒。因WaitForMultipleObjects不允许将同一个句柄传入多次,因此必须确保不会用SetThreadpoolWait来多次注册同一个句柄,但可以调用DuplicationHandle复制句柄并传给Set*函数。因WaitForMultipleObjects一次最多只能等待64个内核对象,因此线程池实际上为每64个内核对象分配一个线程来等待,所以效率比较高。如果要等待超过64个以上的内核对象,系统会每64个内核对象,就开辟一个线程来等待这些内核对象。当线程池中一个线程调用了传入的回调函数,对应的等待项将进入“不活跃”状态;这意味着如果在同一个内核对象被触发后如果想再次调用这个回调函数,需要调用SetThreadpoolWait再次注册;如果传入的hObject为NULL,将把pWaitItem这个等待项从线程中移除。相关的API函数:[2]
示例程序1如下:
#include <windows.h>
#include <tchar.h>
#include <stdio.h>
//
// Thread pool wait callback function template
//
VOID CALLBACK MyWaitCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WAIT Wait, TP_WAIT_RESULT WaitResult)
{
// Instance, Parameter, Wait, and WaitResult not used in this example.
UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(Parameter); UNREFERENCED_PARAMETER(Wait); UNREFERENCED_PARAMETER(WaitResult);
// Do something when the wait is over.
_tprintf(_T("MyWaitCallback: wait is over.\n"));
}
//
// Thread pool timer callback function template
//
VOID CALLBACK MyTimerCallback(PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_TIMER Timer)
{
// Instance, Parameter, and Timer not used in this example.
UNREFERENCED_PARAMETER(Instance); UNREFERENCED_PARAMETER(Parameter); UNREFERENCED_PARAMETER(Timer);
// Do something when the timer fires.
_tprintf(_T("MyTimerCallback: timer has fired.\n"));
}
//
// This is the thread pool work callback function.
//
VOID CALLBACK MyWorkCallback( PTP_CALLBACK_INSTANCE Instance, PVOID Parameter, PTP_WORK Work)
{
// Instance, Parameter, and Work not used in this example.
UNREFERENCED_PARAMETER(Instance);UNREFERENCED_PARAMETER(Parameter);UNREFERENCED_PARAMETER(Work);
// Do something when the work callback is invoked.
_tprintf(_T("MyWorkCallback: Task performed.\n"));
}
int main(void)
{
PTP_WAIT Wait = NULL;
PTP_WAIT_CALLBACK waitcallback = MyWaitCallback;
HANDLE hEvent = NULL;
UINT i = 0;
UINT rollback = 0;
// Create an auto-reset event and initialized as nonsignaled.
hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
if (NULL == hEvent) {
// Error Handling
return 0;
}
rollback = 1; // CreateEvent succeeded
Wait = CreateThreadpoolWait(waitcallback,
NULL, // 回调函数的输入参数
NULL); // 使用缺省的回调环境
if (NULL == Wait) {
_tprintf(_T("CreateThreadpoolWait failed. LastError: %u\n"),GetLastError());
goto new_wait_cleanup;
}
rollback = 2; // CreateThreadpoolWait succeeded
// must re-register the event with the wait object before signaling it each time to trigger the wait callback
// each time before signaling the event to trigger the wait callback.
for (i = 0; i < 5; i++) {
SetThreadpoolWait(Wait, //线程池等待对象
hEvent, //内核等待对象
NULL); //超时设定
SetEvent(hEvent); //触发内核等待对象
// Delay for the waiter thread to act if necessary.
Sleep(500);
// Block here until the callback function is done executing.
WaitForThreadpoolWaitCallbacks(Wait, FALSE);
}
new_wait_cleanup:
switch (rollback) {
case 2:
// Unregister the wait by setting the event to NULL.
SetThreadpoolWait(Wait, NULL, NULL); //the wait object will cease to queue new callbacks (but callbacks already queued will still occur
// Close the wait.
CloseThreadpoolWait(Wait);
case 1:
// Close the event.
CloseHandle(hEvent);
default:
break;
}
BOOL bRet = FALSE;
PTP_WORK work = NULL;
PTP_TIMER timer = NULL;
PTP_POOL pool = NULL;
PTP_WORK_CALLBACK workcallback = MyWorkCallback;
PTP_TIMER_CALLBACK timercallback = MyTimerCallback;
TP_CALLBACK_ENVIRON CallBackEnviron;
PTP_CLEANUP_GROUP cleanupgroup = NULL;
FILETIME FileDueTime;
ULARGE_INTEGER ulDueTime;
rollback = 0;
InitializeThreadpoolEnvironment(&CallBackEnviron); //不使用缺省的线程池与缺省的回调环境
// Create a custom, dedicated thread pool.
pool = CreateThreadpool(NULL);
if (NULL == pool) {
_tprintf(_T("CreateThreadpool failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 1; // pool creation succeeded
// The thread pool is made persistent simply by setting both the minimum and maximum threads to 1.
SetThreadpoolThreadMaximum(pool, 1);
bRet = SetThreadpoolThreadMinimum(pool, 1);
if (FALSE == bRet) {
_tprintf(_T("SetThreadpoolThreadMinimum failed. LastError: %u\n"),GetLastError());
goto main_cleanup;
}
// Create a cleanup group for this thread pool.
cleanupgroup = CreateThreadpoolCleanupGroup();
if (NULL == cleanupgroup) {
_tprintf(_T("CreateThreadpoolCleanupGroup failed. LastError: %u\n"),GetLastError());
goto main_cleanup;
}
rollback = 2; // Cleanup group creation succeeded
// Associate the callback environment with our thread pool.
SetThreadpoolCallbackPool(&CallBackEnviron, pool);
// Associate the cleanup group with our thread pool.
// Objects created with the same callback environment as the cleanup group become members of the cleanup group.
SetThreadpoolCallbackCleanupGroup(&CallBackEnviron, //回调环境
cleanupgroup, //Cleanup Group
NULL); //Cleanup Group的回调函数,当释放其所包含的对象之前先调用该回调函数
// Create work with the callback environment.
work = CreateThreadpoolWork(workcallback, //回调函数
NULL, //回调函数的输入参数
&CallBackEnviron); //回调环境
if (NULL == work) {
_tprintf(_T("CreateThreadpoolWork failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 3; // Creation of work succeeded
// Submit the work to the pool. Because this was a pre-allocated work item (using CreateThreadpoolWork), it is guaranteed to execute.
SubmitThreadpoolWork(work);
// Create a timer with the same callback environment.
timer = CreateThreadpoolTimer(timercallback, //回调函数
NULL, //回调函数的输入参数
&CallBackEnviron); //回调环境
if (NULL == timer) {
_tprintf(_T("CreateThreadpoolTimer failed. LastError: %u\n"), GetLastError());
goto main_cleanup;
}
rollback = 4; // Timer creation succeeded
// Set the timer to fire in one second.
ulDueTime.QuadPart = (ULONGLONG)-(1 * 10 * 1000 * 1000);
FileDueTime.dwHighDateTime = ulDueTime.HighPart;
FileDueTime.dwLowDateTime = ulDueTime.LowPart;
SetThreadpoolTimer(timer, //线程池定时器对象
&FileDueTime, //到期时间
0, //周期时期,为0则表示一次性定时器
0); //操作系统调用回调函数的最大延迟时间
// Delay for the timer to be fired
Sleep(1500);
// Wait for all callbacks to finish.
// CloseThreadpoolCleanupGroupMembers also releases objects that are members of the cleanup group,
// so it is not necessary to call close functions on individual objects after calling CloseThreadpoolCleanupGroupMembers.
CloseThreadpoolCleanupGroupMembers(cleanupgroup, //Cleanup Group
FALSE, //为真则取消还未开始执行的pending的回调函数
NULL); //CleanupGroup回调函数的输入参数
// Already cleaned up the work item with the
// CloseThreadpoolCleanupGroupMembers, so set rollback to 2.
rollback = 2;
goto main_cleanup;
main_cleanup:
// Clean up any individual pieces manually
// Notice the fall-through structure of the switch.
// Clean up in reverse order.
switch (rollback) {
case 4:
case 3:
// Clean up the cleanup group members.
CloseThreadpoolCleanupGroupMembers(cleanupgroup,FALSE, NULL);
case 2:
// Clean up the cleanup group.
CloseThreadpoolCleanupGroup(cleanupgroup);
case 1:
// Clean up the pool.
CloseThreadpool(pool);
default:
break;
}
return 0;
}
关于IO线程池的一个示例:
#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h>
#include <iostream>
#include <limits>
void PressEnterToContinue()
{
std::cout << "Press ENTER to continue... " << std::flush;
std::cin.ignore( (std::numeric_limits<std::streamsize>::max)( ), '\n');
}
//////////////////////////////////////////////////////////////////////////
#define QMLX_ALLOC(sz) HeapAlloc(GetProcessHeap(),0,sz)
#define QMLX_CALLOC(sz) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz)
#define QMLX_SAFEFREE(p) if(NULL != p){HeapFree(GetProcessHeap(),0,p);p=NULL;}
#define QMLX_ASSERT(s) if(!(s)){DebugBreak();}
#define QMLX_BEGINTHREAD(Fun,Param) CreateThread(NULL,0,\
(LPTHREAD_START_ROUTINE)Fun,Param,0,NULL);
//////////////////////////////////////////////////////////////////////////
#define MAXWRITEPERTHREAD 2 //每个线程最大写入次数
#define MAXWRITETHREAD 2 //写入线程的数量
#define OP_READ 0x01 //读操作
#define OP_WRITE 0x02 //写操作
//#pragma pack(show)
//单IO数据
typedef struct __declspec(align(16)) _tagPerIoData {
OVERLAPPED m_ol;
HANDLE m_hFile; //操作的文件句柄
DWORD m_dwOp; //操作类型,OP_READ或OP_WRITE
LPVOID m_pData; //操作的数据
UINT m_nLen; //操作的数据长度
DWORD m_dwWrite; //写入的字节数
DWORD m_dwTimestamp; //起始操作的时间戳
}PER_IO_DATA, *PPER_IO_DATA;
//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio);
//写文件的线程
DWORD WINAPI WriteThread(LPVOID lpParam);
//当前操作的文件对象的指针
LARGE_INTEGER g_liFilePointer = { 0 };
//IOCP线程池
PTP_IO g_pThreadpoolIo = NULL;
//////////////////////////////////////////////////////////////////////////
//获取可模块的路径名(路径后含‘\’)
VOID GetAppPath(LPTSTR pszBuffer) {
DWORD dwLen = 0;
if (0 == (dwLen = GetModuleFileName(NULL, pszBuffer, MAX_PATH)))
return;
for (DWORD i = dwLen; i > 0; i--) {
if ('\\' == pszBuffer[i]) {
pszBuffer[i + 1] = '\0';
break;
}
}
}
int _tmain() {
_tsetlocale(LC_ALL, _T("chs"));
TCHAR pFileName[MAX_PATH] = {};
GetAppPath(pFileName);
StringCchCat(pFileName, MAX_PATH, _T("NewIOCPFile.txt"));
HANDLE ahWThread[MAXWRITETHREAD] = {};
DWORD dwWrited = 0;
//创建文件
HANDLE hTxtFile = CreateFile(pFileName, GENERIC_WRITE, 0, NULL,
CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
if (INVALID_HANDLE_VALUE == hTxtFile) {
_tprintf(_T("CreateFile(%s)失败,错误码:%u\n"), GetLastError());
_tsystem(_T("PAUSE"));
return 0;
}
//初始化线程池回调环境
TP_CALLBACK_ENVIRON poolEnv = {};
InitializeThreadpoolEnvironment(&poolEnv);
//创建IOCP线程池
g_pThreadpoolIo = CreateThreadpoolIo(hTxtFile, (PTP_WIN32_IO_CALLBACK)IOCPCallback, hTxtFile, &poolEnv);
//启动IOCP线程池
StartThreadpoolIo(g_pThreadpoolIo);
//写入UNICODE文件的前缀码,以便正确打开
PER_IO_DATA* pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
QMLX_ASSERT(pIo != NULL);
pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = hTxtFile;
pIo->m_pData = QMLX_CALLOC(sizeof(WORD));
QMLX_ASSERT(pIo->m_pData != NULL);
*((WORD*)pIo->m_pData) = MAKEWORD(0xFF, 0xFE);
pIo->m_nLen = sizeof(WORD);
//偏移文件指针
pIo->m_ol.Offset = g_liFilePointer.LowPart;
pIo->m_ol.OffsetHigh = g_liFilePointer.HighPart;
g_liFilePointer.QuadPart += pIo->m_nLen;
pIo->m_dwTimestamp = GetTickCount(); //记录时间戳
WriteFile(hTxtFile, pIo->m_pData, pIo->m_nLen,
&pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
//等待IOCP线程池完成操作
WaitForThreadpoolIoCallbacks(g_pThreadpoolIo, FALSE);
//启动写入线程进行日志写入操作
for (int i = 0; i < MAXWRITETHREAD; i++) {
ahWThread[i] = QMLX_BEGINTHREAD(WriteThread, hTxtFile);
}
//让主线程等待这些写入线程结束
WaitForMultipleObjects(MAXWRITETHREAD, ahWThread, TRUE, INFINITE);
for (int i = 0; i < MAXWRITETHREAD; i++) {
CloseHandle(ahWThread[i]);
}
//关闭IOCP线程池
CloseThreadpoolIo(g_pThreadpoolIo);
//关闭日志文件
if (INVALID_HANDLE_VALUE != hTxtFile) {
CloseHandle(hTxtFile);
hTxtFile = INVALID_HANDLE_VALUE;
}
_tsystem(_T("PAUSE"));
return 0;
}
//IOCP线程池回调函数,实际就是完成通知的响应函数
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio)
{
if (NO_ERROR != IoResult) {
_tprintf(_T("I/O操作出错,错误码:%u\n"), IoResult);
return;
}
PPER_IO_DATA pIo = CONTAINING_RECORD((LPOVERLAPPED)pOverlapped, PER_IO_DATA, m_ol);
DWORD dwCurTimestamp = GetTickCount();
switch (pIo->m_dwOp)
{
case OP_WRITE://写操作结束
{//写入操作结束
_tprintf(_T("线程[0x%x]得到IO完成通知,完成操作(%s),缓冲(0x%08x)长度(%ubytes),写入时间戳(%u)当前时间戳(%u)时差(%u)\n"),
GetCurrentThreadId(), OP_WRITE == pIo->m_dwOp ? _T("Write") : _T("Read"),
pIo->m_pData, pIo->m_nLen, pIo->m_dwTimestamp, dwCurTimestamp, dwCurTimestamp - pIo->m_dwTimestamp);
QMLX_SAFEFREE(pIo->m_pData);
QMLX_SAFEFREE(pIo);
}
break;
case OP_READ: //读操作结束
break;
default:
break;
}
}
//写文件的线程
#define MAX_LOGLEN 256
DWORD WINAPI WriteThread(LPVOID lpParam)
{
TCHAR pTxtContext[MAX_LOGLEN] = {};
PPER_IO_DATA pIo = NULL;
size_t szLen = 0;
LPTSTR pWriteText = NULL;
StringCchPrintf(pTxtContext, MAX_LOGLEN, _T("这是一条模拟的日志记录,由线程[0x%x]写入\r\n"),
GetCurrentThreadId());
StringCchLength(pTxtContext, MAX_LOGLEN, &szLen);
szLen += 1;
int i = 0;
for (; i < MAXWRITEPERTHREAD; i++) {
pWriteText = (LPTSTR)QMLX_CALLOC(szLen * sizeof(TCHAR));
QMLX_ASSERT(NULL != pWriteText);
StringCchCopy(pWriteText, szLen, pTxtContext);
//为每个操作申请一个“单IO数据”结构体
pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
QMLX_ASSERT(pIo != NULL);
pIo->m_dwOp = OP_WRITE;
pIo->m_hFile = (HANDLE)lpParam;
pIo->m_pData = pWriteText;
pIo->m_nLen = (szLen - 1) * sizeof(TCHAR);
//这里使用原子操作同步文件指针,写入不会相互覆盖
//这个地方体现了lock-free算法的精髓,使用了基本的CAS操作控制文件指针
//比传统的使用关键代码段并等待的方法,这里用的方法要轻巧的多,付出的代价也小
*((LONGLONG*)&pIo->m_ol.Pointer) = InterlockedCompareExchange64(&g_liFilePointer.QuadPart,
g_liFilePointer.QuadPart + pIo->m_nLen, g_liFilePointer.QuadPart);
pIo->m_dwTimestamp = GetTickCount(); //记录时间戳
StartThreadpoolIo(g_pThreadpoolIo);
//写入
WriteFile((HANDLE)lpParam, pIo->m_pData, pIo->m_nLen,
&pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
if (ERROR_IO_PENDING != GetLastError()) {
CancelThreadpoolIo(g_pThreadpoolIo);
}
}
return i;
}
命名空间System.Threading中的类ThreadPool提供一个线程池,该线程池可用于执行任务、发送工作项、处理异步 I/O、代表其他线程等待以及处理计时器。[3]
Seamless Wikipedia browsing. On steroids.
Every time you click a link to Wikipedia, Wiktionary or Wikiquote in your browser's search results, it will show the modern Wikiwand interface.
Wikiwand extension is a five stars, simple, with minimum permission required to keep your browsing private, safe and transparent.