Coroutines

User-level threads

A running thread relies mainly on its registers and its stack space; a coroutine is the same idea, except the register state and the stack address are maintained by the user instead of the kernel.

Thread scheduling is done by the operating system: it needs a system call that traps into the kernel plus a full context switch, whereas switching coroutines only requires changing register values.

Symmetric vs. asymmetric coroutines:

Symmetric coroutines can schedule and switch to each other directly.

With asymmetric coroutines, a scheduled coroutine can only return control to the coroutine that scheduled it.

Asymmetric coroutines need a coroutine scheduler, conceptually similar to a pool, which makes them a good fit for IO-bound workloads; symmetric coroutines suit compute-bound workloads.

Three ways to implement coroutines:

ucontext

Coroutines can be built on the ucontext family of functions. In struct ucontext, uc_link points to the context to resume when this one finishes, and uc_stack points to the stack this coroutine uses.

getcontext() initializes a context by filling it with the current execution context.

makecontext() sets the function the coroutine will run and completes the context setup.

swapcontext() performs the switch: the current context is saved into oucp and execution switches to ucp.

setcontext() should be used with care: it jumps straight to the given context, and the current context is lost.

#include <ucontext.h>

typedef struct ucontext {
    struct ucontext *uc_link;
    sigset_t         uc_sigmask;
    stack_t          uc_stack;
    mcontext_t       uc_mcontext;
    ...
} ucontext_t;

int getcontext(ucontext_t *ucp);
int setcontext(const ucontext_t *ucp);
void makecontext(ucontext_t *ucp, void (*func)(void), int argc, ...);
int swapcontext(ucontext_t *restrict oucp, const ucontext_t *restrict ucp);
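As a minimal, self-contained illustration of these four calls (this example is not part of the project code; the stack size and names are arbitrary), one coroutine can be driven like this:

#include <stdio.h>
#include <ucontext.h>

static ucontext_t main_ctx, co_ctx;
static char co_stack[128 * 1024];

static void co_func(void)
{
    printf("in coroutine\n");
    swapcontext(&co_ctx, &main_ctx);      // yield back to main
    printf("coroutine resumed\n");
}                                         // returning here resumes uc_link (main_ctx)

int main(void)
{
    getcontext(&co_ctx);                  // fill with the current context first
    co_ctx.uc_stack.ss_sp   = co_stack;   // give the coroutine its own stack
    co_ctx.uc_stack.ss_size = sizeof(co_stack);
    co_ctx.uc_link          = &main_ctx;  // where to continue when co_func returns
    makecontext(&co_ctx, co_func, 0);

    swapcontext(&main_ctx, &co_ctx);      // save main, run the coroutine
    printf("back in main\n");
    swapcontext(&main_ctx, &co_ctx);      // resume the coroutine after its yield
    printf("coroutine finished\n");
    return 0;
}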

setjmp/longjmp

int setjmp(jmp_buf env);            // set a return point to come back to later
void longjmp(jmp_buf env, int val); // jump back to that return point; setjmp then returns val
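A tiny standalone example of the pair (again, not project code). Note that longjmp can only unwind back to a frame that is still live and does not give you a separate stack, which is why it is weaker than ucontext for building real coroutines:

#include <setjmp.h>
#include <stdio.h>

static jmp_buf env;

static void worker(void)
{
    printf("worker: jumping back\n");
    longjmp(env, 42);                 // unwind to the setjmp point, make it return 42
}

int main(void)
{
    int rc = setjmp(env);             // 0 when setting the point, 42 after longjmp
    if (rc == 0)
    {
        printf("return point set\n");
        worker();
    }
    else
    {
        printf("setjmp returned %d\n", rc);
    }
    return 0;
}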

Assembly

The swap assembly below is taken from libco; only the 64-bit implementation is quoted here.

Here coctx_t stores fourteen values in void *regs[14]; on 64-bit each pointer is 8 bytes, so the array occupies 112 bytes. rdi, rsi, rdx and rcx carry the 1st through 4th function arguments. coctx_swap saves the current context into its first argument and switches to the second (a commented layout of the register slots follows the assembly).

void coctx_swap(coctx_t*, coctx_t*) asm("coctx_swap");

leaq (%rsp),%rax
movq %rax, 104(%rdi)
movq %rbx, 96(%rdi)
movq %rcx, 88(%rdi)
movq %rdx, 80(%rdi)
movq 0(%rax), %rax
movq %rax, 72(%rdi)
movq %rsi, 64(%rdi)
movq %rdi, 56(%rdi)
movq %rbp, 48(%rdi)
movq %r8, 40(%rdi)
movq %r9, 32(%rdi)
movq %r12, 24(%rdi)
movq %r13, 16(%rdi)
movq %r14, 8(%rdi)
movq %r15, (%rdi)
xorq %rax, %rax

movq 48(%rsi), %rbp
movq 104(%rsi), %rsp
movq (%rsi), %r15
movq 8(%rsi), %r14
movq 16(%rsi), %r13
movq 24(%rsi), %r12
movq 32(%rsi), %r9
movq 40(%rsi), %r8
movq 56(%rsi), %rdi
movq 80(%rsi), %rdx
movq 88(%rsi), %rcx
movq 96(%rsi), %rbx
leaq 8(%rsp), %rsp
pushq 72(%rsi)

movq 64(%rsi), %rsi
ret
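Reading the offsets back, the layout of coctx_t that this assembly assumes is roughly the following; it is shown only as a decoding aid for the offsets above, mirroring libco's header to the best of my knowledge:

struct coctx_t
{
    void *regs[14];   // each slot is 8 bytes on x86-64
    // regs[0]  offset   0 : r15
    // regs[1]  offset   8 : r14
    // regs[2]  offset  16 : r13
    // regs[3]  offset  24 : r12
    // regs[4]  offset  32 : r9
    // regs[5]  offset  40 : r8
    // regs[6]  offset  48 : rbp
    // regs[7]  offset  56 : rdi  (1st argument of the resumed function)
    // regs[8]  offset  64 : rsi  (2nd argument)
    // regs[9]  offset  72 : return address (the value on top of the saved stack)
    // regs[10] offset  80 : rdx
    // regs[11] offset  88 : rcx
    // regs[12] offset  96 : rbx
    // regs[13] offset 104 : rsp
    size_t ss_size;
    char  *ss_sp;
};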

Coroutines and a coroutine scheduler built on ucontext

The main coroutine of the main thread is the default worker coroutine, and a main coroutine can only be created through GetThis(); as the code shows, the scheduler coroutine and the thread's main coroutine start out as the same object. Three static per-thread variables point to the currently running coroutine, the main coroutine and the scheduler coroutine, so within one thread only three coroutine contexts are tracked (see the sketch after the listing below).

std::shared_ptr<Fiber> Fiber::GetThis()
{
    if(t_fiber)
    {
        return t_fiber->shared_from_this();
    }

    std::shared_ptr<Fiber> main_fiber(new Fiber());
    t_thread_fiber = main_fiber;
    t_scheduler_fiber = main_fiber.get();

    assert(t_fiber == main_fiber.get());
    return t_fiber->shared_from_this();
}
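A minimal sketch of the three per-thread pointers the code above relies on; the names come from the snippet, but declaring them thread_local is an assumption about the actual source:

// Assumed declarations backing GetThis(): one slot for the running coroutine,
// one for the thread's main coroutine, one for the scheduler coroutine.
static thread_local Fiber*                 t_fiber           = nullptr;  // currently running
static thread_local std::shared_ptr<Fiber> t_thread_fiber;               // main coroutine of this thread
static thread_local Fiber*                 t_scheduler_fiber = nullptr;  // scheduler coroutine of this thread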

The scheduler coroutine is set via SetSchedulerFiber and is created when the scheduler itself is constructed. In the multi-threaded case, if the main (caller) thread is also supposed to do work, it needs a dedicated scheduler coroutine to switch into worker coroutines: as noted above, the main coroutine can only be created through GetThis() and carries no cb, so m_schedulerFiber is reset to a new Fiber whose job is to execute run(), i.e. the scheduling loop.

Scheduler::Scheduler(size_t threads, bool use_caller, const std::string &name):
    m_useCaller(use_caller), m_name(name)
{
    assert(threads > 0 && Scheduler::GetThis() == nullptr);
    SetThis();
    Thread::SetName(m_name);
    if(use_caller)
    {
        threads--;
        Fiber::GetThis();
        m_schedulerFiber.reset(new Fiber(std::bind(&Scheduler::run, this), 0, false));
        Fiber::SetSchedulerFiber(m_schedulerFiber.get());

        m_rootThread = Thread::GetThreadId();
        m_threadIds.push_back(m_rootThread);
    }
    m_threadCount = threads;
    if(debug) std::cout << "Scheduler::Scheduler() success\n";
}

Whenever a Fiber is created or reset, makecontext is always given MainFunc as the entry function; the cb callback we pass in is stored in m_cb and is ultimately invoked by MainFunc, since MainFunc is what the coroutine context is initialized to run (a constructor sketch follows the listing below).

void Fiber::MainFunc()
{
    std::shared_ptr<Fiber> curr = GetThis();
    assert(curr != nullptr);

    curr->m_cb();
    curr->m_cb = nullptr;
    curr->m_state = TERM;

    // finished running -> yield execution back
    auto raw_ptr = curr.get();
    curr.reset();
    raw_ptr->yield();
}
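For orientation, a constructor along these lines would produce the behaviour just described; apart from m_cb and MainFunc, the member and parameter names (and the default stack size) are assumptions rather than the project's exact code:

// Sketch of a ucontext-based Fiber constructor: allocate a private stack,
// capture a context with getcontext, then point it at MainFunc via makecontext.
Fiber::Fiber(std::function<void()> cb, size_t stacksize, bool run_in_scheduler)
    : m_cb(cb), m_runInScheduler(run_in_scheduler)
{
    m_stacksize = stacksize ? stacksize : 128 * 1024;   // assumed default size
    m_stack = malloc(m_stacksize);

    getcontext(&m_ctx);                                  // start from the current context
    m_ctx.uc_link          = nullptr;                    // yield() is used instead of uc_link
    m_ctx.uc_stack.ss_sp   = m_stack;
    m_ctx.uc_stack.ss_size = m_stacksize;
    makecontext(&m_ctx, &Fiber::MainFunc, 0);            // every fiber enters through MainFunc

    m_state = READY;                                     // assumed initial state
}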

Now let's look at run(). The test sets up two threads, the main thread plus one child thread, and the main thread also takes part in task scheduling.

Fiber(): main id = 0
Fiber(): child id = 1
Scheduler::Scheduler() success
Schedule::run() starts in thread: 12156
Scheduler::start() success
Fiber(): main id = 2
Fiber(): child id = 3
MainFunc start in Fiber id: 3
Scheduler::idle(), sleeping in thread : 12156
idle() : yield() in Fiber id3
Scheduler::idle(), sleeping in thread : 12156

In the main thread, the main coroutine gets id 0, the scheduler coroutine gets id 1, and the Scheduler is constructed.
In start(), child thread 12156 is created with Scheduler::run as its entry. run() notices it is not on the main thread and calls GetThis(), so fiber 2 becomes both the main and the scheduler coroutine of that thread; the idle coroutine is fiber 3.
Fiber 3 then runs idle() through MainFunc, sleeping one second per loop iteration and calling yield() each time.

When does it stop? When the main thread calls stop(), the stopping() condition becomes true, idle() leaves its loop, control returns to MainFunc, and fiber 3's state is set to TERM.

void Scheduler::idle()
{
    while(!stopping())
    {
        if(debug) std::cout << "Scheduler::idle(), sleeping in thread: " << Thread::GetThreadId() << std::endl;
        sleep(1);
        Fiber::GetThis()->yield();
    }
}
...
begin post

Fiber(): child id = 4
Fiber(): child id = 5
Fiber(): child id = 6
idle() : yield() in Fiber id3
MainFunc start in Fiber id: 4
task 0 is under processing in thread: 12156
Fiber::MainFunc() : yield() in Fiber id : 4
~Fiber(): id = 4
MainFunc start in Fiber id: 5
task 1 is under processing in thread: 12156
Fiber::MainFunc() : yield() in Fiber id : 5
~Fiber(): id = 5
...

post again

Fiber(): child id = 7
Fiber(): child id = 8
Fiber(): child id = 9
idle() : yield() in Fiber id3
MainFunc start in Fiber id: 7
...
Schedule::stop() starts in thread: 12155
MainFunc start in Fiber id: 1
Schedule::run() starts in thread: 12155
Fiber(): child id = 10

At this point the main thread calls stop(), and its main coroutine switches to the scheduler coroutine, which executes Scheduler::run.
Since we are on the main thread, the thread-id check is not strictly needed here: calling GetThis() directly would also work, because t_fiber (the main coroutine) already exists and would simply be returned. The idle coroutine on this thread is fiber 10.

MainFunc start in Fiber id: 10
Scheduler::idle(), sleeping in thread : 12155
Fiber::MainFunc() : yield() in Fiber id : 9
~Fiber(): id = 9
Fiber::MainFunc() : yield() in Fiber id : 3
Schedule::run() ends in thread: 12156
~Fiber(): id = 3
~Fiber(): id = 2

Fiber 3 exits its while loop and finishes idle(); MainFunc then releases its resources, run() breaks out of its loop, and fibers 2 and 3 reach the end of their lifetimes.

idle() : yield() in Fiber id10
Fiber::MainFunc() : yield() in Fiber id : 10
Schedule::run() ends in thread: 12155
~Fiber(): id = 10
Fiber::MainFunc() : yield() in Fiber id : 1
m_schedulerFiber ends in thread:12155
Schedule::stop() ends in thread:12155
Scheduler::~Scheduler() success
~Fiber(): id = 1
~Fiber(): id = 0

Finally control returns to the main coroutine and main() ends. As the output shows, tasks are taken in first-come, first-served order.

void Scheduler::run()
{
    int thread_id = Thread::GetThreadId();
    if (debug)
        std::cout << "Schedule::run() starts in thread: " << thread_id << std::endl;

    // set_hook_enable(true);

    SetThis();

    // running on a newly created thread -> needs a main coroutine
    if (thread_id != m_rootThread)
    {
        Fiber::GetThis();
    }

    std::shared_ptr<Fiber> idle_fiber = std::make_shared<Fiber>(std::bind(&Scheduler::idle, this));
    ScheduleTask task;

    while (true)
    {
        task.reset();
        bool tickle_me = false;

        {
            std::lock_guard<std::mutex> lock(m_mutex);
            auto it = m_tasks.begin();
            // 1 walk the task queue
            while (it != m_tasks.end())
            {
                if (it->thread != -1 && it->thread != thread_id)
                {
                    it++;
                    tickle_me = true;
                    continue;
                }

                // 2 take the task
                assert(it->fiber || it->cb);
                task = *it;
                m_tasks.erase(it);
                m_activeThreadCount++;
                break;
            }
            tickle_me = tickle_me || (it != m_tasks.end());
        }

        if (tickle_me)
        {
            tickle();
        }

        // 3 run the task
        if (task.fiber)
        {
            {
                std::lock_guard<std::mutex> lock(task.fiber->m_mutex);
                if (task.fiber->getState() != Fiber::TERM)
                {
                    task.fiber->resume();
                }
            }
            m_activeThreadCount--;
            task.reset();
        }
        else if (task.cb)
        {
            std::shared_ptr<Fiber> cb_fiber = std::make_shared<Fiber>(task.cb);
            {
                std::lock_guard<std::mutex> lock(cb_fiber->m_mutex);
                cb_fiber->resume();
            }
            m_activeThreadCount--;
            task.reset();
        }
        // 4 no task -> run the idle coroutine
        else
        {
            // system shutting down -> the idle coroutine breaks out of its loop and ends ->
            // its state is then TERM -> entering this branch again breaks out and exits run()
            if (idle_fiber->getState() == Fiber::TERM)
            {
                if (debug)
                    std::cout << "Schedule::run() ends in thread: " << thread_id << std::endl;
                break;
            }
            m_idleThreadCount++;
            idle_fiber->resume();
            m_idleThreadCount--;
        }
    }
}

There is a bug here: if every scheduleLock() call pins its task to a specific thread id and those tasks are never picked up by that thread, run() spins forever. The id passed in must come from t.getId() or Thread::GetThreadId():

scheduler->scheduleLock(fiber, Thread::GetThreadId());

[!NOTE]

The scheduling at this stage is not great: run() just spins in while(1) executing tasks in order, and since each task's cb is run to completion by MainFunc before yield(), a task can only run from start to finish, so long tasks monopolise the thread; meanwhile the constant polling of idle() burns CPU. On top of that, if a task blocks, the whole coroutine scheduler blocks with it.

Epoll-based IO coroutine scheduling

IOManager wraps per-event state in an EventContext:

struct EventContext
{
    Scheduler *scheduler = nullptr;
    std::shared_ptr<Fiber> fiber;
    std::function<void()> cb;
};

and builds an epoll reactor around a per-fd FdContext (a sketch of how events are registered follows the struct):

struct FdContext
{
    EventContext read;
    EventContext write;
    int fd = 0;
    Event events = NONE;
    std::mutex mutex;

    EventContext &getEventContext(Event event);
    void resetEventContext(EventContext &ctx);
    void triggerEvent(Event event);
};
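To make the reactor concrete, registering an event roughly follows the pattern below. This is only a hedged sketch: the member names m_epfd and m_fdContexts, the use of EPOLLET, and the exact locking are assumptions, not the project's verbatim addEvent (the READ/WRITE values 1 and 4 seen in the logs do line up with EPOLLIN/EPOLLOUT):

// Sketch: register `event` on `fd` with epoll and remember the current
// scheduler plus either a callback or the calling fiber, so that
// triggerEvent() can later hand the work back to the scheduler.
int IOManager::addEvent(int fd, Event event, std::function<void()> cb)
{
    FdContext *fd_ctx = m_fdContexts[fd];                 // assumed per-fd lookup table
    std::lock_guard<std::mutex> lock(fd_ctx->mutex);

    int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
    epoll_event ev{};
    ev.events   = EPOLLET | fd_ctx->events | event;       // edge-triggered is an assumption
    ev.data.ptr = fd_ctx;

    if (epoll_ctl(m_epfd, op, fd, &ev) != 0)
    {
        return -1;
    }
    fd_ctx->events = (Event)(fd_ctx->events | event);

    EventContext &ctx = fd_ctx->getEventContext(event);
    ctx.scheduler = Scheduler::GetThis();                  // who will run the work later
    if (cb)
    {
        ctx.cb = std::move(cb);                            // plain callback task
    }
    else
    {
        ctx.fiber = Fiber::GetThis();                      // resume the calling fiber instead
    }
    return 0;
}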

Continuing with the test: again two threads and a scheduler with use_caller at its default. As an example, a socket is created and connected to baidu's IPv4 address, and a WRITE event plus a READ event are added for it (a sketch of the two callbacks follows the snippet).

...
IOManager manager(2);
...
manager.addEvent(sock, IOManager::WRITE, &send_func2);
manager.addEvent(sock, IOManager::READ, &recv_func);
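send_func2 and recv_func are not shown in the post; callbacks of roughly this shape would match the output below (the request string, buffer size and the global sock are assumptions):

#include <sys/socket.h>
#include <iostream>

extern int sock;   // the fd connected to baidu in the test (assumed to be global)

const char request[] = "GET / HTTP/1.0\r\nHost: www.baidu.com\r\n\r\n";

void send_func2()
{
    send(sock, request, sizeof(request) - 1, 0);       // runs on the WRITE event
}

void recv_func()
{
    char buffer[4096];
    ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0);
    if (n > 0)
    {
        buffer[n] = '\0';
        std::cout << buffer << std::endl;              // prints the HTTP/1.0 200 OK reply
    }
}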

At the start things look the same as before: the fd is 6, and WRITE and READ events are added.

As before, the main thread creates main coroutine 0 and scheduler coroutine 1, start() runs, and the child thread executing run() creates fiber 2 and idle fiber 3.

Fiber(): main id = 0
Fiber(): child id = 1
Scheduler::Scheduler() success
Schedule::run() starts in thread: 33052
Fiber(): main id = 2
Fiber(): child id = 3
Scheduler::start() success
addevent fd: 6 event: 4
addevent fd: 6 event: 1
event has been posted

Once main() finishes, the IOManager destructor runs: it calls stop() to release resources, scheduler coroutine 1 starts executing and enters run(), and idle fiber 4 is created.

IOManager::~IOManager() {
    stop();
    close(m_epfd);
    ...
}

Schedule::stop() starts in thread: 33051
MainFunc start in Fiber id: 1
Schedule::run() starts in thread: 33051
Fiber(): child id = 4
MainFunc start in Fiber id: 4

Only now does idle() actually start running. IOManager overrides idle() with a while(true) loop around epoll_wait; here the WRITE event is ready, so triggerEvent() fires and scheduleLock()s a task (a sketch of this loop appears after the log excerpt below).

The policy is that once an event has been triggered, its callback is reset so it will not fire again.

void IOManager::FdContext::triggerEvent(IOManager::Event event) {
    assert(events & event);
    events = (Event)(events & ~event);
    EventContext& ctx = getEventContext(event);
    if (ctx.cb)
    {
        ctx.scheduler->scheduleLock(&ctx.cb);
    }
    else
    {
        ctx.scheduler->scheduleLock(&ctx.fiber);
    }
    resetEventContext(ctx);
    return;
}

IOManager::idle(), run in thread: 33051
MainFunc start in Fiber id: 3
IOManager::idle(), run in thread: 33052
triggerEvent()
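For context, the overridden idle() works roughly as sketched below. This is a condensed, speculative reconstruction from the surrounding description: the epoll_wait parameters, the convention used to recognize the tickle pipe, and the member names m_epfd and m_tickleFds are all assumptions.

// Sketch of IOManager::idle(): block in epoll_wait, drain the tickle pipe,
// trigger the ready fd events, then yield back to run() so queued tasks execute.
void IOManager::idle()
{
    static const int MAX_EVENTS = 256;
    std::unique_ptr<epoll_event[]> events(new epoll_event[MAX_EVENTS]);

    while (true)
    {
        if (stopping())
        {
            break;                                     // stop() was called and no work is left
        }

        int n = epoll_wait(m_epfd, events.get(), MAX_EVENTS, 3000 /* ms, assumed */);
        for (int i = 0; i < n; ++i)
        {
            epoll_event &ev = events[i];
            if (ev.data.ptr == nullptr)                // assumed convention for the tickle pipe
            {
                char dummy[256];
                while (read(m_tickleFds[0], dummy, sizeof(dummy)) > 0);
                continue;
            }

            FdContext *fd_ctx = (FdContext *)ev.data.ptr;
            std::lock_guard<std::mutex> lock(fd_ctx->mutex);
            if (ev.events & EPOLLIN)  fd_ctx->triggerEvent(READ);   // schedules the callback/fiber
            if (ev.events & EPOLLOUT) fd_ctx->triggerEvent(WRITE);
        }

        Fiber::GetThis()->yield();                     // back to run() to execute the new tasks
    }
}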

tickle() is also overridden. When is it sent? While a thread sits in idle()'s epoll_wait, tickle() writes a character to the pipe (m_tickleFds[1]; the read end m_tickleFds[0] is registered with epoll), which wakes epoll_wait so idle() can yield back to run() and execute the task.

void IOManager::tickle() {
    if (!hasIdleThreads())
        return;
    int rt = write(m_tickleFds[1], "T", 1);
}

In fact these tickle() calls happen inside run(): when it sees a task pinned to another thread, or when it successfully takes a task off the queue, it calls tickle() first.

Fiber 5 runs the WRITE event callback to completion; the second triggerEvent comes from the READ event that was added, and fiber 6 runs the READ callback, reading the www.baidu.com page.

IOManaer::tickle()
Fiber(): child id = 5
MainFunc start in Fiber id: 5
Fiber::MainFunc() : yield() in Fiber id : 5
~Fiber(): id = 5
IOManager::idle(), run in thread: 33052
IOManager::idle: receive tickle data
IOManager::idle(), run in thread: 33052
triggerEvent()
IOManaer::tickle()
Fiber(): child id = 6
MainFunc start in Fiber id: 6
HTTP/1.0 200 OK
Accept-Ranges: bytes
Cache-Control: no-cache
Content-Length: 29524
Content-Type: text/html
……
ack

Once all events have been handled, stop() sends a tickle to every coroutine blocked in epoll_wait; idle() ends, run() exits, and resources are reclaimed.

Fiber::MainFunc() : yield() in Fiber id : 6
Fiber(): id = 6
IOManager::idle(), run in thread: 33051
name = IOManageridle exits in thread: 33051
Fiber::MainFunc() : yield() in Fiber id : 4
Schedule::run() ends in thread: 33051
~Fiber(): id = 4
Fiber::MainFunc() : yield() in Fiber id : 1
m_schedulerFiber ends in thread:33051
IOManager::idle: receive tickle data
IOManager::idle(), run in thread: 33052
name = IOManageridle exits in thread: 33052
Fiber::MainFunc() : yield() in Fiber id : 3
Schedule::run() ends in thread: 33052
~Fiber(): id = 3
~Fiber(): id = 2
Schedule::stop() ends in thread:33051
Scheduler::~Scheduler() success
~Fiber(): id = 1
~Fiber(): id = 0

[!NOTE]

Compared with the previous scheduler, IOManager first integrates epoll to provide IO event callbacks, automatically running the read/write callbacks as sockets become ready. Second, it adds addEvent, delEvent and cancelEvent, so an added task is no longer guaranteed to run: it can be deleted, and cancelEvent triggers the event one last time before removing it. A further benefit is that epoll_wait is thread-safe and blocking, so idle threads release the CPU instead of spinning.

However, it can still happen that one task sleeps or waits for IO to become ready, the whole thread stops doing useful work, and task execution remains inefficient.

Implementing the hook module

How hooking works

  1. Override symbols by loading our own shared library first, via the LD_PRELOAD environment variable or the -Wl,-rpath=. link flag.

    With this approach the target program does not need to be recompiled: we build our own shared library with gcc -fPIC -shared hook.c -o libhook.so, then either set LD_PRELOAD="libhook.so" or set the runtime library search path with -Wl,-rpath=./ so the hook takes effect.

    ldd a.out then shows the library resolution order, confirming global symbol interposition.

  2. Redefine the calls in our own source and use dlsym to look up the address of the original symbol.

    void *dlsym(void *handle, const char *symbol);
    /*
    RTLD_NEXT
    Find the next occurrence of the desired symbol in the
    search order after the current object. This allows one to
    provide a wrapper around a function in another shared
    object, so that, for example, the definition of a function
    in a preloaded shared object (see LD_PRELOAD in ld.so(8))
    can find and invoke the "real" function provided in
    another shared object (or for that matter, the "next"
    definition of the function in cases where there are
    multiple layers of preloading).
    RTLD_NEXT finds the next implementation of the same global symbol in the search order, i.e. the original one. */

    With this approach we redefine the call directly in our own source and use dlsym to locate the original system call (a minimal example combining both ideas follows).
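As a minimal illustration of both points (hypothetical file names, not part of this project), a preloaded library that wraps sleep and forwards to the real libc implementation found via dlsym(RTLD_NEXT, ...):

// hook.c -- build: gcc -fPIC -shared hook.c -o libhook.so -ldl
// run:             LD_PRELOAD=./libhook.so ./a.out
#define _GNU_SOURCE
#include <dlfcn.h>
#include <stdio.h>
#include <unistd.h>

typedef unsigned int (*sleep_fun)(unsigned int);

// Our sleep() is resolved first; dlsym(RTLD_NEXT, "sleep") fetches the libc one.
unsigned int sleep(unsigned int seconds)
{
    static sleep_fun sleep_f = NULL;
    if (!sleep_f)
    {
        sleep_f = (sleep_fun)dlsym(RTLD_NEXT, "sleep");
    }
    fprintf(stderr, "hooked sleep(%u)\n", seconds);
    return sleep_f(seconds);      // delegate to the real implementation
}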

The following functions are hooked:

#define HOOK_FUN(XX) \
XX(sleep) \
XX(usleep) \
XX(nanosleep) \
XX(socket) \
XX(connect) \
XX(accept) \
XX(read) \
XX(readv) \
XX(recv) \
XX(recvfrom) \
XX(recvmsg) \
XX(write) \
XX(writev) \
XX(send) \
XX(sendto) \
XX(sendmsg) \
XX(close) \
XX(fcntl) \
XX(ioctl) \
XX(getsockopt) \
XX(setsockopt)

Take the sleep-family functions: the hooked versions do not actually block. Instead we implement them with a timer that brings us back to the current coroutine when the timeout expires, while the CPU keeps doing other work in the meantime.
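A hedged sketch of what a hooked sleep along these lines can look like; addTimer taking a plain callback is an assumption (the post only shows addConditionTimer inside do_io), and the scheduleLock overload used here is simplified:

// Sketch: the hooked sleep() arms a timer that puts the calling fiber back on
// the task queue, then yields; the thread runs other coroutines until it fires.
unsigned int sleep(unsigned int seconds)
{
    if (!mycoro::t_hook_enable)
    {
        return sleep_f(seconds);                    // hook disabled -> real sleep
    }

    std::shared_ptr<mycoro::Fiber> fiber = mycoro::Fiber::GetThis();
    mycoro::IOManager *iom = mycoro::IOManager::GetThis();

    // Assumed interface: run the callback once after seconds * 1000 ms.
    iom->addTimer(seconds * 1000, [fiber, iom]() {
        iom->scheduleLock(fiber);                   // reschedule the sleeping fiber
    });

    fiber->yield();                                 // give up the CPU until the timer fires
    return 0;
}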

Here each fd is wrapped in an FdCtx, so once hooking is in place a call that returns an fd also yields an FdCtx object for it. Declaring static HookIniter s_hook_initer; makes every name_f pointer resolve to the original system call during static initialization; when hooking is not enabled, the original call is invoked directly:

void hook_init()
{
    static bool is_inited = false;
    if (is_inited)
    {
        return;
    }

    // test
    is_inited = true;

    // assignment -> sleep_f = (sleep_fun)dlsym(RTLD_NEXT, "sleep"); -> dlsym -> fetch the original symbols/function
#define XX(name) name##_f = (name##_fun)dlsym(RTLD_NEXT, #name);
    HOOK_FUN(XX)
#undef XX
}

// fd info
class FdCtx : public std::enable_shared_from_this<FdCtx>
{
private:
    bool m_isInit = false;
    bool m_isSocket = false;
    bool m_sysNonblock = false;
    bool m_userNonblock = false;
    bool m_isClosed = false;
    int m_fd;

    // read event timeout
    uint64_t m_recvTimeout = (uint64_t)-1;
    // write event timeout
    uint64_t m_sendTimeout = (uint64_t)-1;

public:
    FdCtx(int fd);
    ~FdCtx();

    bool init();
    bool isInit() const { return m_isInit; }
    bool isSocket() const { return m_isSocket; }
    bool isClosed() const { return m_isClosed; }

    void setUserNonblock(bool v) { m_userNonblock = v; }
    bool getUserNonblock() const { return m_userNonblock; }

    void setSysNonblock(bool v) { m_sysNonblock = v; }
    bool getSysNonblock() const { return m_sysNonblock; }

    void setTimeout(int type, uint64_t v);
    uint64_t getTimeout(int type);
};

The next nice touch is a unified entry point for IO operations: every hooked IO function goes through the single do_io interface.

scheduleLock -> scheduler coroutine run -> fiber a -> func -> FdMgr adds an FdCtx -> connect (hooked as connect_with_timeout) -> send -> do_io (adds a WRITE event) -> a yields -> b, c, d repeat the add-WRITE-event step ->

scheduler coroutine triggerEvent(WRITE) -> cb is nullptr here, so scheduleLock(a) -> later control returns to fiber a's func, which goes on to recv

At this point the coroutine library supports asynchronous operation: each coroutine can give up the CPU midway, and the timeout mechanism makes sure control returns to the coroutine's func in good time.

Below is a run with a single thread doing one send and one recv:

Fiber(): main id = 0
Fiber(): child id = 1
Scheduler::Scheduler() success
Scheduler::start() success
Schedule::stop() starts in thread: 49317
MainFunc start in Fiber id: 1
Schedule::run() starts in thread: 49317
Fiber(): child id = 2
Fiber(): child id = 3
MainFunc start in Fiber id: 3
addevent fd: 6 event: 4
MainFunc start in Fiber id: 2
IOManager::idle(), run in thread: 49317
IOManager::idle(), run in thread: 49317
triggerEvent()
IOManaer::tickle()
connected
send success
addevent fd: 6 event: 1
fiber 3yield()
IOManager::idle(), run in thread: 49317
IOManager::idle: receive tickle data
IOManager::idle(), run in thread: 49317
triggerEvent()
IOManaer::tickle()
recv success
HTTP/1.0 2
Fiber::MainFunc() : yield() in Fiber id : 3
Fiber(): id = 3
IOManager::idle(), run in thread: 49317
name = IOManageridle exits in thread: 49317
Fiber::MainFunc() : yield() in Fiber id : 2
Schedule::run() ends in thread: 49317
~Fiber(): id = 2
Fiber::MainFunc() : yield() in Fiber id : 1
m_schedulerFiber ends in thread:49317
Schedule::stop() ends in thread:49317
Scheduler::~Scheduler() success
~Fiber(): id = 1
~Fiber(): id = 0

// universal template for read and write functions
template <typename OriginFun, typename... Args>
static ssize_t do_io(int fd, OriginFun fun, const char *hook_fun_name, uint32_t event, int timeout_so, Args &&...args)
{
    if (!mycoro::t_hook_enable)
    {
        return fun(fd, std::forward<Args>(args)...);
    }

    std::shared_ptr<mycoro::FdCtx> ctx = mycoro::FdMgr::GetInstance()->get(fd);
    if (!ctx)
    {
        return fun(fd, std::forward<Args>(args)...);
    }

    if (ctx->isClosed())
    {
        errno = EBADF;
        return -1;
    }

    if (!ctx->isSocket() || ctx->getUserNonblock())
    {
        return fun(fd, std::forward<Args>(args)...);
    }

    // get the timeout
    uint64_t timeout = ctx->getTimeout(timeout_so);
    // timer condition
    std::shared_ptr<timer_info> tinfo(new timer_info);

retry:
    // run the function
    ssize_t n = fun(fd, std::forward<Args>(args)...);

    // EINTR -> operation interrupted by a signal -> retry
    while (n == -1 && errno == EINTR)
    {
        n = fun(fd, std::forward<Args>(args)...);
    }

    // EAGAIN -> resource temporarily unavailable -> wait until ready, then retry
    if (n == -1 && errno == EAGAIN)
    {
        mycoro::IOManager *iom = mycoro::IOManager::GetThis();
        // timer
        std::shared_ptr<mycoro::Timer> timer;
        std::weak_ptr<timer_info> winfo(tinfo);

        // 1 timeout has been set -> add a conditional timer for canceling this operation
        if (timeout != (uint64_t)-1)
        {
            timer = iom->addConditionTimer(timeout, [winfo, fd, iom, event]()
            {
                auto t = winfo.lock();
                if(!t || t->cancelled)
                {
                    return;
                }
                t->cancelled = ETIMEDOUT;
                // cancel this event and trigger once to return to this fiber
                iom->cancelEvent(fd, (mycoro::IOManager::Event)(event));
            }, winfo);
        }

        // 2 add event -> callback is this fiber
        int rt = iom->addEvent(fd, (mycoro::IOManager::Event)(event));
        if (rt)
        {
            std::cout << hook_fun_name << " addEvent(" << fd << ", " << event << ")";
            if (timer)
            {
                timer->cancel();
            }
            return -1;
        }
        else
        {
            mycoro::Fiber::GetThis()->yield();

            // 3 resumed either by addEvent or by cancelEvent
            if (timer)
            {
                timer->cancel();
            }
            // by cancelEvent
            if (tinfo->cancelled == ETIMEDOUT)
            {
                errno = tinfo->cancelled;
                return -1;
            }
            goto retry;
        }
    }
    return n;
}

With that, an N:M coroutine scheduler is complete: epoll plus a timer heap implement IO coroutine scheduling, the functions that perform network IO are wrapped with hooks, and asynchronous IO reads and writes work end to end.

Finally, context-switch cost ranks process > thread > coroutine, so under high concurrency coroutines can deliver faster responses and higher throughput, and they also make callback-style asynchronous programming much easier.

That said, this project is still one optimization away from genuinely high performance: the cost of creating and destroying coroutines is not negligible. In testing, frequent creation and destruction performs no better than plain multithreading; the qualitative leap only comes under high concurrency and heavy IO once a coroutine pool is introduced, as in WeChat's libco.