network-program

0.socket

TCP

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    int socket(int domain,int type,int protocol);
    /*
    domain 协议簇
    PF_INET IPV4
    PF_INET6 IPV6
    PF_LOCAL 本地通信
    Type 套接字类型
    SOCK_STREAM 面向连接的套接字
    SOCK_DGRAM 面向消息的套接字
    */
  • 端口号用于区分同一操作系统不同套接字,最大(0-65535),0-1023为知名端口。TCP套接字和UDP套接字不会共用端口号,允许重复。

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    //Ip地址表示
    struct{
    sa_family_t sin_family;
    uint16_t sin_port;//16 端口
    struct in_addr sin_adr;//32bitIp地址
    char sin_zero[8];//dumy
    }
    struct in_addr{
    In_addr_t s_addr;//32 bit IPV4地址
    }
    unsigned short htons(unsigned short)//short 2字节 端口转换 主机字节序->网络字节序
    unsigned short ntohs(unsigned short)//
    unsigned long htonl(unsigned long)//long 4字节 IP地址转换 主机字节序->网络字节序
    unsigned long ntohl(unsigned long)
    /*
    char* arr1="1234"
    主机字节序 小端--0x007EAC58 31 32 33 34
    网络字节序 大端--0x4030201(Ip地址)
    */
    in_addr_t inet_addr(const char* str)//将字符串返回32大端整数 IP地址,失败返回INADDR_NONE

    char* inet_ntoa(struct in_addr ar);//32bitIp地址 网络字节序 转为字符串IP


    int bind(int sockfd,struct sockaddr* myaddr,socklen_t addrlen)
    /*
    sockfd 套接字fd
    addrlen 第二个结构体变量长度
    */
    int listen(int sock,int backlog)
    /*
    sock希望进入到等待连接请求状态的套接字文件描述符
    backlog 连接请求等待队列的长度,=5时,最多使5个连接请求进入到队列
    */

    int accept(int sock,struct sockaddr* addr,socklen_t* addr_len)
    /*
    sock 服务器套接字fd
    addr 保存 请求连接的客户端地址信息的结构体
    addrlen 第二个addr结构体长度
    返回用于数据IO的套接字的fd,失败返回-1
    若等待队列为空,accept不会返回,直等到等待队列有新的客户端连接
    */
    int connect(int sock,struct sockaddr* servaddr,socklen_t addrlen);
    /*
    sock 客户端套接字fd
    servaddr 保存目标服务器地址信息的变量结构体
    addrlen 以字节为单位传递给第二个结构体参数servaddr地址变量长度
    出现如下情况返回:
    服务器收到连接请求
    发生断网等异常而中断请求
    */
    客户端在调用connect函数时,在内核中,给客户端套接字分配IP和随机端口
  • 客户端在调用connect函数时,在内核中,给客户端套接字分配IP和随机端口

  • TCP服务器

    • socket()->bind()->listen()->accept()->read()/write()->close()
  • TCP 客户端

    • socket()->connect()->read()/write()->close()
  • TCP IO缓冲

    • write 调用瞬间,数据将移至输出缓冲,read函数调用瞬间,从输入缓冲读取数据

    • IO缓冲在每个套接字中单独存在

    • IO缓冲在创建套接字时自动生成

    • 即使关闭套接字也会继续输出缓冲中遗留的数据

    • 关闭套接字将丢失输入缓冲中的数据

    • write函数 send函数不会在完成向对方主机的数据传输时返回,而是在数据移出到输出缓冲时 。TCP保证对输出缓冲数据的传输。所以,write在数据传输完成时返回

  • TCP套接字

    • 向10个客户端提供服务,守门的服务器套接字+10个服务器端套接字
  • TCP半关闭

    • 1
      2
      3
      4
      5
      6
      int shutdown(itn sock,int howto);
      /*
      SHUT_RD 断开输入流
      SHUT_WR 断开 输出流
      SHUT_RDWR 同时断开I/O流,分两次调用shudown
      */
    • close 函数同时断开IO流,同时发送EOF信息

UDP

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    ssize_t sendto(int sock,void * buff,size_t nbytes,int flags, struct sockaddr* to,socklen_t addrlen);//
    /*
    sock 传输数据的UDP套接字fd
    buff 发送数据缓冲地址
    nbytes 发送的数据长度/bytes
    flags
    to 目标地址信息
    addrlen sockaddr 结构体长度
    返回值 返回传输的字节数 否则失败 -1
    需要向它传送目标地址信息
    */
    ssize_t recvfrom (int sock,void * buff,size_t nbytes,int flags,struct sockaddr* from,socklen_t * addrlen)
    /*
    sock 接受数据fd
    buff 接受数据缓冲地址
    nbytes 可接受的最大字节数
    flags
    from 保存 发送端地址信息
    addrlen 结构体长度
    */
  • UDP服务器

    • sock()->bind()->recvfrom()->sendto()
  • UDP客户端

    • sock()->sendto()->recvfrom()
    • 在调用sendto函数时检测到还未分配IP和端口,自动分配IP和端口号,IP主机IP,端口尚未使用的任意端口
  • 未连接UDP套接字VS已连接UDP套接字

    • 未连接套接字 sendto作用
      • 向UDP套接字注册目标IP和端口号
      • 传输数据
      • 删除UDP套接字中注册的目标地址信息
    • 每次调用都变更目标地址,可以重复利用同一UDP套接字向不同目标传输数据
    • 已连接UDP套接字
      • 通过connect注册目标IP和地址信息
      • sock()->connect()->sendto()->recvfrom()

套接字选项

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    int getsockopt(int sock,int level,int optname,void * optval,socklen_t* optlen)
    /*
    sock
    level 查看可选项的协议层
    optname 可选项名
    optval 保存查看结果的缓冲地址值
    optlen 向optval传递的缓冲大小
    */
    int setsockopt(int sock,int level,itn optname,const void* optval,socklen_t* optlen);
  • TIEM_WAIT状态,服务器发送FIN后,客户端发送ACK,进入到TIME_WAIT状态,防止此ACK丢失,可以重发

  • 客户端每次动态分配端口号,不必担心TIME_WAIT状态,但是不管服务器、客户端只要先断开套接字都会有TIME_WAIT状态。

  • 1
    2
    3
    option=TRUE;
    optlen=sizeof(option)
    setopt(serv_sock,SOL_SOCKET,SO_REUSEADDR,(void*)&option,optlen);
  • 禁用NAGLE算法–TCP

  • 1
    2
    int opt_val=1;
    setopt(sock,IPPROTO_TCP,TCP_NODELAY,(void*)&opt_val,sizeof(opt_val));

多进程服务器

  • 1
    2
    3
    4
    5
    pid_t fork();
    /*
    父进程返回子进程ID
    子进程 返回0
    */
  • 调用fork之后,复制文件描述符,一个套接字对应多个文件描述符,需要关闭无关的套接字文件描述符

  • 僵尸进程

    • 子进程退出后,父进程没有调用wait函数或者wait_pid函数等待子进程结束,又没有显示忽略SIGCHLD信号

    • 占用资源和进程号

    • 销毁–向创建子进程的父进程传递子进程的exit参数值或return语句的返回值

      • 1
        2
        3
        4
        5
        6
        7
        pid_t wait(int * statloc);//函数
        if(WIFEXITED(status))//判断是否正常结束进程
        {
        WEXITSTATUS(status);//子进程结束返回值
        }

        //调用wait时,如果有子进程终止,那么子进程终止时传递的返回值将保存到statloc中,没有进程终止将阻塞
      • 1
        2
        3
        4
        5
        6
        pid_t waitpid(pid_t pid,int* statloc,it options)
        /*
        pid 等待终止的目标子进程id,-1时可以等待任意子进程
        options 传递头文件sys/wait.h文件中常量WNOHANG,即使没有子进程终止也不会进入到阻塞
        返回终止的子进程ID(或0),失败-1
        */
      • 1
        信号和signal函数

进程间通信

  • 管道

    • 1
      2
      3
      4
      5
      6
      7
      int pipe(int filedes[2]);
      /*
      成功时,返回0,失败-1
      filedesc[0] 通过管道接受数据
      filedesc[2] 通过管道发送数据
      */
      //管道单工
    • 管道只允许有血缘关系的进程间通信,内部保持同步机制,从而保证访问数据的一致性

    • 管道随进程,进程在,管道在,进程消失,管道对应的端口也关闭,两个进程都消失,管道也消失

IO复用 select

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    //#define __FD_SETSIZE    1024 否则重新编译内核
    fd_set set;//bit为单位,保存需要监视的文件描述符[0-1024]
    FD_SET(1,&set);//设置1 bit为1
    FD_CLR(2,&set);//清空 第2bit
    FD_ZERO(&set); //清空 set

    int select(int maxfd,fd_set* readset,fd_set* write_set,fd_set* exceptset,const struct timeval* timeout)
    /*
    maxfd 监视对象文件描述符的数量 ---最大的描述符+1 (从0开始)
    readset 将所有关注“是否存在待读取数据”的文件描述符注册到fd_set变量,并传递其地址
    writeset “是否可传输无阻塞数据”
    exceptset "是否发生异常"
    timeout
    返回 -1 -错误,0-超时,>0 表示发生事件的文件描述符数量
    */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  
* select每次调用后,fd_set中保存着发生事件的文件描述符,其余均为0,所以,每次都需要对fd_set重新赋值中。timeout变为超时前剩余时间,每次都需要对timeout重新赋值

## 多种IO函数

* ```c++
ssize_t send(int sockfd,const void* buf,size_t nbytes,int flags);
ssize_t recv(int sockfd,void* buf,size_t nbytes,int flags);
/*
可选项
MSG_OOB 传输带外数据
MSG_PEEK 验证输入缓冲中是否存在接收的数据
MSG_DNOTWAIT 调用IO函数时不阻塞,用于非阻塞I/O
MSG_WAITTAIL 防止函数返回,直到接收全部请求的字节数
*/
  • MSG_OOB并不能传输带外数据,督促消息处理

  • MSG_PEEK | MSG_DNOTWAIT 验证输入缓冲中是否存在接收的数据,读取到的数据并不会从输入缓冲中删除

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    struct iovec{
    void * iov_base;//缓冲地址
    size_t iov_len;//缓冲大小
    }
    //将分散在多个缓冲中的数据一块发送,减少IO函数调用次数
    ssize_t writev(int filedesc,const struct iovec* iov,int iovcnt)
    /*
    filedesv 传输对象的fd
    iov iovec 保存着待发送数据的位置和size
    iovcnt 数据长度

    返回发送的字节数 或者-1
    */
    ssize_t readv(int filedesc,cosnt struct iovec* iov,int iovcnt);
    /*
    返回成功接收的字节数 或者 -1
    */

套接字和标准IO

  • 创建套接字时操作系统会准备I/O缓冲,仅用于维持TCP协议存在。TCP丢失的数据。。。。套接字的删除缓冲

  • 标准I/O提高性能

    • 传输的数据量—-多个小文件中包含的头信息多
    • 数据向输出缓冲移动的次数
    • 缺点
      • 不容易双向通信
      • 有可能频繁调用flush函数
      • 需要以FILE结构体指针的形式返回文件描述符
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    FILE* fdopen(int files,const char* mode)//文件描述符转为FILE*
    /*
    fildes 需要转换的文件描述符
    mode “w” "a"
    r 打开只读文件,r+打开可读写文件,都要求文件必须存在
    w 打开只写文件,w+打开可读写文件,都会将文件长度清零,文件不存在时会创建文件
    a 打开只写文件,a+打开可读写文件,都会将写的内容加到文件后面,文件不存在时创建文件
    */

    int fileno(FILE* stream)//FILE* 转为fd

关于IO分离的其他内容

  • 针对于FILE指针的半关闭

    • 1
      2
      FILE* readfp=fdopen(clnt_sock,"r");
      FILE* writefp=fdopen(clnt_sock,"w");
    • 针对于任意一个FILE指针调用fclose函数时,文件描述符都会关闭(读+写)

    • 销毁所有文件描述符后,套接字才销毁

    • 1
      2
      3
      4
      5
      6
      7
      int dup(int fildes);
      int dup2(int fildes,int fildes2);
      /*
      fildes 需要复制的文件描述符
      fildes2 明确指定文件描述符整数值
      失败返回-1,成功返回复制的文件描述符
      */
    • 无论复制多少文件描述符,都应调用shutdown函数发送EOF并进入半关闭状态

多线程服务器实现

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    int pthread_create(pthread_t* restrict thread,const pthread_attr_t* restrict arrt,void *(* start_routine)(void*),void* restrict arg);
    /*
    thread 保存新创建线程ID的变量地址
    attr 传递线程属性参数,null时,默认属性
    start_rountine 线程处理函数 函数指针
    arg 线程处理函数的参数地址

    成功时返回0,失败时返回其他值
    */

    int pthread_join(pthread_t thread,void** status)
    /*
    thread 该参数值ID的线程终止后才会从该函数返回
    status 保存线程的main函数返回值的指针变量地址值
    */
  • 1
    2
    3

    struct hostent* gethostbyname(const char* hostname);//线程非安全函数
    struct hostent* gethostbyname_r(const char* name,struct hostent* result,char* buff,int buflen,int* h_errnop)//线程安全函数
  • 线程安全函数后缀_r

  • 或者声明头文件前定义 _REENTRANT 宏(重入)

  • 或者编译时 gcc -D_REENTRANT mythread.c -o mthread -lpthreadz

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    int pthread_init(pthread_mutex_t* mutex,const pthread_mutexattr_t* arrt)
    int pthread_mutex_destory(pthread_mutex_t* mutex);
    /*
    mutex创建互斥量时传递保存互斥量的变量地址值,销毁时传递需要销毁的互斥量地址值
    attr 传递即将创建的互斥量属性值,没有特别需要指定的属性时null
    */

    int pthread_mutex_lock(pthread_mutex_t* mutex)
    int pthread_mutex_unlock(pthread_mutex_t* mutex);

    int pthread_join(pthread_t thread);
    /*
    等待线程终止,且引导线程销毁。线程终止前,调用该函数的线程进入到阻塞状态
    */
    int pthread_detach(pthread_t thread);
    /*

    */
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    int sem_init(sem_t* sem,int pshared,unsigned int value);
    int sem_destory(sem_t* sem);
    /*
    sem 创建信号量时传递保存信号量的变量地址值、销毁时传递需要销毁的信号量变量的地址值
    pshared 传递其他值时,创建可由多个进程共享的信号量;0时,创建只允许一个进程内部使用的信号量
    。同一进程内的线程同步=0
    value 指定新创建的信号量的初始值
    */
    int sem_post(sem_t* sem);
    int sem_wait(sem_t* sem);
    /*
    传递保存信号量读取值的变量地址值,传递给sem_post时信号量+1,sem_wait 信号量-1
    信号量的值不能小于0。在信号量值为0的情况下,调用sem_wait函数时,调用函数的线程将进入阻塞状态(函数未返回)
    */

1. epoll

  • epoll_create

  • 1
    2
    3
    4
    5
    6
    int epoll_create(int size)
    /*
    select 中为了存放监视对象文件描述符,声明了一个fd_set变量。epoll类似,由操作系统负责保存监视对象文件描述符,需要向操作系统请求创建保存文件描述符的空间。
    linux2.6.8之后忽略,内核根据情况调整epoll 一个实例的大小
    成功时返回epoll实例文件描述符,失败-1
    */
  • epoll_ctl

  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    /* epoll中监视对象事件 的结构*/
    struct epoll_event{
    __uint32_t events;
    epoll_data_t data;
    }
    typedef union epoll_data{
    void* ptr;
    int fd;
    __UInt32_t u32;
    __uint64_t u64;
    } epoll_data_t;
    /*
    events:
    EPOLLIN :读取数据
    EPOLLOUT:s输出缓冲为空,可以立即发送数据
    EPOLLPRI:收到OOB的情况
    EPOLLRDHUP:断开连接或者半关闭情况,边缘触发时非常有用
    EPOLLERR:发生错误
    EPOLLET:边缘触发的方式得到事件通知
    EPOLLONESHOT:发生一次事件后,相应文件描述符不在收到事件通知。需要向epoll——ctl函数第二个参数
    传递EPOLL_CTL_MOD,再次设置事件
    */
    /*
    epfd epoll实例的文件描述符 epoll_create返回值
    op 监视对象的操作
    fd 监视对象的文件描述符
    event 监视对象的事件类型
    */
    epoll_ctl(int epfd,int op,int fd,struct epoll_event * event);
    /*
    op
    EPOLL_CTL_ADD 文件描述符注册到epoll实例
    EPOLL_CTL_DEL 从epoll例程中删除文件描述符
    EPOLL_CTL_MOD 更改注册的文件描述符的关注事件发生的情况
    */
  • epoll_wait

  • 1
    2
    3
    4
    5
    6
    7
    8
    /*
    返回发生事件的文件描述符的个数
    epfd epoll文件描述符
    events 保存事件发生的文件描述符集合的结构体地址
    maxevents 第二个参数中可以保存的最大事件数
    timeout 1/1000秒为单位的等待事件,-1永不超时,一直等待直至发生事件
    */
    int epoll_wait(int epfd,struct epoll_evetn* events,int maxevents,int timeout)
  • 条件触发–默认方式

    • 只要输入缓冲有数据就会一直通知该事件
    • select模型———-条件触发
  • 边缘触发

    • 边缘触发输入缓冲收到数据时仅注册1次事件,即使输入缓冲中还有数据,也不会再次进行注册。接受数据时,仅仅注册一次事件。所以一旦发生事件,需要读取输入缓冲中的全部数据。

    • 通过以下方式验证输入缓冲是否为空

      • read函数返回-1,且全局变量errno=EAGAIN
    • 设置文件描述符为非阻塞式

      • 1
        2
        3
        4
        5
        int fcntl(int filedes,int cmd,....)
        /*
        int flag=fcntl(fd,F_GETFL,0);//获取之前的属性
        fcntl(fd,F_SETFL,flag|O_NONBLOCK);//设置非阻塞
        */
      • 边缘触发下,以阻塞方式工作的read&write函数有可能引起服务器端的长时间停顿。边缘触发一定采用非阻塞read&write

    • 可以分离接受数据和处理数据的时间点

Appendix

  • 读panda有感

  • threadpool

    • 线程池的任务队列中存放的是IThreadHandle* ,有threadhandle接口,即线程处理函数

      • 1
        2
        3
        4
        5
        typedef Queue::CQueue<IThreadHandle *> queue_handle_t;//线程处理函数
        queue_handle_t m_taskqueue;

        typedef std::vector<pthread_t> vector_tid_t;//创建的线程ID
        vector_tid_t m_thread;
    • 在CThreadPool构造函数中调用create_threadpool 来产生一定数量的线程,并保存创建好的线程的id,每个线程都相应函数都为process_task。这里利用领导者/跟随者模式从任务队列中取出任务并执行thread_handle。

      • 1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        CThreadPool::CThreadPool(size_t threadnum, size_t tasknum): 
        m_taskqueue(tasknum), m_hasleader(false)
        {
        m_threadnum = (threadnum > THREADNUM_MAX)? THREADNUM_MAX: threadnum;
        create_threadpool();
        }
        void CThreadPool::create_threadpool()
        {
        pthread_attr_t thread_attr;
        pthread_attr_init(&thread_attr);
        for(size_t i = 0; i < m_threadnum; i++){
        pthread_t tid = 0;
        if(pthread_create(&tid, &thread_attr, process_task, (void *)this) < 0){
        errsys("create thread[%d] filed\n", (int)i);
        continue;
        }

        m_thread.push_back(tid);
        }
        pthread_attr_destroy(&thread_attr);
        trace("create thread pool, thread number %d\n", (int)m_thread.size());
        }
        void *CThreadPool::process_task(void * arg)
        {
        CThreadPool &threadpool = *(CThreadPool *)arg;
        while(true){
        threadpool.promote_leader();
        IThreadHandle *threadhandle = NULL;
        int ret = threadpool.m_taskqueue.pop(threadhandle);
        threadpool.join_follwer();

        if(ret == 0 && threadhandle)
        threadhandle->threadhandle();//IThreadHandle 接口存在threadhandle处理函数
        }

        pthread_exit(NULL);
        }
    • 领导者/跟随者

      • [两种高效的并发模式]: https://blog.csdn.net/ZYZMZM_/article/details/98055416#LeaderFollowers_41

      • 1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        void CThreadPool::promote_leader()
        {
        Pthread::CGuard guard(m_identify_mutex);//m_identify_mutex 加锁,析构时,释放锁
        while(m_hasleader){ // more than one thread can return
        m_befollower_cond.wait(m_identify_mutex);
        /*
        如果m_hasleader =true,直接返回,表示当前已存在领导者
        否则,m_befollower_cond条件变量释放m_identify_mutex锁,但是阻塞并等待被唤醒,之后在join_follwer中可以获得锁继续执行,设置m_hasleader=false,并唤醒m_befollower_cond,释放锁m_identify_mutex,之后promote_leader中m_befollower_cond.wait(m_identify_mutex);被唤醒并加锁m_identify_mutex,由于m_hasleader为false,跳出循环
        */
        }
        m_hasleader = true;
        }
        void CThreadPool::join_follwer()
        {
        Pthread::CGuard guard(m_identify_mutex);
        m_hasleader = false;
        m_befollower_cond.signal();
        }
      • 关于pthread_cond_signal即可以放在pthread_mutex_lock和pthread_mutex_unlock之间,也可以放在pthread_mutex_lock和pthread_mutex_unlock之后,但是各有有缺点。

      [再谈互斥锁与条件变量!]: https://blog.csdn.net/brian12f/article/details/73882166?utm_source=copy

      • 1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        pthread_mutex_lock
        xxxxxxx
        pthread_cond_signal
        pthread_mutex_unlock
        /*
        缺点:在某下线程的实现中,会造成等待线程从内核中唤醒(由于cond_signal)然后又回到内核空间(因为cond_wait返回后会有原子加锁的 行为),所以一来一回会有性能的问题。但是在LinuxThreads或者NPTL里面,就不会有这个问题,因为在Linux 线程中,有两个队列,分别是cond_wait队列和mutex_lock队列, cond_signal只是让线程从cond_wait队列移到mutex_lock队列,而不用返回到用户空间,不会有性能的损耗。
        所以在Linux中推荐使用这种模式。
        */
        pthread_mutex_lock
        xxxxxxx
        pthread_mutex_unlock
        pthread_cond_signal
        /*
        优点:不会出现之前说的那个潜在的性能损耗,因为在signal之前就已经释放锁了
        缺点:如果unlock和signal之前,有个低优先级的线程正在mutex上等待的话,那么这个低优先级的线程就会抢占高优先级的线程(cond_wait的线程),而这在上面的放中间的模式下是不会出现的。
        */
  • epoll异步框架

    • 在CEvent的构造函数中创建一个eventwait_thread 等待线程。

      • 1
        2
        if(pthread_create(&tid, NULL, eventwait_thread, (void *)this) == 0)
        m_detectionthread = tid;
    • 在eventwait_thread 线程中调用epoll_wait函数,并根据监控结果,将文件描述符,事件类型,封装成任务CEvent *,push到线程池的任务的队列中。CEvent继承至IThreadHandle,有重写的threadhandle的函数

      • 1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        29
        30
        31
        32
        33
        34
        35
        36
        37
        38
        39
        40
        41
        42
        43
        44
        45
        46
        47
        48
        49
        50
        51
        52
        53
        54
        55
        56
        57
        58
        59
        60
        61
        62
        void *CEvent::eventwait_thread(void *arg)
        {
        CEvent &cevent = *(CEvent *)(arg);//
        if(INVALID_FD(cevent.m_epollfd)){
        seterrno(EINVAL);
        pthread_exit(NULL);
        }
        for(;;){
        int nevent = epoll_wait(cevent.m_epollfd, &cevent.m_eventbuff[0], EventBuffLen, -1);
        if(nevent < 0 && errno != EINTR){
        errsys("epoll wait error\n");
        break;
        }
        for(int i = 0; i < nevent; i++){
        int fd = cevent.m_eventbuff[i].data.fd;
        EventType events = static_cast<EventType>(cevent.m_eventbuff[i].events);
        if(cevent.pushtask(fd, events) == 0x00){
        cevent.m_ithreadpool->pushtask(&cevent);//将CEvent* 对象push到任务队列中,有重写thread_handle函数
        }
        }
        }
        pthread_exit(NULL);
        }
        void CEvent::threadhandle()
        {
        int fd = 0x00;
        EventType events;
        if(poptask(fd, events) < 0){
        return;
        }
        CNetObserver *observer = get_observer(fd);
        if(observer == NULL)
        return;
        /*
        * 关闭时递减引用计数。在对象的所有回调处理完时真正释放
        */
        if(events & ECLOSE){
        cleartask(fd);
        observer->subref();
        }else{
        if(events & EERR){
        observer->handle_error(fd);
        }
        if(events & EIN){
        observer->handle_in(fd);
        }
        if(events & EOUT){
        observer->handle_out(fd);
        }
        }
        /*
        * unregister_event 执行后于handle_close将会出现当前套接字关闭后
        * 在仍未执行完unregister_event时新的连接过来,得到一样的描述符
        * 新的连接调用register_event却未注册进入epoll。同时han_close中
        * 关闭了套接字,unregister_event中epoll删除关闭的套接字报错
        */
        if(observer->subref_and_test()){
        unregister_event(fd);
        observer->handle_close(fd);
        observer->selfrelease();
        }
        }
      • CEvent中threadhandle,调用的是和每个fd所绑定的CNetObserver*的处理函数

        • 1
          2
          3
          4
          typedef std::map<int, CNetObserver *> EventMap_t;
          typedef std::map<int, EventType> EventTask_t;
          EventMap_t m_eventreg;
          EventTask_t m_events;
      • CNetObserver 继承至INetObserver,一方面负责fd的事件处理函数,另一方面负责相应的fd和事件处理函数的同步,为了确保CNetObserver * 的正确销毁时机,用了引用计数功能。

        • 1
          2
          3
          4
          5
          6
          7
          8
          9
          10
          11
          12
          13
          14
          15
          16
          17
          18
          19
          20
          21
          22
          23
          24
          25
          26
          27
          28
          29
          30
          31
          32
          33
          34
          35
          int CEvent::record(int fd, EventType type, IEventHandle *handle)
          {
          CNetObserver *newobserver = new CNetObserver(*handle, type);
          assert(newobserver != NULL);

          Pthread::CGuard guard(m_eventreg_mutex);
          m_eventreg[fd] = newobserver;
          return 0;
          }
          class CNetObserver: public INetObserver
          {
          friend class CEvent;
          public:
          CNetObserver(INetObserver &, EventType);
          ~CNetObserver();
          inline void addref();//
          inline void subref();
          inline bool subref_and_test();
          inline void selfrelease();
          inline EventType get_regevent();
          inline const INetObserver *get_handle();

          protected: //负责fd的事件处理函数
          void handle_in(int);
          void handle_out(int);
          void handle_close(int);
          void handle_error(int);

          private:
          EventType m_regevent;
          INetObserver &m_obj;

          int32_t m_refcount;
          Pthread::CMutex m_refcount_mutex;
          };
    • 在CEvent中负责事件的处理函数,负责向线程池的任务队列Push_task,一个单例的CEventProxy 进一步封装CEvent

      • 1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        class CEventProxy: public IEvent
        {
        public:
        static CEventProxy *instance();
        int register_event(int, IEventHandle *,EventType);
        int register_event(Socket::ISocket &, IEventHandle *,EventType);
        int shutdown_event(int);
        int shutdown_event(Socket::ISocket &);

        private:
        CEventProxy(size_t neventmax);
        ~CEventProxy();

        private:
        IEvent *m_event;
        };
    • 为了确保在INetObserver中可以注册事件,关闭事件,在IEventHandle相应接口中调用CEventProxy中的CEvent实现相应接口

      • 1
        2
        3
        4
        5
        6
        7
        8
        9
        10
        11
        12
        13
        14
        15
        16
        17
        18
        19
        20
        21
        22
        23
        24
        25
        26
        27
        28
        class IEventHandle: public INetObserver
        {
        public:
        // desc: 注册进入事件中心
        // param: fd/套接字描述符 type/事件类型
        // return: 0/成功 -1/失败
        int register_event(int fd, EventType type = EDEFULT);

        // desc: 注册进入事件中心
        // param: socket/套接字对象 type/事件类型
        // return: 0/成功 -1/失败
        int register_event(Socket::ISocket &socket, EventType type = EDEFULT);

        // desc: 关闭事件
        // param: fd/套接字描述符
        // return: 0/成功 -1/失败
        int shutdown_event(int fd);

        // desc: 关闭事件
        // param: socket/套接字对象
        // return: 0/成功 -1/失败
        int shutdown_event(Socket::ISocket &);
        };

        int IEventHandle::register_event(int fd, EventType type)
        {
        return CEventProxy::instance()->register_event(fd, this, type);
        }
  • HTTP

    *

文章目录
  1. 1. 0.socket
    1. 1.1. TCP
    2. 1.2. UDP
    3. 1.3. 套接字选项
    4. 1.4. 多进程服务器
    5. 1.5. 进程间通信
    6. 1.6. IO复用 select
    7. 1.7. 套接字和标准IO
    8. 1.8. 关于IO分离的其他内容
    9. 1.9. 多线程服务器实现
  2. 2. 1. epoll
  3. 3. Appendix