浅析State-Thread

栏目: 编程语言 · 发布时间: 6年前

内容简介:State-Thread(以下简称st),是一个由C语言编写的小巧、简洁却高效的开源协程库。这个库基于单线程运作、不强制占用用户线程,给予了开发者最大程度的轻量级和较低的侵入性。本篇文章中,网易云信音视频研发大神将为大家简要分析State-Thread,欢迎大家积极留言,和我们共同讨论。在开始这个话题之前,我们先来聊一聊协程。什么是协程?

State-Thread(以下简称st),是一个由 C语言 编写的小巧、简洁却高效的开源协程库。这个库基于单线程运作、不强制占用用户线程,给予了开发者最大程度的轻量级和较低的侵入性。本篇文章中,网易云信音视频研发大神将为大家简要分析State-Thread,欢迎大家积极留言,和我们共同讨论。

在开始这个话题之前,我们先来聊一聊协程。

什么是协程?

协程是一种程序组件。通常我们把协程理解为是一种程序自己实现调度、用于提高运行效率、降低开发复杂度的东西。提高运行效率很好理解,因为在程序层自己完成了部分的调度,降低了对系统调度的依赖,减少了大量的中断和换页操作。而降低了开发复杂度,则是指对于开发者而言,可以使用同步的方式去进行代码开发(不需要考虑异步模型的诸多回调),也不需要考虑多线程模型的线程调度和诸多的临界资源问题。

很多语言都拥有协程,例如 python 或者golang。而对于c/c++而言,通常实现协程的常见方式,通常是依赖于glibc提供的setjump&longjump或者基于汇编语言,当然还有基于语义实现(protothread)。linux上使用协程库的方式,通常也会分为替换函数和更为暴力的替换so来实现。当然而各种方式有各自的优劣。而st选用的汇编语言实现setjump&longjump和要求用户调用st_打头的函数来嵌入程序。所以st具备了跨平台的能力,以及让开发者们更开心的“与允许调用者自行选择切换时机”的能力。

st究竟是如何实现了这一切?

首先我们先看看st的整体工作流程:

在宏观的来看,ST的结构主要分成:

vp_schedule。主要是负责了一个调度的能力。有点类似于 linux 内核当中的schedule()函数。每次当这个函数被调用的时候,都会完成一次线程的切换。

各种Queue。用于保存各种状态下等待被调度协程(st_thread)

Timer。用于记录各种超时和sleep。

poll。用于监听各种io事件,会根据系统能力不同而进行切换(kqueue、epoll、poll、select)。

st_thread。用于保存各种协程的信息。

其中比较重要的是schedule模块和thread模块两者。这两者实现了一个完整的协程切换和调度。属于st的核心。而schedule部分通常是开发者们最需要关心的部分。

接下来我们会深入到代码层,看一下具体在这个过程里做了些什么。

通常对于st而言,所有暴露给用户的除了init函数,就是一系列的st_xxx函数了。那么先看看init函数。

int st_init(void)

{

_st_thread_t *thread;

if (_st_active_count) {

/ Already initialized /

return 0;

}

/ We can ignore return value here /

st_set_eventsys(ST_EVENTSYS_DEFAULT);

if (_st_io_init() < 0)

return -1;

memset(&_st_this_vp, 0, sizeof(_st_vp_t));

ST_INIT_CLIST(&_ST_RUNQ);

ST_INIT_CLIST(&_ST_IOQ);

ST_INIT_CLIST(&_ST_ZOMBIEQ);

if ((*_st_eventsys->init)() < 0)

return -1;

_st_this_vp.pagesize = getpagesize();

_st_this_vp.last_clock = st_utime();

/*

  • Create idle thread

*/

_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start,

NULL, 0, 0);

if (!_st_this_vp.idle_thread)

return -1;

_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;

_st_active_count--;

_ST_DEL_RUNQ(_st_this_vp.idle_thread);

/*

  • Initialize primordial thread

*/

thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) +

(ST_KEYS_MAX sizeof(void )));

if (!thread)

return -1;

thread->private_data = (void **) (thread + 1);

thread->state = _ST_ST_RUNNING;

thread->flags = _ST_FL_PRIMORDIAL;

_ST_SET_CURRENT_THREAD(thread);

_st_active_count++;

return 0;

}

这段函数一共做了3事情,创建了一个idle_thread, 初始化了_ST_RUNQ、_ST_IOQ、

_ST_ZOMBIEQ三个队列,把当前调用者初始化成原始函数(通常st_init会在main里面调用,所以这个原始的thread相当于是主线程)。idle_thread函数,其实就是整个IO和定时器相关的本体函数了。st会在每一次_ST_RUNQ运行完成后,调用idle_thread来获取可读写的io和定时器。这个我们后续再说。

那么,st_xxx一般会分成io类和延迟类(sleep)。两者入口其实是同一个,只不过在io类的会多调用一层。我们这里选择st_send为代表。

int st_sendmsg(_st_netfd_t fd, const struct msghdr msg, int flags,

st_utime_t timeout)

{

int n;

while ((n = sendmsg(fd->osfd, msg, flags)) < 0) {

if (errno == EINTR)

continue;

if (!_IO_NOT_READY_ERROR)

return -1;

/ Wait until the socket becomes writable /

if (st_netfd_poll(fd, POLLOUT, timeout) < 0)

return -1;

}

return n;

}

本质上所有的st函数都是以异步接口+ st_netfd_poll来实现的。在st_netfd_poll以内,会去调用st_poll,而st_poll本质上会调用并且切换线程。

int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)

{

struct pollfd pd;

int n;

pd.fd = fd->osfd;

pd.events = (short) how;

pd.revents = 0;

if ((n = st_poll(&pd, 1, timeout)) < 0)

return -1;

if (n == 0) {

/ Timed out /

errno = ETIME;

return -1;

}

if (pd.revents & POLLNVAL) {

errno = EBADF;

return -1;

}

return 0;

}

int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)

{

struct pollfd *pd;

struct pollfd *epd = pds + npds;

_st_pollq_t pq;

_st_thread_t *me = _ST_CURRENT_THREAD();

int n;

if (me->flags & _ST_FL_INTERRUPT) {

me->flags &= ~_ST_FL_INTERRUPT;

errno = EINTR;

return -1;

}

if ((*_st_eventsys->pollset_add)(pds, npds) < 0)

return -1;

pq.pds = pds;

pq.npds = npds;

pq.thread = me;

pq.on_ioq = 1;

_ST_ADD_IOQ(pq);

if (timeout != ST_UTIME_NO_TIMEOUT)

_ST_ADD_SLEEPQ(me, timeout);

me->state = _ST_ST_IO_WAIT;

_ST_SWITCH_CONTEXT(me);

n = 0;

if (pq.on_ioq) {

/ If we timed out, the pollq might still be on the ioq. Remove it /

_ST_DEL_IOQ(pq);

(*_st_eventsys->pollset_del)(pds, npds);

} else {

/ Count the number of ready descriptors /

for (pd = pds; pd < epd; pd++) {

if (pd->revents)

n++;

}

}

if (me->flags & _ST_FL_INTERRUPT) {

me->flags &= ~_ST_FL_INTERRUPT;

errno = EINTR;

return -1;

}

return n;

}

那么到此为止,st_poll中就出现了我们最关心的调度部分了。

当一个线程进行调度的时候一般都是poll_add(如果是io操作),add_queue, _ST_SWITCH_CONTEXT完成一次调度。根据不同的类型,会add到不同的queue。例如需要超时,则会add到IOQ和SLEEPQ。而_ST_SWITCH_CONTEXT,则是最关键的切换线程操作了。

_ST_SWITCH_CONTEXT其实是一个宏,它的本质是调用了MD_SETJMP和_st_vp_schedule().

define _ST_SWITCH_CONTEXT(_thread) \

ST_BEGIN_MACRO \

ST_SWITCH_OUT_CB(_thread); \

if (!MD_SETJMP((_thread)->context)) { \

_st_vp_schedule(); \

} \

ST_DEBUG_ITERATE_THREADS(); \

ST_SWITCH_IN_CB(_thread); \

ST_END_MACRO

这个函数其实就是一个完成的线程切换了。在st里线程的切换会使用MD_SETJMP->_st_vp_schedule->MD_LONGJMP。MD_SETJMP和MD_LONGJMP其实就是st使用汇编自己写的setjmp和longjmp函数(glibc),效果也是几乎等效的。(因为st本身会做平台适配,所以我们以x86-64的汇编为例)

elif defined(__amd64__) || defined(__x86_64__)

/*

  • Internal __jmp_buf layout

*/

define JB_RBX 0

define JB_RBP 1

define JB_R12 2

define JB_R13 3

define JB_R14 4

define JB_R15 5

define JB_RSP 6

define JB_PC 7

.file "md.S"

.text

/ _st_md_cxt_save(__jmp_buf env) /

.globl _st_md_cxt_save

.type _st_md_cxt_save, @function

.align 16

_st_md_cxt_save:

/*

  • Save registers.

*/

movq %rbx, (JB_RBX*8)(%rdi)

movq %rbp, (JB_RBP*8)(%rdi)

movq %r12, (JB_R12*8)(%rdi)

movq %r13, (JB_R13*8)(%rdi)

movq %r14, (JB_R14*8)(%rdi)

movq %r15, (JB_R15*8)(%rdi)

/ Save SP /

leaq 8(%rsp), %rdx

movq %rdx, (JB_RSP*8)(%rdi)

/ Save PC we are returning to /

movq (%rsp), %rax

movq %rax, (JB_PC*8)(%rdi)

xorq %rax, %rax

ret

.size _st_md_cxt_save, .-_st_md_cxt_save

/ /

/ _st_md_cxt_restore(__jmp_buf env, int val) /

.globl _st_md_cxt_restore

.type _st_md_cxt_restore, @function

.align 16

_st_md_cxt_restore:

/*

  • Restore registers.

*/

movq (JB_RBX*8)(%rdi), %rbx

movq (JB_RBP*8)(%rdi), %rbp

movq (JB_R12*8)(%rdi), %r12

movq (JB_R13*8)(%rdi), %r13

movq (JB_R14*8)(%rdi), %r14

movq (JB_R15*8)(%rdi), %r15

/ Set return value /

test %esi, %esi

mov $01, %eax

cmove %eax, %esi

mov %esi, %eax

movq (JB_PC*8)(%rdi), %rdx

movq (JB_RSP*8)(%rdi), %rsp

/ Jump to saved PC /

jmpq *%rdx

.size _st_md_cxt_restore, .-_st_md_cxt_restore

/ /

MD_SETJMP的时候,会使用汇编把所有寄存器的信息保留下来,而MD_LONGJMP则会把所有的寄存器信息重新加载出来。两者配合使用的时候,可以完成一次函数间的跳转。

那么我们已经看到了MD_SETJMP的调用,MD_LONGJMP调用在哪儿呢?

让我们继续看下去,在最一开始,我们就提及过_st_vp_schedule()这个核心函数。

void _st_vp_schedule(void)

{

_st_thread_t *thread;

if (_ST_RUNQ.next != &_ST_RUNQ) {

/ Pull thread off of the run queue /

thread = _ST_THREAD_PTR(_ST_RUNQ.next);

_ST_DEL_RUNQ(thread);

} else {

/ If there are no threads to run, switch to the idle thread /

thread = _st_this_vp.idle_thread;

}

ST_ASSERT(thread->state == _ST_ST_RUNNABLE);

/ Resume the thread /

thread->state = _ST_ST_RUNNING;

_ST_RESTORE_CONTEXT(thread);

}

这个函数其实非常简单,基本工作原理可以认为是执行以下几步: 1.查看当前RUNQ是否有可以调用的,如果有,则RUNQ pop一个thread。 2. 如果没有,则运行idle_thread。 3. 调用_ST_RESTORE_CONTEXT。

那么_ST_RESTORE_CONTEXT做了什么呢?

define _ST_RESTORE_CONTEXT(_thread) \

ST_BEGIN_MACRO \

_ST_SET_CURRENT_THREAD(_thread); \

MD_LONGJMP((_thread)->context, 1); \

ST_END_MACRO

简单来说,_ST_RESTORE_CONTEXT就是调用了我们之前所没有看到的MD_LONGJMP。

所以,我们可以简单地认为,在携程需要schedule的时候,会先把自身当前的栈通过MD_SETJMP保存起来,当线程被schedule再次调度出来的时候,则会使用MD_SETJMP来还原栈,完成一次协程切换。

然后我们来看看idle_thread做了什么。

虽然这个协程名字叫做idle,但是其实做了很多的事情。

void _st_idle_thread_start(void arg)

{

_st_thread_t *me = _ST_CURRENT_THREAD();

while (_st_active_count > 0) {

/ Idle vp till I/O is ready or the smallest timeout expired /

_ST_VP_IDLE();

/ Check sleep queue for expired threads /

_st_vp_check_clock();

me->state = _ST_ST_RUNNABLE;

_ST_SWITCH_CONTEXT(me);

}

/ No more threads /

exit(0);

/ NOTREACHED /

return NULL;

}

总的来说,idle_thread做了两件事情。1. _ST_VP_IDLE() 2. _st_vp_check_clock()。_st_vp_check_clock很好理解,就是检查定时器是否超时,如果超时了,则设置超时标记之后,放回RUNQ。而_ST_VP_IDLE,其实就是查看io是否已经ready了。例如linux的话,则会调用epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist,

_st_epoll_data->evtlist_size, timeout)去查看是否有可响应的io。timeout值会根据当前空闲情况进行变化,通常来说会是一个极小的值。

那么看到这里,整体的线程调度已经全部走完了。(详见前面最一开始的流程图)总体流程总结来说基本上是func() -> st_xxxx() -> AddQ -> MD_SETJMP -> schedule() -> MD_LONG -> func()。

所以对于st而言,所以的调度,是基于用户调用。那么如果用户一直不调用st_xxx()(例如计算密集性服务),st也就无法进行协程切换,那么其他协程也就产生极大的阻塞了。这也是为什么st并不太合适计算密集型的原因(其实单线程框架大多都不合适计算密集型)

想要阅读更多技术干货文章,欢迎关注网易云信博客。

了解网易云信,来自网易核心架构的通信与视频云服务。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Algorithms to Live By

Algorithms to Live By

Brian Christian、Tom Griffiths / Henry Holt and Co. / 2016-4-19 / USD 30.00

A fascinating exploration of how insights from computer algorithms can be applied to our everyday lives, helping to solve common decision-making problems and illuminate the workings of the human mind ......一起来看看 《Algorithms to Live By》 这本书的介绍吧!

XML 在线格式化
XML 在线格式化

在线 XML 格式化压缩工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

RGB CMYK 转换工具
RGB CMYK 转换工具

RGB CMYK 互转工具