network.c

简介: https://github.com/felipecruz/rio/blob/master/src/network.c
 
 
#include "network.h"
#include "buffer.h"
#include "websocket.h"

// Running count of EAGAIN/EWOULDBLOCK results seen by recv() in
// handle_read(), which increments it and prints the new value each time.
int eag = 0 ;

/*
 * http_parser callback placed in the first slot of parser_settings.
 * It ignores the parser and always returns 0.
 */
static int
on_message(http_parser *parser)
{
    (void) parser;  // intentionally unused

    return 0;
}

/*
 * http_parser callback that receives the request path.
 *
 * `at` points at `len` bytes of path data and is not NUL-terminated, so the
 * bytes are copied into a freshly allocated, terminated string stored in
 * client->path. The HTTP method is copied from the parser at the same time.
 * The rio_client is taken from parser->data (set in handle_http), and
 * handle_http frees client->path once the request has been dispatched.
 *
 * Always returns 0.
 */
static int
on_path(http_parser *parser, const char *at, size_t len)
{
    rio_client *client = parser->data;

    client->path = malloc(len + 1);
    if (client->path == NULL) {
        error_exit("Malloc");
    }

    // The length is already known, so copy exactly `len` bytes and terminate
    // explicitly instead of relying on strncpy's padding semantics.
    memcpy(client->path, at, len);
    client->path[len] = '\0';

    client->method = (unsigned char) parser->method;

    debug_print("HTTP-REQ method: %d \n ", (int) client->method);
    debug_print("HTTP-REQ Path: %s %d \n ", client->path, (int) len);

    return 0;
}

/*
 * http_parser callback for a header field name.
 * Header names are not used; the arguments are ignored and 0 is returned.
 */
int
on_header_field(http_parser *parser, const char *at, size_t len)
{
    (void) parser;
    (void) at;
    (void) len;

    return 0;
}

/*
 * http_parser callback for a header value.
 * Header values are not used; the arguments are ignored and 0 is returned.
 */
int
on_header_value(http_parser *parser, const char *at, size_t len)
{
    (void) parser;
    (void) at;
    (void) len;

    return 0;
}

/*
 * http_parser callback listed in parser_settings after the header callbacks.
 * It does nothing and returns 0.
 */
int
on_headers_complete(http_parser *parser)
{
    (void) parser;

    return 0;
}

/*
 * http_parser callback for request body data.
 * The body is not used; the arguments are ignored and 0 is returned.
 */
int
on_body(http_parser *parser, const char *at, size_t len)
{
    (void) parser;
    (void) at;
    (void) len;

    return 0;
}

/*
 * http_parser callback placed in the last slot of parser_settings.
 * It does nothing and returns 0.
 */
int
on_message_complete(http_parser *parser)
{
    (void) parser;

    return 0;
}

// Callback table passed to http_parser_execute() in handle_http().
// The initializer is positional: each callback is assigned to whichever
// http_parser_settings member occupies the same position.
// NOTE(review): the member order comes from the bundled http_parser header,
// which is not visible here — confirm every callback lands in its intended slot.
http_parser_settings parser_settings =
{
     on_message ,
     on_path ,
     on_header_field ,
     on_header_value ,
     on_headers_complete ,
     on_body ,
     on_message_complete
};

/*
 * Queue a response for a client and ask epoll to report when the socket is
 * writable.
 *
 * worker: worker whose epoll instance tracks the client.
 * cli:    client to answer; its buffer is replaced with a copy of `resp` and
 *         the updated struct is stored in the clients table under cli->fd.
 * resp:   NUL-terminated response text. Ownership passes to this function:
 *         it is free()d once copied.
 *
 * The actual send happens later in do_write(), when handle_http() sees
 * EPOLLOUT for this fd.
 */
void
handle_write(rio_worker *worker, rio_client *cli, char *resp)
{
    struct epoll_event event;
    size_t resp_len = strlen(resp);
    khiter_t k;
    int ret;

    cli->buffer = new_rio_buffer_size(resp_len);
    rio_buffer_copy_data(cli->buffer, resp, resp_len);

    free(resp);

    debug_print("Handle Write: %d : %s \n ", cli->fd,
                rio_buffer_get_data(cli->buffer));

    // Zero the whole event so no indeterminate bytes of the data union are
    // handed to the kernel.
    memset(&event, 0, sizeof(event));
    event.events = EPOLLOUT;
    event.data.fd = cli->fd;

    if (epoll_ctl(worker->epoll_fd, EPOLL_CTL_MOD, cli->fd, &event) == -1) {
        debug_print("Error on epoll_ctl_mod on %d (epoll fd %d) \n ",
                    cli->fd, worker->epoll_fd);
    }

    // Store the updated client so the worker loop finds the buffer when the
    // fd becomes writable.
    k = kh_put(clients, h, cli->fd, &ret);
    if (ret < 0) {
        // A negative status means the insert failed and k is not a usable slot.
        debug_print("Could not store client %d\n", cli->fd);
        return;
    }
    kh_value(h, k) = *cli;
}

/*
 * Send the client's pending buffer, then close the connection.
 *
 * worker: worker whose epoll instance tracks the client.
 * cli:    client whose buffer holds the response bytes.
 * event:  epoll event that triggered the write; forwarded to
 *         remove_and_close().
 *
 * Data is sent with MSG_DONTWAIT until the buffer reports no more data or
 * send() fails. If the socket would block, the function returns without
 * closing, so the rest of the response is not thrown away: handle_write()
 * registers the fd for EPOLLOUT without EPOLLET, so the worker loop is
 * notified again once the socket is writable.
 * NOTE(review): this relies on rio_buffer_adjust() updating the shared buffer
 * in place — confirm against buffer.c.
 */
void
do_write(rio_worker *worker, rio_client *cli, struct epoll_event *event)
{
    ssize_t sent;

    debug_print("Do Write to fd: %d : %s \n ", cli->fd,
                rio_buffer_get_data(cli->buffer));

    do {
        sent = send(cli->fd,
                    rio_buffer_get_data(cli->buffer),
                    cli->buffer->length,
                    MSG_DONTWAIT);

        if (sent < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                // Kernel send buffer is full: keep the connection and the
                // remaining data, and wait for the next EPOLLOUT.
                debug_print("Do Write: EAGAIN on fd: %d \n ", cli->fd);
                return;
            }
            debug_print("Do Write: send error on fd: %d errno: %d \n ",
                        cli->fd, errno);
            break;
        }

        if (sent > 0) {
            // drop the bytes that were just written from the buffer
            rio_buffer_adjust(cli->buffer, sent);
        }
    } while (sent > 0 && rio_buffer_get_data(cli->buffer) != NULL);

    debug_print("Do Write sent: %d strlen: %zu \n ", (int) sent,
                cli->buffer->length);

    // response fully written, or a hard error occurred: drop the connection
    remove_and_close(cli, worker, event);
}

/*
 * Read what is currently available on a client socket into a new buffer.
 *
 * worker: worker whose epoll instance tracks the client.
 * cli:    client being read; cli->buffer is replaced with a new 4096-byte
 *         buffer whose length is set to the number of bytes received.
 * ev:     epoll event for the socket; ev->data.fd is the descriptor read
 *         from, and ev->events is rewritten when the socket is re-armed.
 *
 * Returns the number of bytes read (> 0), 0 when the peer closed the
 * connection, or -1 when nothing usable was read (hard error, or EAGAIN
 * before any data arrived).
 *
 * Each recv() appends after the bytes already stored and never asks for more
 * than the remaining capacity, so successive reads neither overwrite earlier
 * data nor make the recorded length exceed the allocation.
 */
int
handle_read(rio_worker *worker, rio_client *cli, struct epoll_event *ev)
{
    const size_t capacity = 4096;
    size_t total_received = 0;
    ssize_t received = 0;
    int hard_error = 0;

    // allocate space for data
    cli->buffer = new_rio_buffer_size(sizeof(char) * capacity);
    if (cli->buffer == NULL) {
        debug_print("Could not allocate read buffer for fd: %d\n", cli->fd);
        return -1;
    }

    debug_print("Handle Read from %d \n ", cli->fd);

    do {
        received = recv(ev->data.fd,
                        (char *) cli->buffer->content + total_received,
                        capacity - total_received,
                        MSG_DONTWAIT);

        if (received == 0) {
            // client disconnected
            return 0;
        }

        if (received < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                debug_print("EAGAIN on recv from fd: %d \n ", cli->fd);

                // socket drained: re-arm it in epoll for the next input
                ev->events = EPOLLIN | EPOLLET;
                if (epoll_ctl(worker->epoll_fd,
                              EPOLL_CTL_MOD,
                              cli->fd,
                              ev) == -1) {
                    error_exit("Could not add conn_sock to epoll");
                }

                eag += 1;
                printf("EAGAIN %d \n ", eag);
            } else {
                // the caller (handle_http) closes the connection on -1
                debug_print("Some other error %d \n ", errno);
                hard_error = 1;
            }
            break;
        }

        total_received += (size_t) received;
        debug_print("READ AGAIN on %d \n ", cli->fd);
    } while (total_received < capacity
             && rio_buffer_get_data(cli->buffer) != NULL);

    debug_print("Total received %zu \n ", total_received);

    cli->buffer->length = total_received;

    if (hard_error || total_received == 0) {
        return -1;
    }

    // EAGAIN after data has arrived is the normal end of a drain, not an
    // error: report the bytes that were collected.
    return (int) total_received;
}

/*
 * Stop tracking a client and release its socket and buffer.
 *
 * client: client to drop; its fd is closed and its buffer, if any, is freed
 *         through rio_buffer_free().
 * worker: worker whose epoll instance currently tracks the fd.
 * event:  event structure passed along to epoll_ctl(EPOLL_CTL_DEL).
 *
 * Failures are only logged. Returns the result of the epoll_ctl() call
 * (-1 when the fd could not be removed from epoll).
 *
 * The client's entry in the clients table is not removed here.
 */
int
remove_and_close(rio_client *client,
                 rio_worker *worker,
                 struct epoll_event *event)
{
    int rc = epoll_ctl(worker->epoll_fd, EPOLL_CTL_DEL, client->fd, event);

    if (rc == -1) {
        debug_print("[WARNING] on epoll_ctl_del on %d (epoll fd %d) \n ",
                    client->fd, worker->epoll_fd);
    }

    if (close(client->fd) == -1) {
        debug_print("Error on close client %d \n ", client->fd);
    }

    if (client->buffer != NULL) {
        rio_buffer_free(&client->buffer);
    }

    return rc;
}

/*
 * Handle one epoll event for a client connection.
 *
 * worker: worker that owns the epoll instance.
 * event:  the event reported by epoll_wait(), passed by value.
 * cli:    client the event belongs to.
 *
 * EPOLLIN:  read the request with handle_read(), parse it with http_parser
 *           (on_path fills cli->path), then hand the client and its path to
 *           dispatch(). The connection is closed when the peer disconnected,
 *           when reading or parsing failed, or when no path was produced.
 * EPOLLOUT: (only when EPOLLIN is not set) send the queued response with
 *           do_write().
 *
 * cli->path is owned by this function for the duration of the call: it is
 * reset before parsing and freed on every exit path.
 */
void
handle_http(rio_worker *worker, struct epoll_event event, rio_client *cli)
{
    if (event.events & EPOLLIN) {
        http_parser parser;
        size_t n = 0;
        int response;

        // handle read
        int received = handle_read(worker, cli, &event);

        if (received == 0) { // client disconnected!
            debug_print("Client %d Disconnected! \n ", cli->fd);

            // delete fd from epoll and close
            remove_and_close(cli, worker, &event);
            return;
        }

        // The parser lives only for this call, so it can stay on the stack.
        http_parser_init(&parser, HTTP_REQUEST);

        // set parser data
        parser.data = (void *) cli;

        // Start from a known state so the path can be freed unconditionally
        // and a request that never reaches on_path is detectable.
        cli->path = NULL;

        // execute http parsing only if data was read
        if (received > 0) {
            debug_print("Execute http parsing client: %d \n ", cli->fd);
            n = http_parser_execute(&parser,
                                    &parser_settings,
                                    rio_buffer_get_data(cli->buffer),
                                    (size_t) received);
        }

        // As before, an upgrade request (#TODO: what to do?) skips the
        // "all bytes consumed" check and goes on to dispatch.
        if (received < 0
            || (!parser.upgrade && n != (size_t) received)
            || cli->path == NULL) {
            debug_print("Error parsing, closing socket n:%zu received:%d \n ",
                        n, received);

            free(cli->path);
            cli->path = NULL;

            // delete fd from epoll and close
            remove_and_close(cli, worker, &event);
            return;
        }

        rio_buffer_free(&cli->buffer);
        response = dispatch(cli, cli->path);

        if (response != DISPATCH_FINISHED) {
            // response will be written later
            debug_print("Async Dispatch to fd: %d \n ", cli->fd);
        }

        debug_print("Freeing %s \n ", cli->path);

        free(cli->path);
        cli->path = NULL;
    } else if (event.events & EPOLLOUT) {
        // if socket is ready to write, do it!
        do_write(worker, cli, &event);
    }
}

// Create the `clients` khash table and store it in `h`, the table handle
// used by the other functions in this file (declared outside this section).
void
     init_clients ()
{
    h = kh_init ( clients );
}

void
     free_clients ()
{
     khiter_t element ;
     rio_client * cli ;

     debug_print ( "Closing clients structures \n " , h );

     for ( element = kh_begin ( h ); element != kh_end ( h ); ++ element ) {
         if ( kh_exist ( h , element )) {
             debug_print ( "%d \n " , (( rio_client ) kh_val ( h , element )). fd );
             kh_del ( clients , h , element );
         }
     }

     kh_destroy ( clients , h );
}

int
     socket_bind ()
{
     int server_fd ;
     int arg ;
     struct sockaddr_in sin ;

     //bind
     memset ( & sin , 0 , sizeof ( struct sockaddr_in ));
     sin . sin_family = AF_INET ;
     sin . sin_port = htons ( 80 );
     sin . sin_addr . s_addr = inet_addr ( "0.0.0.0" );

     //create socket
     if (( server_fd = socket ( AF_INET , SOCK_STREAM , 0 )) < 0 ) {
         error_exit ( "Could not create socket." );
     }

     //set socket non-blocking
     if ( fcntl ( server_fd , F_SETFL , O_NONBLOCK ) == - 1 ) {
         error_exit ( "Could not set socket non-blocking" );
     }

     //set socket options
     arg = 1 ;
     if ( setsockopt ( server_fd , SOL_SOCKET , SO_REUSEADDR , & arg , sizeof ( arg )) == - 1 ) {
         error_exit ( "Socket options" );
     }

     //bind socket to local addr
     if ( bind ( server_fd , ( struct sockaddr * ) & sin , sizeof ( sin )) < 0 ) {
         error_exit ( "bind" );
     }

     //listen on this socket
     if ( listen ( server_fd , MAX_EVENTS ) < 0 ) {
         error_exit ( "listen" );
     }

     return server_fd ;
}

/*
 * Accept one pending connection on the listening socket and start tracking it.
 *
 * runtime: provides the listening descriptor (runtime->server_fd).
 * worker:  worker whose epoll instance will watch the new socket.
 *
 * The new socket is made non-blocking, registered for edge-triggered input
 * (EPOLLIN | EPOLLET) and stored in the clients table under its fd, with all
 * other rio_client fields zeroed.
 *
 * A transient accept() failure (no connection pending, interrupted call, or
 * a connection aborted before it was accepted) simply returns: socket_bind()
 * creates its socket non-blocking, so if that is the descriptor in
 * runtime->server_fd an empty accept is an ordinary outcome of a readiness
 * notification. Other failures still go through error_exit().
 */
void
accept_incoming_connection(rio_runtime *runtime, rio_worker *worker)
{
    int new_connection_socket;
    int flags;
    int ret;

    socklen_t client_len;

    struct epoll_event ev;
    struct sockaddr_in temp_client;

    rio_client cli;
    khiter_t k;

    client_len = sizeof(temp_client);

    // accept client connection
    new_connection_socket = accept(runtime->server_fd,
                                   (struct sockaddr *) &temp_client,
                                   &client_len);

    if (new_connection_socket == -1) {
        if (errno == EAGAIN || errno == EWOULDBLOCK
            || errno == EINTR || errno == ECONNABORTED) {
            // nothing to accept right now; not a reason to stop the worker
            return;
        }
        error_exit("Could not accept socket");
    }

    // check sockets flags and set non-blocking after
    if (-1 == (flags = fcntl(new_connection_socket, F_GETFL, 0))) {
        flags = 0;
    }

    if (fcntl(new_connection_socket, F_SETFL, flags | O_NONBLOCK) == -1) {
        error_exit("Could not set client socket non-blocking");
    }

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN | EPOLLET;
    ev.data.fd = new_connection_socket;

    // add socket to epoll
    if (epoll_ctl(worker->epoll_fd,
                  EPOLL_CTL_ADD,
                  new_connection_socket,
                  &ev) == -1) {
        error_exit("Could not add conn_sock to epoll");
    }

    // store client information; zero the struct first so fields that are not
    // set here (e.g. path) do not hold indeterminate values in the table
    memset(&cli, 0, sizeof(cli));
    cli.fd = new_connection_socket;
    cli.websocket = 0;
    cli.buffer = NULL;

    k = kh_put(clients, h, new_connection_socket, &ret);
    if (ret < 0) {
        // insert failed: k is not a usable slot, so give the connection up
        debug_print("Could not store client %d\n", new_connection_socket);
        close(new_connection_socket);
        return;
    }
    kh_value(h, k) = cli;

    debug_print("New Client: %d \n ", cli.fd);
}

/*
 * Poll the master subscription once, without blocking.
 *
 * Returns 1 when a message whose payload is the text "terminate" was
 * received, 0 otherwise (including when no message was waiting). The payload
 * is inspected only after a successful receive and only within its reported
 * size, because message data is not guaranteed to be NUL-terminated.
 */
static int
master_requested_termination(rio_worker *worker, int id)
{
    static const char terminate_cmd[] = "terminate";
    const size_t cmd_len = sizeof(terminate_cmd) - 1;
    int terminate = 0;
    zmq_msg_t msg;

    zmq_msg_init(&msg);

    if (zmq_recv(worker->master, &msg, ZMQ_NOBLOCK) == 0) {
        const char *data = (const char *) zmq_msg_data(&msg);
        size_t size = zmq_msg_size(&msg);

        debug_print("Worker %d Received %.*s from master \n ",
                    id, (int) size, data);

        // accept the command with or without a trailing NUL in the payload
        if (size >= cmd_len
            && memcmp(data, terminate_cmd, cmd_len) == 0
            && (size == cmd_len || data[cmd_len] == '\0')) {
            terminate = 1;
        }
    }

    zmq_msg_close(&msg);

    return terminate;
}

/*
 * Worker main loop.
 *
 * id:      worker number, used for the worker name and log messages.
 * worker:  worker state; name, zmq_context, master and epoll_fd are set here.
 * runtime: shared runtime; runtime->server_fd is the listening socket.
 *
 * Sets up the clients table, dispatcher, static server, a ZeroMQ SUB socket
 * connected to the master and an epoll instance watching the listening
 * socket. It then loops: wait up to 100 ms for events, accept new connections
 * or handle client events, flush pending responses via dispatch_responses(),
 * and check for a "terminate" message from the master. On exit everything
 * created here is torn down.
 */
void
run_worker(int id, rio_worker *worker, rio_runtime *runtime)
{
    int size_epoll_events;
    int rc;

    struct epoll_event ev, events[MAX_EVENTS];

    khiter_t k;
    rio_client cli;

    // NOTE(review): unbounded write; the size of worker->name is not visible
    // here — confirm it can hold "worker <id>".
    sprintf(worker->name, "worker %d", id);
    debug_print("Identifying worker as %s pid %d \n ", worker->name,
                getpid());

    init_clients();
    init_dispatcher();
    init_static_server();

    worker->zmq_context = zmq_init(1);
    worker->master = zmq_socket(worker->zmq_context, ZMQ_SUB);

    // empty subscription filter
    zmq_setsockopt(worker->master, ZMQ_SUBSCRIBE, "", strlen(""));
    zmq_connect(worker->master, "ipc:///tmp/rio_master.sock");

    // create epoll
    worker->epoll_fd = epoll_create(MAX_EVENTS);
    if (worker->epoll_fd == -1) {
        error_exit("epoll_create");
    }

    // configure epoll events and file descriptor
    ev.events = EPOLLIN | EPOLLPRI;
    ev.data.fd = runtime->server_fd;

    // add listen socket to epoll
    if (epoll_ctl(worker->epoll_fd,
                  EPOLL_CTL_ADD,
                  runtime->server_fd,
                  &ev) == -1) {
        error_exit("epoll_ctl: listen_sock");
    }

    while (1) {
        // poll events
        size_epoll_events = epoll_wait(worker->epoll_fd,
                                       events,
                                       MAX_EVENTS,
                                       100);

        // NOTE(review): any epoll_wait failure other than EWOULDBLOCK,
        // including an interrupted call, ends the loop — confirm intended.
        if (size_epoll_events == -1 && errno != EWOULDBLOCK) {
            break;
        }

        for (int n = 0; n < size_epoll_events; ++n) {
            // if event fd == server fd -> accept new connection
            if (events[n].data.fd == runtime->server_fd) {
                accept_incoming_connection(runtime, worker);
                continue;
            }

            // retrieve client info by fd and handle event
            k = kh_get(clients, h, events[n].data.fd);
            if (k == kh_end(h)) {
                // no entry for this fd: there is no slot to read from
                debug_print("No client stored for fd %d\n",
                            events[n].data.fd);
                continue;
            }

            // handle_http works on a copy of the stored client
            cli = kh_val(h, k);
            handle_http(worker, events[n], &cli);
        }

        // dispatch responses
        dispatch_responses(worker);

        // look for master messages
        if (master_requested_termination(worker, id)) {
            break;
        }
    }
    debug_print(" \n Worker terminating gracefully \n ", worker);

    rc = zmq_close(worker->master);
    debug_print("Worker ZMQ Socket close return %d \n ", rc);

    rc = zmq_term(worker->zmq_context);
    debug_print("Worker ZMQ Context termination return :%d \n ", rc);

    free_clients();
    destroy_static_server();
    destroy_dispatcher();
    close(worker->epoll_fd);
}
目录
相关文章
|
机器学习/深度学习 数据挖掘 TensorFlow
How to Deploy a Neural Network on TH1520
How to Deploy a Neural Network on TH1520
296 0
How to Deploy a Neural Network on TH1520
|
机器学习/深度学习 数据可视化 数据挖掘
【图像分类系列】(一)--Network In Network
【图像分类系列】(一)--Network In Network
114 0
【图像分类系列】(一)--Network In Network
|
网络协议 网络虚拟化
|
网络协议