学习 MySQL Proxy 0.8.3 的源码后可知,其全部事件处理线程均对全局 socketpair 的读端进行了监听,以实现通知管道的功能:threads->event_notify_fds[0] 。
/*
 * Per-thread libevent setup for one chassis event thread (excerpt;
 * "..." lines are elided in the original listing).
 *
 * Gives the thread its own event_base and registers a persistent
 * EV_READ watcher on a dup() of the read end of the global
 * notification socketpair, so a byte written to the pair's write end
 * wakes the thread and runs chassis_event_handle.
 *
 * Returns 0 on success.
 */
int
chassis_event_threads_init_thread(chassis_event_threads_t *threads, chassis_event_thread_t *event_thread, chassis *chas) {
event_thread->event_base = event_base_new();
...
/* The fd this thread listens on is a dup() of the socketpair's read
 * end, so every event thread owns an independent descriptor for it. */
event_thread->notify_fd = dup(threads->event_notify_fds[0]);
...
/* Persistent read watcher: chassis_event_handle fires each time a
 * notification byte becomes readable. */
event_set(&(event_thread->notify_fd_event), event_thread->notify_fd, EV_READ | EV_PERSIST, chassis_event_handle, event_thread);
event_base_set(event_thread->event_base, &(event_thread->notify_fd_event));
event_add(&(event_thread->notify_fd_event), NULL);
return
0;
}
/*
 * Allocate the chassis_event_threads_t container and create the global
 * notification channel (excerpt; "..." lines are elided in the
 * original listing).
 *
 * Returns the newly allocated container.
 */
chassis_event_threads_t *chassis_event_threads_new() {
...
threads = g_new0(chassis_event_threads_t, 1);
/* create the ping-fds
 *
 * the event-thread write a byte to the ping-pipe to trigger a fd-event when
 * something is available in the event-async-queues
 */
/* Create the socketpair backing the ping-pipe: fds[0] is the read end
 * the event threads watch, fds[1] is the write end used to ping them. */
if
(0 != evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, threads->event_notify_fds)) {
...
}
...
/* make both ends non-blocking */
evutil_make_socket_nonblocking(threads->event_notify_fds[0]);
evutil_make_socket_nonblocking(threads->event_notify_fds[1]);
return
threads;
}
/*
 * Portable socketpair(): delegate to the native call on POSIX, emulate
 * it on Windows with two TCP sockets connected over loopback.
 *
 * family/type/protocol mirror socketpair(); on success fd[0]/fd[1]
 * hold the two connected descriptors and 0 is returned, otherwise -1.
 * The Windows emulation accepts only a protocol-less AF_UNIX request
 * and fails whenever the loopback interface is down.
 */
int
evutil_socketpair(int family, int type, int protocol, int fd[2])
{
#ifndef WIN32
	return socketpair(family, type, protocol, fd);
#else
	/* This code is originally from Tor. Used with permission. */
	/* This socketpair does not work when localhost is down. So
	 * it's really not the same thing at all. But it's close enough
	 * for now, and really, when localhost is down sometimes, we
	 * have other problems too.
	 */
	int srv = -1;	/* temporary listening socket */
	int cli = -1;	/* connecting end, becomes fd[0] */
	int acc = -1;	/* accepted end, becomes fd[1] */
	struct sockaddr_in srv_addr;
	struct sockaddr_in cli_addr;
	int addrlen;
	int saved_errno = -1;

	/* Only a protocol-less AF_UNIX pair can be emulated. */
	if (protocol
#ifdef AF_UNIX
	    || family != AF_UNIX
#endif
	    ) {
		EVUTIL_SET_SOCKET_ERROR(WSAEAFNOSUPPORT);
		return -1;
	}
	if (!fd) {
		EVUTIL_SET_SOCKET_ERROR(WSAEINVAL);
		return -1;
	}

	/* Listener bound to 127.0.0.1; the kernel chooses the port. */
	srv = socket(AF_INET, type, 0);
	if (srv < 0)
		return -1;
	memset(&srv_addr, 0, sizeof(srv_addr));
	srv_addr.sin_family = AF_INET;
	srv_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	srv_addr.sin_port = 0;
	if (bind(srv, (struct sockaddr *) &srv_addr, sizeof(srv_addr)) == -1)
		goto tidy_up_and_fail;
	if (listen(srv, 1) == -1)
		goto tidy_up_and_fail;

	/* Connecting socket. */
	cli = socket(AF_INET, type, 0);
	if (cli < 0)
		goto tidy_up_and_fail;

	/* Recover the port the kernel assigned to the listener. */
	addrlen = sizeof(cli_addr);
	if (getsockname(srv, (struct sockaddr *) &cli_addr, &addrlen) == -1)
		goto tidy_up_and_fail;
	if (addrlen != sizeof(cli_addr))
		goto abort_tidy_up_and_fail;

	/* Connect to ourselves over loopback... */
	if (connect(cli, (struct sockaddr *) &cli_addr, sizeof(cli_addr)) == -1)
		goto tidy_up_and_fail;

	/* ...and accept; srv_addr is reused for the accepted peer. */
	addrlen = sizeof(srv_addr);
	acc = accept(srv, (struct sockaddr *) &srv_addr, &addrlen);
	if (acc < 0)
		goto tidy_up_and_fail;
	if (addrlen != sizeof(srv_addr))
		goto abort_tidy_up_and_fail;
	EVUTIL_CLOSESOCKET(srv);

	/* Now check we are talking to ourself by matching port and host
	 * on the two sockets: the accepted peer must be exactly the
	 * address the connector was auto-bound to. */
	if (getsockname(cli, (struct sockaddr *) &cli_addr, &addrlen) == -1)
		goto tidy_up_and_fail;
	if (addrlen != sizeof(cli_addr)
	    || srv_addr.sin_family != cli_addr.sin_family
	    || srv_addr.sin_addr.s_addr != cli_addr.sin_addr.s_addr
	    || srv_addr.sin_port != cli_addr.sin_port)
		goto abort_tidy_up_and_fail;

	fd[0] = cli;
	fd[1] = acc;
	return 0;

abort_tidy_up_and_fail:
	saved_errno = WSAECONNABORTED;
tidy_up_and_fail:
	if (saved_errno < 0)
		saved_errno = WSAGetLastError();
	if (srv != -1)
		EVUTIL_CLOSESOCKET(srv);
	if (cli != -1)
		EVUTIL_CLOSESOCKET(cli);
	if (acc != -1)
		EVUTIL_CLOSESOCKET(acc);
	EVUTIL_SET_SOCKET_ERROR(saved_errno);
	return -1;
#endif
}
实现上述功能的另外一种方式是,使用 pipe 。用法很简单,摘抄代码如下(摘自 memcached-1.4.14):
/*
 * memcached worker-thread bootstrap (excerpt; "..." lines are elided
 * in the original listing).
 *
 * For each of the nthreads workers: create a private pipe that serves
 * as the worker's wake-up channel, record both ends in the thread
 * slot, and run the per-thread libevent setup.  The worker threads
 * themselves are only spawned after all libevent state is ready.
 */
void
thread_init(
int
nthreads,
struct
event_base *main_base) {
...
/* nthreads = number of worker threads to create */
for
(i = 0; i < nthreads; i++) {
int
fds[2];
if
(pipe(fds)) {
/* one pipe per worker: the channel through which the worker is
 * told that new work is available */
perror
(
"Can't create notify pipe"
);
exit
(1);
}
threads[i].notify_receive_fd = fds[0];
/* read end */
threads[i].notify_send_fd = fds[1];
/* write end */
/* per-worker libevent setup; also creates the CQ (connection
 * queue) structure */
setup_thread(&threads[i]);
...
}
/* Create threads after we've done all the libevent setup. */
for
(i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
...
}
====== 更新 2013-11-11 ======
最近写 Modb 代码时,想要利用上面的线程间通信机制,所以使用了相对简单的 pipe 实现方案,但在 windows 下调试时总会遇到 “Unknown error 10038”(WSAENOTSOCK)错误。查阅相关文档后得出结论:windows 下不能将 pipe 和 select 一起使用,因为 select 会认为 pipe 句柄不是一个合法的 socket 句柄;而 linux 下没有这个问题。
解决方案:
- 通过 socket 模拟 pipe 的实现;
- 使用上面的 socketpair 实现;
int
pipe(
int
fildes[2])
{
int
tcp1, tcp2;
sockaddr_in name;
memset
(&name, 0,
sizeof
(name));
name.sin_family = AF_INET;
name.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
int
namelen =
sizeof
(name);
tcp1 = tcp2 = -1;
int
tcp = socket(AF_INET, SOCK_STREAM, 0);
if
(tcp == -1){
goto
clean;
}
if
(bind(tcp, (sockaddr*)&name, namelen) == -1){
goto
clean;
}
if
(listen(tcp, 5) == -1){
goto
clean;
}
if
(getsockname(tcp, (sockaddr*)&name, &namelen) == -1){
goto
clean;
}
tcp1 = socket(AF_INET, SOCK_STREAM, 0);
if
(tcp1 == -1){
goto
clean;
}
if
(-1 == connect(tcp1, (sockaddr*)&name, namelen)){
goto
clean;
}
tcp2 = accept(tcp, (sockaddr*)&name, &namelen);
if
(tcp2 == -1){
goto
clean;
}
if
(closesocket(tcp) == -1){
goto
clean;
}
fildes[0] = tcp1;
fildes[1] = tcp2;
return
0;
clean:
if
(tcp != -1){
closesocket(tcp);
}
if
(tcp2 != -1){
closesocket(tcp2);
}
if
(tcp1 != -1){
closesocket(tcp1);
}
return
-1;
}
- 效率低下(是否所有其他实现方式都比基于 socket 的方式高效?)
- 占用了两个 TCP 端口(pipe 不会占用端口)
- accept 的返回值未必就是 tcp1 连接过来的(多线程或者别的进程在干预),所以最好通过发送数据进行确认(这个问题比较严重:在有多个连接同时进入的时候,确实无法保证当前连接是正确的)
- 由于不是匿名的, 所以可以在 netstat 里面看到(看到又怎样?)
将该 pipe 实现和上面的 socketpair 的实现进行对比,发现两者根本就是同一个东东,并且 pipe 的实现没有 libevent 中 socketpair 实现写的好。所以 pipe 实现的作者指出的那些缺点,本人持保留意见。看客自己斟酌。
补充:由于上面的 socketpair 实现是基于 INADDR_LOOPBACK 的,所以 lo 接口必须处于 up 状态才行。