浅析State-Thread

栏目: 编程语言 · 发布时间: 5年前

内容简介:State-Thread(以下简称st),是一个由C语言编写的小巧、简洁却高效的开源协程库。这个库基于单线程运作、不强制占用用户线程,给予了开发者最大程度的轻量级和较低的侵入性。本篇文章中,网易云信音视频研发大神将为大家简要分析State-Thread,欢迎大家积极留言,和我们共同讨论。在开始这个话题之前,我们先来聊一聊协程。什么是协程?

State-Thread(以下简称st),是一个由 C语言 编写的小巧、简洁却高效的开源协程库。这个库基于单线程运作、不强制占用用户线程,给予了开发者最大程度的轻量级和较低的侵入性。本篇文章中,网易云信音视频研发大神将为大家简要分析State-Thread,欢迎大家积极留言,和我们共同讨论。

在开始这个话题之前,我们先来聊一聊协程。

什么是协程?

协程是一种程序组件。通常我们把协程理解为是一种程序自己实现调度、用于提高运行效率、降低开发复杂度的东西。提高运行效率很好理解,因为在程序层自己完成了部分的调度,降低了对系统调度的依赖,减少了大量的中断和换页操作。而降低了开发复杂度,则是指对于开发者而言,可以使用同步的方式去进行代码开发(不需要考虑异步模型的诸多回调),也不需要考虑多线程模型的线程调度和诸多的临界资源问题。

很多语言都拥有协程,例如 python 或者golang。而对于c/c++而言,通常实现协程的常见方式,通常是依赖于glibc提供的setjump&longjump或者基于汇编语言,当然还有基于语义实现(protothread)。linux上使用协程库的方式,通常也会分为替换函数和更为暴力的替换so来实现。当然而各种方式有各自的优劣。而st选用的汇编语言实现setjump&longjump和要求用户调用st_打头的函数来嵌入程序。所以st具备了跨平台的能力,以及让开发者们更开心的“与允许调用者自行选择切换时机”的能力。

st究竟是如何实现了这一切?

首先我们先看看st的整体工作流程:

在宏观的来看,ST的结构主要分成:

vp_schedule。主要是负责了一个调度的能力。有点类似于 linux 内核当中的schedule()函数。每次当这个函数被调用的时候,都会完成一次线程的切换。

各种Queue。用于保存各种状态下等待被调度协程(st_thread)

Timer。用于记录各种超时和sleep。

poll。用于监听各种io事件,会根据系统能力不同而进行切换(kqueue、epoll、poll、select)。

st_thread。用于保存各种协程的信息。

其中比较重要的是schedule模块和thread模块两者。这两者实现了一个完整的协程切换和调度。属于st的核心。而schedule部分通常是开发者们最需要关心的部分。

接下来我们会深入到代码层,看一下具体在这个过程里做了些什么。

通常对于st而言,所有暴露给用户的除了init函数,就是一系列的st_xxx函数了。那么先看看init函数。

int st_init(void)

{

_st_thread_t *thread;

if (_st_active_count) {

/ Already initialized /

return 0;

}

/ We can ignore return value here /

st_set_eventsys(ST_EVENTSYS_DEFAULT);

if (_st_io_init() < 0)

return -1;

memset(&_st_this_vp, 0, sizeof(_st_vp_t));

ST_INIT_CLIST(&_ST_RUNQ);

ST_INIT_CLIST(&_ST_IOQ);

ST_INIT_CLIST(&_ST_ZOMBIEQ);

if ((*_st_eventsys->init)() < 0)

return -1;

_st_this_vp.pagesize = getpagesize();

_st_this_vp.last_clock = st_utime();

/*

  • Create idle thread

*/

_st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start,

NULL, 0, 0);

if (!_st_this_vp.idle_thread)

return -1;

_st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;

_st_active_count--;

_ST_DEL_RUNQ(_st_this_vp.idle_thread);

/*

  • Initialize primordial thread

*/

thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) +

(ST_KEYS_MAX sizeof(void )));

if (!thread)

return -1;

thread->private_data = (void **) (thread + 1);

thread->state = _ST_ST_RUNNING;

thread->flags = _ST_FL_PRIMORDIAL;

_ST_SET_CURRENT_THREAD(thread);

_st_active_count++;

return 0;

}

这段函数一共做了3事情,创建了一个idle_thread, 初始化了_ST_RUNQ、_ST_IOQ、

_ST_ZOMBIEQ三个队列,把当前调用者初始化成原始函数(通常st_init会在main里面调用,所以这个原始的thread相当于是主线程)。idle_thread函数,其实就是整个IO和定时器相关的本体函数了。st会在每一次_ST_RUNQ运行完成后,调用idle_thread来获取可读写的io和定时器。这个我们后续再说。

那么,st_xxx一般会分成io类和延迟类(sleep)。两者入口其实是同一个,只不过在io类的会多调用一层。我们这里选择st_send为代表。

int st_sendmsg(_st_netfd_t fd, const struct msghdr msg, int flags,

st_utime_t timeout)

{

int n;

while ((n = sendmsg(fd->osfd, msg, flags)) < 0) {

if (errno == EINTR)

continue;

if (!_IO_NOT_READY_ERROR)

return -1;

/ Wait until the socket becomes writable /

if (st_netfd_poll(fd, POLLOUT, timeout) < 0)

return -1;

}

return n;

}

本质上所有的st函数都是以异步接口+ st_netfd_poll来实现的。在st_netfd_poll以内,会去调用st_poll,而st_poll本质上会调用并且切换线程。

int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)

{

struct pollfd pd;

int n;

pd.fd = fd->osfd;

pd.events = (short) how;

pd.revents = 0;

if ((n = st_poll(&pd, 1, timeout)) < 0)

return -1;

if (n == 0) {

/ Timed out /

errno = ETIME;

return -1;

}

if (pd.revents & POLLNVAL) {

errno = EBADF;

return -1;

}

return 0;

}

int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)

{

struct pollfd *pd;

struct pollfd *epd = pds + npds;

_st_pollq_t pq;

_st_thread_t *me = _ST_CURRENT_THREAD();

int n;

if (me->flags & _ST_FL_INTERRUPT) {

me->flags &= ~_ST_FL_INTERRUPT;

errno = EINTR;

return -1;

}

if ((*_st_eventsys->pollset_add)(pds, npds) < 0)

return -1;

pq.pds = pds;

pq.npds = npds;

pq.thread = me;

pq.on_ioq = 1;

_ST_ADD_IOQ(pq);

if (timeout != ST_UTIME_NO_TIMEOUT)

_ST_ADD_SLEEPQ(me, timeout);

me->state = _ST_ST_IO_WAIT;

_ST_SWITCH_CONTEXT(me);

n = 0;

if (pq.on_ioq) {

/ If we timed out, the pollq might still be on the ioq. Remove it /

_ST_DEL_IOQ(pq);

(*_st_eventsys->pollset_del)(pds, npds);

} else {

/ Count the number of ready descriptors /

for (pd = pds; pd < epd; pd++) {

if (pd->revents)

n++;

}

}

if (me->flags & _ST_FL_INTERRUPT) {

me->flags &= ~_ST_FL_INTERRUPT;

errno = EINTR;

return -1;

}

return n;

}

那么到此为止,st_poll中就出现了我们最关心的调度部分了。

当一个线程进行调度的时候一般都是poll_add(如果是io操作),add_queue, _ST_SWITCH_CONTEXT完成一次调度。根据不同的类型,会add到不同的queue。例如需要超时,则会add到IOQ和SLEEPQ。而_ST_SWITCH_CONTEXT,则是最关键的切换线程操作了。

_ST_SWITCH_CONTEXT其实是一个宏,它的本质是调用了MD_SETJMP和_st_vp_schedule().

define _ST_SWITCH_CONTEXT(_thread) \

ST_BEGIN_MACRO \

ST_SWITCH_OUT_CB(_thread); \

if (!MD_SETJMP((_thread)->context)) { \

_st_vp_schedule(); \

} \

ST_DEBUG_ITERATE_THREADS(); \

ST_SWITCH_IN_CB(_thread); \

ST_END_MACRO

这个函数其实就是一个完成的线程切换了。在st里线程的切换会使用MD_SETJMP->_st_vp_schedule->MD_LONGJMP。MD_SETJMP和MD_LONGJMP其实就是st使用汇编自己写的setjmp和longjmp函数(glibc),效果也是几乎等效的。(因为st本身会做平台适配,所以我们以x86-64的汇编为例)

elif defined(__amd64__) || defined(__x86_64__)

/*

  • Internal __jmp_buf layout

*/

define JB_RBX 0

define JB_RBP 1

define JB_R12 2

define JB_R13 3

define JB_R14 4

define JB_R15 5

define JB_RSP 6

define JB_PC 7

.file "md.S"

.text

/ _st_md_cxt_save(__jmp_buf env) /

.globl _st_md_cxt_save

.type _st_md_cxt_save, @function

.align 16

_st_md_cxt_save:

/*

  • Save registers.

*/

movq %rbx, (JB_RBX*8)(%rdi)

movq %rbp, (JB_RBP*8)(%rdi)

movq %r12, (JB_R12*8)(%rdi)

movq %r13, (JB_R13*8)(%rdi)

movq %r14, (JB_R14*8)(%rdi)

movq %r15, (JB_R15*8)(%rdi)

/ Save SP /

leaq 8(%rsp), %rdx

movq %rdx, (JB_RSP*8)(%rdi)

/ Save PC we are returning to /

movq (%rsp), %rax

movq %rax, (JB_PC*8)(%rdi)

xorq %rax, %rax

ret

.size _st_md_cxt_save, .-_st_md_cxt_save

/ /

/ _st_md_cxt_restore(__jmp_buf env, int val) /

.globl _st_md_cxt_restore

.type _st_md_cxt_restore, @function

.align 16

_st_md_cxt_restore:

/*

  • Restore registers.

*/

movq (JB_RBX*8)(%rdi), %rbx

movq (JB_RBP*8)(%rdi), %rbp

movq (JB_R12*8)(%rdi), %r12

movq (JB_R13*8)(%rdi), %r13

movq (JB_R14*8)(%rdi), %r14

movq (JB_R15*8)(%rdi), %r15

/ Set return value /

test %esi, %esi

mov $01, %eax

cmove %eax, %esi

mov %esi, %eax

movq (JB_PC*8)(%rdi), %rdx

movq (JB_RSP*8)(%rdi), %rsp

/ Jump to saved PC /

jmpq *%rdx

.size _st_md_cxt_restore, .-_st_md_cxt_restore

/ /

MD_SETJMP的时候,会使用汇编把所有寄存器的信息保留下来,而MD_LONGJMP则会把所有的寄存器信息重新加载出来。两者配合使用的时候,可以完成一次函数间的跳转。

那么我们已经看到了MD_SETJMP的调用,MD_LONGJMP调用在哪儿呢?

让我们继续看下去,在最一开始,我们就提及过_st_vp_schedule()这个核心函数。

void _st_vp_schedule(void)

{

_st_thread_t *thread;

if (_ST_RUNQ.next != &_ST_RUNQ) {

/ Pull thread off of the run queue /

thread = _ST_THREAD_PTR(_ST_RUNQ.next);

_ST_DEL_RUNQ(thread);

} else {

/ If there are no threads to run, switch to the idle thread /

thread = _st_this_vp.idle_thread;

}

ST_ASSERT(thread->state == _ST_ST_RUNNABLE);

/ Resume the thread /

thread->state = _ST_ST_RUNNING;

_ST_RESTORE_CONTEXT(thread);

}

这个函数其实非常简单,基本工作原理可以认为是执行以下几步: 1.查看当前RUNQ是否有可以调用的,如果有,则RUNQ pop一个thread。 2. 如果没有,则运行idle_thread。 3. 调用_ST_RESTORE_CONTEXT。

那么_ST_RESTORE_CONTEXT做了什么呢?

define _ST_RESTORE_CONTEXT(_thread) \

ST_BEGIN_MACRO \

_ST_SET_CURRENT_THREAD(_thread); \

MD_LONGJMP((_thread)->context, 1); \

ST_END_MACRO

简单来说,_ST_RESTORE_CONTEXT就是调用了我们之前所没有看到的MD_LONGJMP。

所以,我们可以简单地认为,在携程需要schedule的时候,会先把自身当前的栈通过MD_SETJMP保存起来,当线程被schedule再次调度出来的时候,则会使用MD_SETJMP来还原栈,完成一次协程切换。

然后我们来看看idle_thread做了什么。

虽然这个协程名字叫做idle,但是其实做了很多的事情。

void _st_idle_thread_start(void arg)

{

_st_thread_t *me = _ST_CURRENT_THREAD();

while (_st_active_count > 0) {

/ Idle vp till I/O is ready or the smallest timeout expired /

_ST_VP_IDLE();

/ Check sleep queue for expired threads /

_st_vp_check_clock();

me->state = _ST_ST_RUNNABLE;

_ST_SWITCH_CONTEXT(me);

}

/ No more threads /

exit(0);

/ NOTREACHED /

return NULL;

}

总的来说,idle_thread做了两件事情。1. _ST_VP_IDLE() 2. _st_vp_check_clock()。_st_vp_check_clock很好理解,就是检查定时器是否超时,如果超时了,则设置超时标记之后,放回RUNQ。而_ST_VP_IDLE,其实就是查看io是否已经ready了。例如linux的话,则会调用epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist,

_st_epoll_data->evtlist_size, timeout)去查看是否有可响应的io。timeout值会根据当前空闲情况进行变化,通常来说会是一个极小的值。

那么看到这里,整体的线程调度已经全部走完了。(详见前面最一开始的流程图)总体流程总结来说基本上是func() -> st_xxxx() -> AddQ -> MD_SETJMP -> schedule() -> MD_LONG -> func()。

所以对于st而言,所以的调度,是基于用户调用。那么如果用户一直不调用st_xxx()(例如计算密集性服务),st也就无法进行协程切换,那么其他协程也就产生极大的阻塞了。这也是为什么st并不太合适计算密集型的原因(其实单线程框架大多都不合适计算密集型)

想要阅读更多技术干货文章,欢迎关注网易云信博客。

了解网易云信,来自网易核心架构的通信与视频云服务。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

绝对价值

绝对价值

[美] 伊塔马尔·西蒙森 艾曼纽·罗森 / 钱峰 / 中国友谊出版公司 / 2014-7 / 45.00元

绝对价值指的是经用户体验的产品质量,即使用某件产品或者享受某项服务的切实感受。 过去,消费就像是押宝。一件商品好不好,一家餐馆的环境如何,没有亲身体验过消费者无从得知,只能根据营销人员提供的有限信息去猜测。品牌、原产地、价位、广告,这些重要的质量线索左右着消费者的选择。 然而,互联网和新兴科技以一种前所未有的速度改变了商业环境。当消费者可以在购买前查看到交易记录和消费者评价,通过便捷的......一起来看看 《绝对价值》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

在线进制转换器
在线进制转换器

各进制数互转换器

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具