使用select多路复用技术的非阻塞模型
select多路复用通常具有很好的跨平台性,也能提供不错的并发性能,但是在通常情况下有最大监听文件描述符的限制(通常1024),如果不需要达到C10K这种前端高性能服务器的要求,采用select+nonblocking的方式能降低编程的难度
用到的接口
FD_SETSIZE;
FD_SET(<#fd#>, <#fdsetp#>);
FD_ISSET(<#fd#>, <#fdsetp#>);
FD_ZERO(<#fdsetp#>);
select(<#(int)__nfds#>, <#(fd_set*)__readfds#>, <#(fd_set*)__writefds#>, <#(fd_set*)__exceptfds#>, <#(struct timeval*)__timeout#>)
//select-nonblocking
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <errno.h>
#include <assert.h>
#include <sys/socket.h>
#include <sys/select.h>
#define MAX_READ_SIZE 20*1024*1024
#define MAX_WRITE_SIZE 20*1024*1024
#define MAX_CHUNCK_SIZE 2*1024*1024
#define KEEP_ALIVE 0
/*
*
* a simple server using select
* feilengcui008@gmail.com
*
*/
void run();
int main(int argc, char *argv[])
{
run();
return 0;
}
//struct for storing read-write buffer
struct fd_state {
char buffer[MAX_READ_SIZE];
char write_buffer[MAX_WRITE_SIZE];
int writing;
size_t read_len;
size_t written_len;
size_t write_upto_len;
};
//alloc fd_state
struct fd_state *alloc_fd_state(void)
{
struct fd_state *p = (struct fd_state *) malloc(sizeof(struct fd_state));
if (!p) return NULL;
p->read_len = p->writing = p->written_len = p->write_upto_len = 0;
return p;
};
//free fd_state pointer
void free_fd_state(struct fd_state *p)
{
free(p);
}
//handle error
void error_exit()
{
perror("errors happen");
exit(EXIT_FAILURE);
}
//set sock non-blocking
void made_nonblock(int fd)
{
if(fcntl(fd, F_SETFL, O_NONBLOCK)==-1){
error_exit();
}
}
//handle read event
int do_read(int fd,struct fd_state *state)
{
char buf[MAX_CHUNCK_SIZE];
int i;
ssize_t result;
while (1){
result = recv(fd, buf, sizeof(buf), 0);
/*printf("readlen:%d\n",(int)result);
fflush(stdout);*/
if (result<=0)
break;
//read buf to fd_state.buffer until "\n"
for (int j = 0; j < result; ++j) {
if (state->read_len< sizeof(state->buffer))
state->buffer[state->read_len++] = buf[j];
/*read until "\n"
todo:
handle the read buffer and set the write buffer
if (buf[j]=='\n'){
state->writing = 1;
state->write_upto_len = state->read_len;
break;
}*/
}
}
state->writing = 1;
state->write_upto_len = state->read_len;
//write(1, state->buffer, state->read_len);
//nothing read
if (result==0){
return 1;
}
if (result<0){
//nonblocking
if (errno== EAGAIN)
return 0;
else
return -1;
}
return 0;
}
int do_write(int fd,struct fd_state *state)
{
//we borrow the readbuffer just for test
//we should have a write buffer to store our http response body
ssize_t result;
while (state->written_len<state->write_upto_len){
result = send(fd, state->buffer+state->written_len, state->write_upto_len-state->written_len, 0);
if (result<=0){
break;
}
state->written_len+=result;
}
if (state->written_len==state->read_len)
state->written_len = state->write_upto_len = state->read_len = 0;
state->writing = 0;
if (result<0){
if (errno== EAGAIN)
return 0;
else
return -1;
}
if (result==0)
return 1;
return 0;
}
void do_select(int serverfd)
{
//set init fd_state
struct fd_state *state[FD_SETSIZE];
for (int i = 0; i < FD_SETSIZE; ++i) {
state[i] = NULL;
}
fd_set read_set,write_set,error_set;
FD_ZERO(&read_set);
FD_ZERO(&write_set);
FD_ZERO(&error_set);
//main loop
while(1){
int maxfd = serverfd;
FD_ZERO(&read_set);
FD_ZERO(&write_set);
FD_ZERO(&error_set);
FD_SET(serverfd, &read_set);
//server accept operation just alloc the fd_state struct
//do_read and do_write just change the fd_state's state
//here we add server sock and client socks to fd_set
//we don not pass the readset/writeset to do_read or do_write func
//so we add read write event here
//when use poll,we can direct change the state in do_read and do_write use the events status
for (int i = 0; i < FD_SETSIZE; ++i) {
if (state[i]){
if (i>maxfd){
maxfd = i;
}
FD_SET(i, &read_set);
if (state[i]->writing){
FD_SET(i, &write_set);
}
}
}
//block until event happens
if(select(maxfd+1, &read_set, &write_set, &error_set, NULL)<0){
error_exit();
}
// if server becomes readable then accept the client sock
// and alloc the client sock state
if(FD_ISSET(serverfd, &read_set)){
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int client = accept(serverfd, (struct sockaddr *)&ss, &slen);
if(client<0){
error_exit();
}else if(client> FD_SETSIZE){
close(client);
}else{
made_nonblock(client);
state[client] = alloc_fd_state();
}
}
//handle the fd_set
for (int j = 0; j < maxfd + 1; ++j) {
int flag = 0;
if (j==serverfd) continue;
//handle read
if (FD_ISSET(j, &read_set)) flag = do_read(j,state[j]);
//handle write
if (flag==0&& FD_ISSET(j, &write_set)){
flag = do_write(j, state[j]);
//no matter what flag is,we close after write ops
if(!KEEP_ALIVE){
free_fd_state(state[j]);
state[j] = NULL;
close(j);
}
}
//handle error
if (flag){
free_fd_state(state[j]);
state[j] = NULL;
close(j);
}
}
}
}
int create_server_socket()
{
int serverfd = socket(AF_INET, SOCK_STREAM, 0);
if(serverfd<0){
error_exit();
}
int reuse = 1;
if (setsockopt(serverfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))<0){
error_exit();
}
return serverfd;
}
void run()
{
int serverfd = create_server_socket();
made_nonblock(serverfd);
//server sockaddr
struct sockaddr_in sin;
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = 0;
sin.sin_port = htons(8000);
if(bind(serverfd, (struct sockaddr *)&sin, sizeof(sin))<0){
error_exit();
}
if(listen(serverfd, 50)<0){
error_exit();
}
do_select(serverfd);
}