win32 多线程的性能(2)
| 作者:microsoft公司供稿 |
| ruediger r. asche microsoft developer network 技术小组 concurrentexecution 的内部工作 请注意:本节的讨论是非常技术性的,所以假设您理解很多有关 win32 线程 api 的知识。如果您对如何使用 concurrentexecution 类来收集测试数据更加感兴趣,而不是对 concurrentexecution::doforallobjects 是如何被实现的感兴趣,那么您现在就可以跳到下面的“使用 concurrentexecution 来试验线程性能”一节。 让我们从 doserial 开始,因为它很大程度上是一个“不费脑筋的家伙”: bool concurrentexecution::doserial(int inoofobjects,long *objectarray, concurrent_execution_routine pprocessor, concurrent_finishing_routine pterminator) { for (int iloop=0;iloop<inoofobjects;iloop++) { pterminator((lpvoid)objectarray[iloop],(lpvoid)pprocessor((lpvoid)objectarray[iloop])); }; return true; }; 这段代码只是循环遍历该数组,在每一次迭代中调用处理器,然后在处理器和对象本身的结果上调用终结器。干得既干净又漂亮,不是吗? 令人感兴趣的成员函数是 doforallobjects。乍一看,doforallobjects 所要做的也没有什么特别的——请求操作系统创建为每一个计算一个线程,并且确保终结器函数能够被正确地调用。但是,有两个问题使得 doforallobjects 比它的表面现象要复杂:第一,当计算的数目多于可用的线程数时,concurrentexecution 的一个实例所创建的“并发的最大度数”参数可能需要一些附加的记录(bookkeeping)。第二,每一个计算的终结器函数都是在调用 doforallobjects 的线程的上下文中被调用的,而不是在该计算运行所处的线程上下文中被调用的;并且,终结器是在处理器结束之后立刻被调用的。要处理这些问题还是需要很多技巧的。 让我们深入到代码中,看看究竟是怎么样的。该段代码是从文件 thrdlib.cpp 中继承来的,但是为了清除起见,已经被精简了: int concurrentexecution::doforallobjects(int inoofobjects,long *objectarray, concurrent_execution_routine pobjectprocessor, concurrent_finishing_routine pobjectterminated) { int iloop,iendloop; dword ithread; dword iarrayindex; dword dwreturncode; dword icurrentarraylength=0; bool bwefreedsomething; char szbuf[70]; m_icurrentnumberofthreads=inoofobjects; handle *hpnt=(handle *)virtualalloc(null,m_icurrentnumberofthreads*sizeof(handle) ,mem_commit,page_readwrite); for(iloop=0;iloop<m_icurrentnumberofthreads;iloop++) hpnt[iloop] = createthread(null,0,pobjectprocessor,(lpvoid)objectarray[iloop], create_suspended,(lpdword)&ithread); 首先,我们为每一个对象创建单独的线程。因为我们使用 create_suspended 来创建该线程,所以还没有线程被启动。另一种方法是在需要时创建每一个线程。我决定不使用这种替代的策略,因为我发现当在一个同时运行了多个线程的应用程序中调用时, createthread 调用是非常浪费的;这样,同在运行时创建每一个线程相比,在此时创建线程的开销将更加容易接受, for (iloop = 0; iloop < m_icurrentnumberofthreads; iloop++) { handle hnewthread; bwefreedsomething=false; // 如果数组为空,分配一个 slot 和 boogie。 if (!icurrentarraylength) { iarrayindex = 0; icurrentarraylength=1; } else { // 首先,检查我们是否可以重复使用任何的 slot。我们希望在查找一个新的 slot 之前首先// 做这项工作,这样我们就可以立刻调用该就线程的终结器... iarrayindex=waitformultipleobjects(icurrentarraylength, m_hthreadarray,false,0); if (iarrayindex==wait_timeout) // no slot free... { { if (icurrentarraylength >= m_imaxarraysize) { iarrayindex= waitformultipleobjects(icurrentarraylength, m_hthreadarray,false,infinite); bwefreedsomething=true; } else // 我们可以释放某处的一个 slot,现在就这么做... { icurrentarraylength++; iarrayindex=icurrentarraylength-1; }; // else iarrayindex points to a thread that has been nuked }; } else bwefreedsomething = true; }; // 在这里,iarrayindex 包含一个有效的索引以存储新的线程。 hnewthread = hpnt[iloop]; resumethread(hnewthread); if (bwefreedsomething) { getexitcodethread(m_hthreadarray[iarrayindex],&dwreturncode); //错误 closehandle(m_hthreadarray[iarrayindex]); pobjectterminated((void *)m_hobjectarray[iarrayindex],(void *)dwreturncode); }; m_hthreadarray[iarrayindex] = hnewthread; m_hobjectarray[iarrayindex] = objectarray[iloop]; }; // 循环结束 doforallobjects 的核心是 hpnt,它是一个对象数组,这些对象是当 concurrentexecution 对象被构造时分配的。该数组能够容纳最大数目的线程,此最大数目与在构造函数中指定的最大并发度数相对应;因此,该数组中的每一个元素都是一个"slot",并有一个计算居于之中。 关于决定如何填充和释放的 slots 算法如下:该对象数组是从头到尾遍历的,并且对于每一个对象,我们都做如下的事情:如果尚未有 slot 已经被填充,我们使用当前的对象来填充该数组中的第一个 slot,并且继续执行将要处理当前对象的线程。如果至少有一个 slot 被使用,我们使用 waitformultipleobjects 函数来决定是否有正在运行的任何计算已经结束;如果是,我们在该对象上调用终结器,并且为新对象“重用”该 slot。请注意,我们也可以首先填充每一个空闲的 slot,直到没有剩余的 slots 为止,然后开始填充空的 slot。但是,如果我们这样做了,那么空出 slot 的终结器函数将不会被调用,直到所有的 slot 都已经被填充,这样就违反了我们有关当处理器结束一个对象时,终结器立刻被调用的要求。 最后,还有没有空闲 slot 的情况(就是说,当前激活的线程数等于 concurrentexecution 对象所允许的最大并发度数)。在这种情况下,waitformultipleobjects 将被再次调用以使得 doforallobjects 处于“睡眠”状态,直到有一个 slot 空出;只要这种情况一发生,终结器就被在空出 slot 的对象上调用,并且工作于当前对象的线程被继续执行。 终于,所有的计算要么都已经结束,要么将占有对象数组中的 slot。下列的代码将会处理所有剩余的线程: iendloop = icurrentarraylength; for (iloop=iendloop;iloop>0;iloop--) { iarrayindex=waitformultipleobjects(iloop, m_hthreadarray,false,infinite); if (iarrayindex==wait_failed) { getlasterror(); _asm int 3; // 这里要做一些聪明的事... }; getexitcodethread(m_hthreadarray[iarrayindex],&dwreturncode); // 错误? if (!closehandle(m_hthreadarray[iarrayindex])) messagebox(getfocus(),"can't delete thread!","",mb_ok); // 使它更好... pobjectterminated((void *)m_hobjectarray[iarrayindex], (void *)dwreturncode); if (iarrayindex==iloop-1) continue; // 这里很好,没有需要向后填充 m_hthreadarray[iarrayindex]=m_hthreadarray[iloop-1]; m_hobjectarray[iarrayindex]=m_hobjectarray[iloop-1]; }; 最后,清除: if (hpnt) virtualfree(hpnt,m_icurrentnumberofthreads*sizeof(handle), mem_decommit); return icurrentarraylength; }; 使用 concurrentexecution 来试验线程性能 性能测试的范围如下:测试应用程序 threadlibtest.exe 的用户可以指定是否测试基于 cpu 的或基于 i/o 的计算、执行多少个计算、计算的时间有多长、计算是如何排序的(为了测试最糟的情况与随机延迟),以及计算是被并发执行还是串行执行。 为了消除意外的结果,每一个测试可以被执行十次,然后将十次的结果拿来平均,以产生一个更加可信的结果。 通过选择菜单选项 "run entire test set",用户可以请求运行所有测试变量的变形。在测试中使用的计算长度在基础值 10 和 3,500 ms 之间变动(我一会儿将讨论这一问题),计算的数目在 2 和 20 之间变化。如果在运行该测试的计算机上安装了 microsoft excel,threadlibtest.exe 将会把结果转储在一个 microsoft excel 工作表,该工作表位于 c:\temp\values.xls。在任何情况下结果值也将会被保存到一个纯文本文件中,该文件位于 c:\temp\results.fil。请注意,我对于协议文件的位置使用了硬编码的方式,纯粹是懒惰行为;如果您需要在您的计算机上重新生成测试结果,并且需要指定一个不同的位置,那么只需要重新编译生成该工程,改变文件 threadlibtestview.cpp 的开头部分的 textfileloc 和 sheetfileloc 标识符的值即可。 请牢记,运行整个的测试程序将总是以最糟的情况来排序计算(就是说,执行的顺序是串行的,最长的计算将被首先执行,其后跟随着第二长的计算,然后以次类推)。这种方案牺牲了串行执行的灵活性,因为并发执行的响应时间在一个非最糟的方案下也没有改变,而该串行执行的响应时间是有可能提高的。 正如我前面所提到的,在一个实际的方案中,您应该分析每一个计算的时间是否是可以预测的。 使用 concurrentexecution 类来收集性能数据的代码位于 threadlibtestview.cpp 中。示例应用程序本身 (threadlibtest.exe) 是一个真正的单文档界面 (sdi) 的 mfc 应用程序。所有与示例有关的代码都驻留在 view 类的实现 cthreadlibtestview 中,它是从 ceasyoutputview 继承而来的。(有关对该类的讨论,请参考"windows nt security in theory and practice"。)这里并不包含该类中所有的有趣代码,所包含的大部分是其数字统计部分和用户界面处理部分。执行测试中的 "meat" 在 cthreadlibtestview::executetest 中,将执行一个测试运行周期。下面是有关 cthreadlibtestview::executetest 的简略代码: void cthreadlibtestview::executetest() { concurrentexecution *ce; bcpubound=((m_icomptype&ct_iobound)==0); // 全局... ce = new concurrentexecution(25); if (!queryperformancecounter(&m_lioldval)) return; // 获得当前时间。 if (!m_icomptype&ct_iobound) timebeginperiod(1); if (m_icomptype&ct_concurrent) m_ithreadsused=ce->doforallobjects(m_inumberofthreads, (long *)m_inumbers, (concurrent_execution_routine)pprocessor, (concurrent_finishing_routine)pterminator); else ce->doserial(m_inumberofthreads, (long *)m_inumbers, (concurrent_execution_routine)pprocessor, (concurrent_finishing_routine)pterminator); if (!m_icomptype&ct_iobound) timeendperiod(1); delete(ce); < 其他的代码在一个数组中排序结果,以供 excel 处理...> } 该段代码首先创建一个 concurrentexecution 类的对象,然后,取样当前时间,(用于统计计算所消耗的时间和响应时间),并且,根据所请求的是串行执行还是并发执行,分别调用 concurrentexecution 对象 doserial 或 doforallobjects 成员。请注意,对于当前的执行我请求最大并发度数为 25;如果您想要运行有多于 25 个计算的测试程序,那么您应该提高该值,使它大于或等于运行您的测试程序所需要的最大并发数。 让我们看一下处理器和终结器,以得到精确测量的结果: extern "c" { long winapi pprocessor(long iarg) { pthreadblockstruct ptarg=(pthreadblockstruct)iarg; bool bresult=true; int idelay=(ptarg->idelay); if (bcpubound) { int iloopcount; iloopcount=(int)(((float)idelay/1000.0)*ptarg->tboutputtarget->m_ibiasfactor); queryperformancecounter(&ptarg->listart); for (int icounter=0; icounter<iloopcount; icounter++); } else { queryperformancecounter(&ptarg->listart); sleep(ptarg->idelay); }; return bresult; } long winapi pterminator(long iarg, long ireturncode) { pthreadblockstruct ptarg=(pthreadblockstruct)iarg; queryperformancecounter(&ptarg->lifinish); ptarg->iendorder=iendindex++; return(0); } } 处理器模拟一个计算,其长度已经被放到一个与计算有关的数据结构 threadblockstruct 中。threadblockstruct 保持着与计算有关的数据,如其延迟和终止时间(以性能计数“滴答”来衡量),以及反向指针,指向实用化该结构的视图(view)。 通过简单的使计算“睡眠”指定的时间就可以模拟基于i/o的计算。基于 cpu的计算将进入一个空的 for 循环。这里的一些注释是为了帮助理解代码的功能:计算是基于 cpu 的,并且假定其执行时间为指定的毫秒数。在本测试程序的早期版本中,我仅仅是要 for 循环执行足够多的次数以满足指定的延迟的需求,而不考虑数字的实际含义。(根据相关的代码,对于基于i/o的计算该数字实际意味着毫秒,而对于基于cpu的计算,该数字则意味着迭代次数。)但是,为了能够使用绝对时间来比较基于cpu的计算和基于i/o的计算,我决定重写这段代码,这样无论对于基于cpu的计算还是基于i/o的计算,与计算有关的延迟都是以毫秒测量。 我发现对于具有指定的、预先定义时间长度的基于cpu的计算,要编写代码来模拟它并不是一件简单的事情。原因是这样的代码本身不能查询系统时间,因为所引发的调用迟早都会交出 cpu,而这违背了基于 cpu 的计算的要求。试图使用异步多媒体时钟事件同样没有得到满意的效果,原因是 windows nt 下计时器服务的工作方式。设置了一个多媒体计时器的线程实际上被挂起,直到该计时器回调被调用;因此,基于 cpu 的计算突然变成了基于 i/o 的操作了。
本文关键:Win32 多线程的性能(2)
|