从 Go 源码目录结构和对应代码文件了解到 Go 在不同平台下的网络 I/O 模式的有不同实现。比如,在 Linux 系统下基于 epoll,freeBSD 系统下基于 kqueue,以及 Windows 系统下基于 iocp。

因为我们的代码都是部署在Linux上的,所以本文以epoll封装实现为例子来讲解Go语言中I/O多路复用的源码实现。

https://cloud.tencent.com/developer/article/1787492

EPOLL

  • 与select,poll一样,对I/O多路复用的技术
  • 只关心“活跃”的链接,无需遍历全部描述符集合
  • 能够处理大量的链接请求(系统可以打开的文件数目)

创建EPOLL

1
2
3
4
5
6
/** 
 * @param size 告诉内核监听的数目 
 * 
 * @returns 返回一个epoll句柄(即一个文件描述符) 
 */
int epoll_create(int size);
1
int epfd = epoll_create(1000);

image1

创建一个epoll句柄,实际上是在内核空间,建立一个红黑树的root根节点,这个根节点的关系与epfd相对应。

控制EPOLL

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @param epfd 用epoll_create所创建的epoll句柄
* @param op 表示对epoll监控描述符控制的动作
*
* EPOLL_CTL_ADD(注册新的fd到epfd)
* EPOLL_CTL_MOD(修改已经注册的fd的监听事件)
* EPOLL_CTL_DEL(epfd删除一个fd)
*
* @param fd 需要监听的文件描述符
* @param event 告诉内核需要监听的事件
*
* @returns 成功返回0,失败返回-1, errno查看错误信息
*/
int epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);


struct epoll_event {
	__uint32_t events; /* epoll 事件 */
	epoll_data_t data; /* 用户传递的数据 */
}

/*
 * events : {EPOLLIN, EPOLLOUT, EPOLLPRI,
						 EPOLLHUP, EPOLLET, EPOLLONESHOT}
 */
typedef union epoll_data {
	void *ptr;
	int fd;
	uint32_t u32;
	uint64_t u64;
} e
1
2
3
4
5
6
struct epoll_event new_event;

new_event.events = EPOLLIN | EPOLLOUT;//写、读
new_event.data.fd = 5;

epoll_ctl(epfd, EPOLL_CTL_ADD, 5, &new_event);

创建一个用户态的事件,绑定到某个fd上,然后添加到内核中的epoll红黑树中。

image2

等待EPOLL

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
/**
*
* @param epfd 用epoll_create所创建的epoll句柄
* @param event 从内核得到的事件集合
* @param maxevents 告知内核这个events有多大,
* 注意: 值 不能大于创建epoll_create()时的size.
* @param timeout 超时时间
* -1: 永久阻塞
* 0: 立即返回,非阻塞
* >0: 指定微秒
*
* @returns 成功: 有多少文件描述符就绪,时间到时返回0
* 失败: -1, errno 查看错误
*/
int epoll_wait(int epfd, struct epoll_event *event,int maxevents, int timeout);

使用

1
2
3
struct epoll_event my_event[1000];

int event_cnt = epoll_wait(epfd, my_event, 1000, -1);

epoll_wait是一个阻塞的状态,如果内核检测到IO的读写响应,会抛给上层的epoll_wait, 返回给用户态一个已经触发的事件队列,同时阻塞返回。开发者可以从队列中取出事件来处理,其中事件里就有绑定的对应fd是哪个(之前添加epoll事件的时候已经绑定)。

image2

使用epoll编程主流程骨架

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
int epfd = epoll_crete(1000);

//将 listen_fd 添加进 epoll 中
epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd,&listen_event);

while (1) {
	//阻塞等待 epoll 中 的fd 触发
	int active_cnt = epoll_wait(epfd, events, 1000, -1);

	for (i = 0 ; i < active_cnt; i++) {
		if (evnets[i].data.fd == listen_fd) {
			//accept. 并且将新accept 的fd 加进epoll中.
		}
		else if (events[i].events & EPOLLIN) {
			//对此fd 进行读操作
		}
		else if (events[i].events & EPOLLOUT) {
			//对此fd 进行写操作
		}
	}
}

(1) 水平触发

img img

水平触发的主要特点是,如果用户在监听epoll事件,当内核有事件的时候,会拷贝给用户态事件,但是如果用户只处理了一次,那么剩下没有处理的会在下一次epoll_wait再次返回该事件

这样如果用户永远不处理这个事件,就导致每次都会有该事件从内核到用户的拷贝,耗费性能,但是水平触发相对安全,最起码事件不会丢掉,除非用户处理完毕。

(2) 边缘触发

img

img

边缘触发,相对跟水平触发相反,当内核有事件到达, 只会通知用户一次,至于用户处理还是不处理,以后将不会再通知。这样减少了拷贝过程,增加了性能,但是相对来说,如果用户马虎忘记处理,将会产生事件丢的情况。

模型1、单线程Accept+单线程读写业务

image-20220420112238352

image-20220420112256134

模型2、单线程Accept+多线程读写业务

image-20220420112530367

image-20220420112600290

模型3、单线程多路IO复用

image-20220420112639172

image-20220420112658951

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//telnet localhost 2001
package main
 
import (
	"fmt"
	"net"
	"os"
)
 
func main() {
	service := ":2001"
	tcpAddr, err := net.ResolveTCPAddr("tcp", service)
	checkError(err)
 
	mylistener, err := net.ListenTCP("tcp", tcpAddr)
	checkError(err)
 
	for {
		conn, err := mylistener.Accept()
		if err != nil {
			continue
		}
		handleRequest(conn)
		conn.Close()
 
	}
 
}
func checkError(err error) {
	if err != nil {
		fmt.Println("Fatal error :", err.Error())
		os.Exit(1)
	}
}
 
func handleRequest(conn net.Conn) {
	var mybuff [512]byte
	for {
		n, err := conn.Read(mybuff[0:])
		if err != nil {
			return
		}
		fmt.Println(string(mybuff[0:]))
		fmt.Println("localaddr is:", conn.LocalAddr())
		fmt.Println("remoteaddr is:", conn.RemoteAddr())
		fmt.Println("##########")
 
		_, err2 := conn.Write(mybuff[0:n])
		if err2 != nil {
			return
		}
	}
 
}

go的accept底层就实现了IO多路复用

模型4、单线程多路IO复用+多线程读写业务(工作池)

image-20220420112726073

image-20220420112805592

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main
 
import (
	"fmt"
	"net"
	"os"
)
 
func main() {
	service := ":2001"
	tcpAddr, err := net.ResolveTCPAddr("tcp", service)
	checkError(err)
 
	mylistener, err := net.ListenTCP("tcp", tcpAddr)
	checkError(err)
	for {
		
		conn, err := mylistener.Accept()
		defer conn.Close()
		if err != nil {
			continue
		}
		go handleRequest(conn)
		
 
	}
 
}
func checkError(err error) {
	if err != nil {
		fmt.Println("Fatal error :", err.Error())
		os.Exit(1)
	}
}
 
func handleRequest(conn net.Conn) {
	var mybuff [512]byte
	for {
		n, err := conn.Read(mybuff[0:])
		if err != nil {
			return
		}
		fmt.Println(string(mybuff[0:]))
		fmt.Println("localaddr is:", conn.LocalAddr())
		fmt.Println("remoteaddr is:", conn.RemoteAddr())
		fmt.Println("##########")
 
		_, err2 := conn.Write(mybuff[0:n])
		if err2 != nil {
			return
		}
	}
 
}
 

模型5、单线程IO复用+多线程IO复用(链接线程池)

image-20220420112836779

image-20220420112907499

模型5、单进程多路IO复用+多进程多路IO复用(进程池)

image-20220420112937393

image-20220420113022206

模型6、单线程多路IO复用+多线程多路IO复用+多线程

image-20220420113041582

image-20220420113121542

总结

综上,我们整理了7中Server的服务器处理结构模型,每个模型都有各自的特点和优势,那么对于多少应付高并发和高CPU利用率的模型,目前多数采用的是模型五(或模型五进程版,如Nginx就是类似模型五进程版的改版)。

至于并发模型并非设计的约复杂越好,也不是线程开辟的越多越好,我们要考虑硬件的利用与和切换成本的开销。模型六设计就极为复杂,线程较多,但以当今的硬件能力无法支撑,反倒导致该模型性能极差。所以对于不同的业务场景也要选择适合的模型构建,并不是一定固定就要使用某个来应用。