原实例在APUE(第三版)17.2 UNIX域套接字
1、使用UNIX域套接字轮询XSI消息队列(poll版,原版)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
#include "apue.h"
#include <poll.h>
#include <pthread.h>
#include <sys/msg.h>
#include <sys/socket.h>
#define NQ 3 // number of message queues
#define MAXMSZ 512 // maximum message length
#define KEY 0x123 // key of the first message queue
/*
 * Per-thread argument handed to helper(): the message queue to read from
 * and the descriptor to forward each message to.
 */
struct threadinfo {
    int qid;    /* message queue identifier */
    int fd;     /* descriptor the helper writes to */
};
/*
 * Message buffer passed to msgrcv(): a long type field followed by up to
 * MAXMSZ bytes of message text.
 */
struct mymesg {
    long mtype;             /* message type */
    char mtext[MAXMSZ];     /* message text */
};
void
*helper(
void
*arg)
{
int
n;
struct
mymesg m;
struct
threadinfo *tip = arg;
for
(;;) {
printf
(
"helper qid %d, fd %d, tid %u\n"
, tip->qid, tip->fd, (unsigned)pthread_self());
memset
(&m, 0,
sizeof
(m));
if
((n = msgrcv(tip->qid, &m, MAXMSZ, 0, MSG_NOERROR)) < 0) {
err_sys(
"msgrcv error"
);
}
if
(write(tip->fd, m.mtext, n) < 0) {
err_sys(
"write error"
);
}
}
}
int
main()
{
int
i, n, err;
int
fd[2];
int
qid[NQ];
struct
pollfd pfd[NQ];
struct
threadinfo ti[NQ];
pthread_t tid[NQ];
char
buf[MAXMSZ];
for
(i = 0; i < NQ; ++i) {
if
((qid[i] = msgget((KEY+i), IPC_CREAT|0666)) < 0) {
//创建一个新队列
err_sys(
"msgget error"
);
}
printf
(
"queue %d ID is %d\n"
, i, qid[i]);
if
(socketpair(AF_UNIX, SOCK_DGRAM, 0, fd) < 0) {
//创建UNXI域套接字(fd管道)
err_sys(
"socketpair error"
);
}
printf
(
"fd[0]:%d\n"
, fd[0]);
printf
(
"fd[1]:%d\n"
, fd[1]);
pfd[i].fd = fd[0];
pfd[i].events = POLLIN;
ti[i].qid = qid[i];
ti[i].fd = fd[1];
if
((err = pthread_create(&tid[i], NULL, helper, &ti[i])) != 0) {
//创建线程
err_exit(err,
"pthread_create error"
);
}
}
for
(;;) {
if
(poll(pfd, NQ, -1) < 0) {
//等待事件发生
err_sys(
"poll error"
);
}
for
(i = 0; i < NQ; ++i) {
//printf("i:%d\n", i);
if
(pfd[i].revents & POLLIN) {
if
((n = read(pfd[i].fd, buf,
sizeof
(buf))) < 0) {
err_sys(
"read error"
);
}
buf[n] = 0;
printf
(
"queue %d id %d, message %s\n"
, i, qid[i], buf);
}
}
}
exit
(0);
}
|
编译命令:
1
|
gcc pollmsg.c -o pollmsg -lapue -lpthread -std=c99
|
2、使用UNIX域套接字轮询XSI消息队列(epoll版,改版)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
#include "apue.h"
#include <pthread.h>
#include <sys/msg.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#define NQ 3 // number of message queues
#define MAXMSZ 512 // maximum message length
#define KEY 0x123 // key of the first message queue
#define FDSIZE 1000 // size argument passed to epoll_create()
#define EPOLLEVENTS 100 // capacity of the event array given to epoll_wait()
/*
 * Per-thread argument handed to helper(): the message queue to read from
 * and the descriptor to forward each message to.
 */
struct threadinfo {
    int qid;    /* message queue identifier */
    int fd;     /* descriptor the helper writes to */
};
/*
 * Message buffer passed to msgrcv(): a long type field followed by up to
 * MAXMSZ bytes of message text.
 */
struct mymesg {
    long mtype;             /* message type */
    char mtext[MAXMSZ];     /* message text */
};
void
*helper(
void
*arg)
{
int
n;
struct
mymesg m;
struct
threadinfo *tip = arg;
for
(;;) {
printf
(
"helper qid %d, fd %d, tid %u\n"
, tip->qid, tip->fd, (unsigned)pthread_self());
memset
(&m, 0,
sizeof
(m));
if
((n = msgrcv(tip->qid, &m, MAXMSZ, 0, MSG_NOERROR)) < 0) {
err_sys(
"msgrcv error"
);
}
if
(write(tip->fd, m.mtext, n) < 0) {
err_sys(
"write error"
);
}
}
}
int
main()
{
int
i, n, err;
int
fd[2];
int
qid[NQ];
int
epollfd;
struct
epoll_event events[EPOLLEVENTS];
struct
threadinfo ti[NQ];
pthread_t tid[NQ];
char
buf[MAXMSZ];
epollfd = epoll_create(FDSIZE);
//创建epoll文件描述符
for
(i = 0; i < NQ; ++i) {
if
((qid[i] = msgget((KEY+i), IPC_CREAT|0666)) < 0) {
//创建一个新队列
err_sys(
"msgget error"
);
}
printf
(
"queue %d ID is %d\n"
, i, qid[i]);
if
(socketpair(AF_UNIX, SOCK_DGRAM, 0, fd) < 0) {
//创建UNXI域套接字(fd管道)
err_sys(
"socketpair error"
);
}
struct
epoll_event ev;
ev.events = EPOLLIN;
ev.data.fd = fd[0];
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd[0], &ev);
//注册fd[0]到epoll
ti[i].qid = qid[i];
ti[i].fd = fd[1];
if
((err = pthread_create(&tid[i], NULL, helper, &ti[i])) != 0) {
//创建线程
err_exit(err,
"pthread_create error"
);
}
}
for
(;;) {
int
occurred;
if
((occurred = epoll_wait(epollfd, events, EPOLLEVENTS, -1)) < 0) {
//等待事件发生
err_sys(
"epoll error"
);
}
if
(occurred == 0) {
err_sys(
"epoll timeout"
);
}
for
(i = 0; i < occurred; ++i) {
if
(events[i].events & EPOLLIN) {
if
((n = read(events[i].data.fd, buf,
sizeof
(buf))) < 0) {
err_sys(
"read error"
);
}
buf[n] = 0;
printf
(
"main thread %u, message %s\n"
, (unsigned)pthread_self(), buf);
}
}
}
exit
(0);
}
|
编译命令:
1
|
gcc epollmsg.c -o epollmsg -lapue -lpthread -std=c99
|
3、给XSI消息队列发送消息(测试程序,原版)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
#include "apue.h"
#include <sys/msg.h>
#define MAXMSZ 512 // maximum message length
/*
 * Message buffer passed to msgsnd(): a long type field followed by up to
 * MAXMSZ bytes of message text.
 */
struct mymesg {
    long mtype;             /* message type */
    char mtext[MAXMSZ];     /* message text */
};
/*
 * Sends one message to an existing XSI message queue.
 *
 * Usage: sendmsg KEY message
 *
 * argv[1] is the queue key (any base strtol() accepts with base 0, e.g.
 * 0x123) and argv[2] is the text to send, truncated to MAXMSZ - 1 bytes.
 * Exits with status 1 on a usage error or an unparsable key; msgget() and
 * msgsnd() failures are handed to err_sys().
 */
int
main(int argc, char *argv[])
{
    key_t key;
    int qid;
    size_t nbytes;
    struct mymesg m;
    char *end;

    if (argc != 3) {
        fprintf(stderr, "usage: sendmsg KEY message\n");
        exit(1);
    }

    /*
     * Reject text that is not entirely a number.  Unparsable input would
     * otherwise become key 0, which is IPC_PRIVATE on Linux and makes
     * msgget() create a new private queue instead of opening one.
     */
    key = strtol(argv[1], &end, 0);
    if (end == argv[1] || *end != '\0') {
        fprintf(stderr, "invalid key: %s\n", argv[1]);
        exit(1);
    }

    /* Open an existing queue; no IPC_CREAT, so a missing queue is an error. */
    if ((qid = msgget(key, 0)) < 0) {
        err_sys("can't open queue key %s", argv[1]);
    }

    /* Zeroing first guarantees the bounded copy below is NUL-terminated. */
    memset(&m, 0, sizeof(m));
    strncpy(m.mtext, argv[2], MAXMSZ - 1);
    nbytes = strlen(m.mtext);
    m.mtype = 1;

    /* Send the message to the queue. */
    if (msgsnd(qid, &m, nbytes, 0) < 0) {
        err_sys("can't send message");
    }
    exit(0);
}
|
编译命令:
1
|
gcc sendmsg.c -o sendmsg -lapue -std=c99
|
相关阅读:
1、select、poll、epoll之间的区别总结[整理]
3、APUE读书笔记
*** walker ***
本文转自walker snapshot博客(51CTO博客),原文链接:http://blog.51cto.com/walkerqt/1768445,如需转载请自行联系原作者。
RQSLT