【网络编程】并发服务器的设计

简介: 【网络编程】并发服务器的设计

并发服务器的设计

文章目录

多进程并发服务器

进程管理

流程:


1.主1 创建套接字并将其绑定到所提供服务的熟知地址上。让该套接字保持非连接


2.主2 将该端口设置为被动模式,使其准备为服务器所用


3.主3 反复调用accept以便接受来自客户的下一个连接请求,并创建新的从线程或进程来处理响应


4.从1 由主进程传递来的连接请求(即针对连接的套接字)开始


5.从2 用该连接与客户进行交互:读取请求并发回响应


6.从3 关闭连接并退出。在处理完来自客户的所有请求后,从线程就退出


进程定义:一个运行着一个或多个线程的地址空间和这些线程锁需要的系统资源


linux的进程有各种状态,用户态,内核态,内核中的就绪睡眠状态,就绪换出状态,睡眠换状态,被抢先,创建,僵尸态等等

创建线程:fork()函数,子进程new_pid返回0,父进程返回创建子线程的pid

pid_t  new_pid;
 new_pid = fork();
 switch(new_pid) {
 case -1 :
      …..
    break;
  case 0 : /*  we are child */
    ……..
    break;
  default  :   /*  we are parent  */
    ……..
   break;
  }

僵尸线程:子进程终止时,它与父进程之间的关联依然保持,这时它将成为一个死(defunct)进程或僵尸(zombie)进程. 直到父进程也正常终止或父进程调用wait才告结束

进程等待:wait调用将阻塞父进程直到状态信息出现

   pid_t wait(int *statloc);
   pid_t waitpid(pid_t pid, int *statloc, int option);
 #include <sys/type.h>
 #include <sys/wait.h>
 #include <unistd.h>
 #include <stdio.h>
 #include <stdlib.h>
 int main()
 {
    pid_t pid;
    char *message;
    int n;
    int exit_code;
     printf( “fork program starting\n”);
     pid = fork();
     Switch (pid)
   {
         case   -1;
             perror( “ fork  failed”);
             exit (1);
      case   0;
             massage = “ This  is the child”;
             n = 5;
             exit_code = 37;
             break;
        defaut:
             massage = “ This is the parent”;
             n = 3;
             exit_code = 0;
              break;
       }
      for (; n> 0; n--)     {
              puts ( message);
              sleep (1);
       }
      if ( pid != 0) {
        int stat_val;
        pid_t child_pid;
        child_pid = wait (&stat_val);
        printf(“Child has finished: PID = %d\n”,child_pid);
        if (WIFEXITED(stat_val))
          printf(“Child exited with code %d\n”,           WEXITSTATUS(stat_val));
        else
          printf(“Child terminated abnormally”);
       }
      exit(exit_code);
      } 

fork program starting

This is the child

This is the parent

This is the parent

This is the child

This is the parent

This is the child

This is the child

This is the child

Child has finished: PID = 1582

Child exited with code 37

waitpid的调用,比wait的接口更加完善,最主要的是使用WNOHANG:pid指定的子进程不是立即可取的,则waitpid不阻塞, 此时返回值为0

#include<sys/wait.h>
pid_t waitpid(pid_t pid, int *statloc, int option);

信号及信号处理

操作系统响应某些条件而产生的一个事件(软中断),信号是发送给进程的特殊消息,具有异步性

常见信号:

信号处理:使用signal函数,它将将一个给定的函数和一个特定的信号联系

  #include < signal.h>
  void ( *signal( int sig, void ( *func) ( int))) ( int);

举例:

#include <signal.h>
 #include <stdio.h>
 #include <unistd.h>
 void ouch(int sig)
 {
   printf(“OUCH! – I got signal %d\n”, sig);
   (void) signal(SIGINT, SIG_DFL);
  }
 int main()
 {
    (void)  signal(SIGINT, ouch);
     while (1)  {
                  printf(“Hello World\n”);
                  sleep(1);
                }
 }

输出:


信号用于子线程的终止:父进程捕捉到SIGCHLD信号,调用该信号的处理函数(可设定父进程不阻塞),若该处理函数中调用了wait系列函数完成子进程的最后清理工作,子进程不会成为僵尸进程

 #include<sys/wait.h>
 #include<signal.h>
 #include<unistd.h>
 #include<stdlib.h>
/*本函数被调用时,说明父进程接收到SIGCHLD信号,有子进程退出*/
  void * sigchld_handler(t sig)
{
     int state; 
while(waitpid(-1,&state,WNOHANG)>0)
                             ;
}
int main()
{
    pid=signal(SIGCHLD,sigchld_handler);//设置信号处理函数
}

多线程并发服务器

线程管理

算法流程:


1.主1 创建套接字并将其绑定到所提供服务的熟知地址上。让该套接字保持非连接


2.主2 将该端口设置为被动模式,使其准备为服务器所用


3.主3 反复调用accept以便接受来自客户的下一个连接请求,并创建新的从线程来处理响应


4.从1 由主线程传递来的连接请求(即针对连接的套接字)开始


5.从2 用该连接与客户进行交互:读取请求并发回响应


6.从3 关闭连接并退出。在处理完来自客户的所有请求后,从线程就退出


创建线程:

#include  <pthread.h>
int  pthread_create(pthread_t   thread,   pthread_attr_t   *attr, void *(*start_routine)(void*), void *arg);

返回值: 0成功,错误号表示失败

thread: 指向pthread_t类型的变量,新创建的线程标识符

attr:指向pthread_attr_t线程属性类型的变量

start_routine: 指向线程函数的指针,线程要执行的代码

arg: 指向线程参数的指针

终止线程:


1.从线程函数中返回,返回值为线程的退出码;

return(retu_val);


2.被同一进程的其他线程终止,即被取消;

pthread_cancel;


3.执行线程退出调用;

pthread_exit;


等待线程终止:

#include  <pthread.h>
int pthread_join(pthread_t  th, void **thread_return);

一个创建线程,运行,退出的例子:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
 void *th_function(void  *arg);
Char message[] = ‘hello word’;
 int  main()   {
   int              res;
   pthread_t   th;
   void *thread_result;
   res = pthread_create(*th, null, th_function, (void *)message);
   if (res i=0) {
      perror(‘Thread creation failed’);
      exit(EXIT_FAILURE);
   }
   printf( ‘ Waiting for thread to finish…\n’);
   res = pthread_join(th, *thread_result);
   if (res i=0) {
      perror(‘Thread join failed’);
      exit(EXIT_FAILURE);
   }
 printf(“Thread joined, it returned %s\n”, (char *) thread_result);
 printf(“Message is now &s\n”, message);
 exit(EXIT_SUCCESS);
 }
void *th_function(void *arg) {
 printf(“ th_function is running, 
                  Argument was %s\n”, (char *)arg);
 sleep(3);
 strcpy(message, “ Bye!”);
 pthread_exit(“ Thank you for the CPU time”);
 }

输出:


Wait for thread to finish…
thread_function is runing, Argment was Hello Word
Thread joined, it returned Thank you for the CPU time
Message is now   BYE!


线程同步

1. 互斥(mutext)

对共享数据项进行排它性访问

例子:主线程反复读入文本,线程函数统计字符数,读入“end”,程序结束

 #include <stdio.h>
 #include <unistd.h>
 #include < stdlib.h>
 #include <string>
 #include <pthread.h>
 #include <semaphore.h>
 void *thread_function( void  *arg );
 pthread_mutex_ t    work_mutex; 
 #define  WORK_SIZE  1024 
 char  work_area( WORK_SIZE );
 int   time_to_exit  =  0;
   int main() {
     int res;
     pthread_t  a_thread;
     void  *thread_result;
      res = pthread_mutex_init(&work_mutex, NULL);
      res = pthread_create(&a_thread, NULL,
                   thread_function, NULL);
       pthread_mutex_lock(&work_mutex);
        print( “ Input some text.  Enter  ‘end’  to finish\n”);
        while  ( !time_to_exit)   {
              fgets( work_area,  WORK_SIZE, stdin);
              pthread_mutex_unlock( &work_mutex);
              while (1)  {       
                pthread_mutex_lock( &work_mutex);
         if ( work_area[0]  != ‘\0’);  {
               pthread_mutex_unlock(&work_mutex);
               sleep(1);
           }
           else  {
                break;
            }
           }
          }
  pthread_mutex_unlock(&work_mutex);
  printf(“\n Waiting for thread to finish…. \n”);
  res = pthread_join( a_thread,   &thread_result);
  if ( res != 0);  {
       perror( “ Thread join failed”);
        exit ( EXIT_FAILURE);
    }
    printf( “ Thread joined\n”);
    pthread_mutex_destroy(&work_mutex);
    exit(EXIT_SUCCESS);
   }
void *thread_function(void *arg) {
     sleep(1);
     pthread_mutex_lock(&work_mutex);
     while ( strncmp( “end”, work_area, 3) !=0 ) {
     printf( “You input %d characters\n”, strlen(work_area) -1);
     work_area[0] = ‘\0’;
     pthread_mutex_unlock(&work_mutex);
     sleep(1);
     pthread_mutex_lock(&work_mutex);
     while (work_area[0] == ‘\0’) {
         pthread_mutex_unlock(&work_mutex);
         sleep(1);
         pthread_mutex_lock(&work_mutex);
     }
  }
  time_to_exit = 1;
  work_area[0] = ‘\0’ ;
  pthread_mutex_unlock(&work_mutex);
}

线程的属性:创建线程的时候,可以指定脱离的还是非脱离的,这就是线程的属性,进行属性初始化对象,并回收

pthread_attr_t  thread_attr;
  res = pthread_attr_init(&thread_attr);
  if (res !=0 ) {
      perror(“Attribute creation failed”);
      exit(EXIT_FAILURE);
  }
res = pthread_create(&a_thread, &pthread_attr,  thread_funtion,
                                        (void *)message);
  if (res != 0) {
       perror(“Thread creation failed”);
       exit(EXIT_FAILURE);
   }
   (void)pthread_attr_destroy(&thread_attr);
    while ( !thread_finished)  {
          printf(“Waiting for thread to say it’s finished…\n”);
          sleep(1);
     printf(“Other thread finished, bye!\n”);
     exit(EXIT_SUCCESS);
}

2. 信号量(semaphore)


用来控制并发访问的线程数量,它通过协调各个线程来保证资源的合理使用。其内部使用的是AQS机制(concurrent包下很多类实现原理都是基于AQS)。Semaphore实现控制并发线程数可以抽象为停车场模型,一个固定车位的停车场,当车位满了,便不再允许新的车辆进入;若当前车库驶出多少辆,则就允许进入多少辆

/**
 * @desc:信号量测试:50个管理员同时调用某个很耗时的接口,至多允许三个管理员同时调用,其余的排队等候
 **/
public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3, true);
        ExecutorService executor = Executors.newFixedThreadPool(50);
        for (int i = 0; i < 50; i++) {
            executor.execute(() -> {
                try {
                    semaphore.acquire();
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + "号管理员获取许可,任务开始");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                    System.out.println(Thread.currentThread().getName() + "号管理员释放许可,任务结束");
                }
            });
        }
        executor.shutdown();
    }
}
//输出结果:
pool-1-thread-2号管理员获取许可,任务开始
pool-1-thread-3号管理员获取许可,任务开始
pool-1-thread-1号管理员获取许可,任务开始
pool-1-thread-3号管理员释放许可,任务结束
pool-1-thread-2号管理员释放许可,任务结束
pool-1-thread-1号管理员释放许可,任务结束
pool-1-thread-5号管理员获取许可,任务开始
pool-1-thread-6号管理员获取许可,任务开始
pool-1-thread-4号管理员获取许可,任务开始
……

3. 条件变量(condition variable)

Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒,参考Synchronized/wait()/notify/notifyAll()

// 1.构造带有lock和condition的线程
// 2.
try {
  lock.lock();
  condition.await();//或者condition.signal/signalAll()
  //do something
  } catch (InterruptedException e) {
    e.printStackTrace();
  }finally {
    lock.unlock();
  }

单线程并发服务器

由于很多服务器提供的服务几乎与时间分片无关,下层互联网使用离散的分组交付数据,因而数据是以突发方式(而不是平稳的数据流方式)到达服务器的。服务器上的每个从线程将大部分时间花在read调用中,即它被阻塞以等待下一个突发数据的到达


即,用多线程来进行分片,根本就是浪费资源,因为分到的时间也绝大部分用于IO的阻塞了


所以要用到数据驱动:对于仅需对每个请求进行很少处理的简单服务,采用由数据到达来驱动服务的机制,若并发服务器处理每个请求仅需很少的时间,通常它就按顺序方式执行,而执行由数据的到达驱动。只有在工作量太大,以致CPU不能顺序执行时,分时机制才取而代之


单线程服务器算法


1.创建套接字(msock)并将其绑定到所提供服务的熟知端口上。将该套接字加到一个表中,该表中的项是可以进行I/O的描述符

2.使用select在已有的套接字上等待I/O

3.如果msock准备就绪,使用accept获得下一个连接,并将这个新的套接字加入到表中。

4.如果是msock以外的某些套接字准备就绪,就使用recv或read获得下一个请求,构造响应,用send或write将响应发回给客户

5.继续按照以上的步骤2进行处理


作业:采用事件I/O机制设计并实现单线程并发ECHO服务器

/*
    创建AIO(异步,非阻塞)的服务器端:
    相关的类:
        是JDK1.7之后出现的
        java.nio.channels.AsynchronousServerSocketChannel:用于面向流的侦听套接字的异步通道。
    获取对象的方法:
        static AsynchronousServerSocketChannel open​() 打开异步服务器套接字通道。
    成员方法:
        AsynchronousServerSocketChannel bind​(SocketAddress local) 给服务器绑定指定的端口号
        void accept​(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler) 接受连接。
        参数:
           A attachment:附件,可以传递null
           CompletionHandler  handler:事件处理相关的类,用于处理accept方法监听到的事件
           CompletionHandler也叫回调函数,客户端请求服务器之后,会自动执行CompletionHandler接口中的方法
       CompletionHandler接口中的常用方法:用于消除异步I / O操作结果的处理程序。
            void completed​(V result, A attachment) 客户端连接成功执行的方法
            void failed​(Throwable exc, A attachment) 客户端连接失败执行的方法
    实现步骤:
        1.创建异步非阻塞AsynchronousServerSocketChannel服务器对象
        2.使用AsynchronousServerSocketChannel对象中的方法bind绑定指定的端口号
        3.使用AsynchronousServerSocketChannel对象中的方法accept监听客户端请求
 */
public class TCPServer {
    public static void main(String[] args) throws IOException {
        //1.创建异步非阻塞AsynchronousServerSocketChannel服务器对象
        AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
        //2.使用AsynchronousServerSocketChannel对象中的方法bind绑定指定的端口号
        serverSocketChannel.bind(new InetSocketAddress(8888));
        //3.使用AsynchronousServerSocketChannel对象中的方法accept监听客户端请求
        serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
            //客户端连接成功执行的方法
            @Override
            public void completed(AsynchronousSocketChannel result, Object attachment) {
                System.out.println("有客户端连接成功!");
                //获取客户端发送的数据
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                //Future<Integer> future = result.read(buffer);//是一个阻塞的方法,没有读取到客户端发送的数据,会阻塞
                /*
                    void read​(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer,? super A> handler) 是一个非阻塞的方法
                    参数
                        ByteBuffer dst - 要传输字节的缓冲区
                        long timeout - 完成I / O操作的最长时间
                        TimeUnit unit - timeout参数的时间单位
                        attachment - 要附加到I / O操作的对象; 可以是null
                        CompletionHandler handler - 消费结果的处理程序,回调函数
                            客户端给服务器发送数据之后,就会执行回调函数中的方法
                 */
                result.read(buffer, 10, TimeUnit.SECONDS, null, new CompletionHandler<Integer, Object>() {
                    @Override
                    public void completed(Integer result, Object attachment) {
                        System.out.println("读取客户端发送数据成功,执行的方法");
                        buffer.flip();//缩小limit的范围
                        String msg = new String(buffer.array(), 0, buffer.limit());
                        System.out.println("服务器读取到客户端发送的信息:"+msg);
                    }
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("读取客户端发送数据失败,执行的方法");
                    }
                });
                System.out.println("readu读取数据的方法执行完毕!");
            }
            //客户端连接失败执行的方法
            @Override
            public void failed(Throwable exc, Object attachment) {
                System.out.println("有客户端连接失败!");
            }
        });
        System.out.println("accept方法执行结束...");
        while (true){}
    }
}
相关文章
|
4月前
|
机器学习/深度学习 人工智能 运维
企业内训|LLM大模型在服务器和IT网络运维中的应用-某日企IT运维部门
本课程是为某在华日资企业集团的IT运维部门专门定制开发的企业培训课程,本课程旨在深入探讨大型语言模型(LLM)在服务器及IT网络运维中的应用,结合当前技术趋势与行业需求,帮助学员掌握LLM如何为运维工作赋能。通过系统的理论讲解与实践操作,学员将了解LLM的基本知识、模型架构及其在实际运维场景中的应用,如日志分析、故障诊断、网络安全与性能优化等。
129 2
|
2月前
|
缓存 负载均衡 监控
HTTP代理服务器在网络安全中的重要性
随着科技和互联网的发展,HTTP代理IP中的代理服务器在企业业务中扮演重要角色。其主要作用包括:保护用户信息、访问控制、缓存内容、负载均衡、日志记录和协议转换,从而在网络管理、性能优化和安全性方面发挥关键作用。
94 2
|
3月前
|
弹性计算 监控 数据库
制造企业ERP系统迁移至阿里云ECS的实例,详细介绍了从需求分析、数据迁移、应用部署、网络配置到性能优化的全过程
本文通过一个制造企业ERP系统迁移至阿里云ECS的实例,详细介绍了从需求分析、数据迁移、应用部署、网络配置到性能优化的全过程,展示了企业级应用上云的实践方法与显著优势,包括弹性计算资源、高可靠性、数据安全及降低维护成本等,为企业数字化转型提供参考。
88 5
|
4月前
|
存储 安全 数据可视化
提升网络安全防御有效性,服务器DDoS防御软件解读
提升网络安全防御有效性,服务器DDoS防御软件解读
98 1
提升网络安全防御有效性,服务器DDoS防御软件解读
|
3月前
|
存储 关系型数据库 MySQL
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
查询服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
1287 2
|
4月前
|
Kubernetes 应用服务中间件 nginx
搭建Kubernetes v1.31.1服务器集群,采用Calico网络技术
在阿里云服务器上部署k8s集群,一、3台k8s服务器,1个Master节点,2个工作节点,采用Calico网络技术。二、部署nginx服务到k8s集群,并验证nginx服务运行状态。
1407 1
|
4月前
|
安全 区块链 数据库
|
4月前
|
测试技术
评测 AlibabaCloud 阿里云国际版 香港轻量云服务器的性能和网络怎么样
评测 AlibabaCloud 阿里云国际版 香港轻量云服务器的性能和网络怎么样
|
2天前
|
存储 机器学习/深度学习 人工智能
2025年阿里云GPU服务器租用价格、选型策略与应用场景详解
随着AI与高性能计算需求的增长,阿里云提供了多种GPU实例,如NVIDIA V100、A10、T4等,适配不同场景。2025年重点实例中,V100实例GN6v单月3830元起,适合大规模训练;A10实例GN7i单月3213.99元起,适用于混合负载。计费模式有按量付费和包年包月,后者成本更低。针对AI训练、图形渲染及轻量级推理等场景,推荐不同配置以优化成本和性能。阿里云还提供抢占式实例、ESSD云盘等资源优化策略,支持eRDMA网络加速和倚天ARM架构,助力企业在2025年实现智能计算的效率与成本最优平衡。 (该简介为原文内容的高度概括,符合要求的字符限制。)
|
3天前
|
存储 弹性计算 人工智能
2025年阿里云企业云服务器ECS选购与配置全攻略
本文介绍了阿里云服务器的核心配置选择方法论,涵盖算力需求分析、网络与存储设计、地域部署策略三大维度。针对不同业务场景,如初创企业官网和AI模型训练平台,提供了具体配置方案。同时,详细讲解了购买操作指南及长期运维优化建议,帮助用户快速实现业务上云并确保高效运行。访问阿里云官方资源聚合平台可获取更多最新产品动态和技术支持。

热门文章

最新文章