Understanding muduo: Server-Side Programming
The example in this article covers server-side programming with muduo in single-Reactor mode.
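0. The application server program

`InetAddress serverAddr(port)` fixes the listening port and accepts connections on any local address; that is, the IP address is set to `INADDR_ANY`.

The code of the muduo example asio_chat_server is listed below.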
```cpp
#include "examples/asio/chat/codec.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"

#include <set>
#include <stdio.h>
#include <stdlib.h>  // atoi
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

class ChatServer : noncopyable
{
 public:
  ChatServer(EventLoop* loop, const InetAddress& listenAddr)
    : server_(loop, listenAddr, "ChatServer"),
      codec_(std::bind(&ChatServer::onStringMessage, this, _1, _2, _3))
  {
    server_.setConnectionCallback(
        std::bind(&ChatServer::onConnection, this, _1));
    // Incoming bytes go to the codec, which was given onStringMessage above.
    server_.setMessageCallback(
        std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
  }

  void start()
  {
    server_.start();
  }

 private:
  // Called on both connect and disconnect; keeps connections_ up to date.
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_INFO << conn->peerAddress().toIpPort() << " -> "
             << conn->localAddress().toIpPort() << " is "
             << (conn->connected() ? "UP" : "DOWN");

    if (conn->connected())
    {
      connections_.insert(conn);
    }
    else
    {
      connections_.erase(conn);
    }
  }

  // Sends each received message to every connected client.
  void onStringMessage(const TcpConnectionPtr&,
                       const string& message,
                       Timestamp)
  {
    for (ConnectionList::iterator it = connections_.begin();
         it != connections_.end();
         ++it)
    {
      codec_.send(get_pointer(*it), message);
    }
  }

  typedef std::set<TcpConnectionPtr> ConnectionList;
  TcpServer server_;
  LengthHeaderCodec codec_;
  ConnectionList connections_;
};

int main(int argc, char* argv[])
{
  Logger::setLogLevel(Logger::TRACE);
  LOG_INFO << "pid = " << getpid();
  if (argc > 1)
  {
    EventLoop loop;
    uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
    InetAddress serverAddr(port);
    ChatServer server(&loop, serverAddr);
    server.start();
    loop.loop();
  }
  else
  {
    printf("Usage: %s port\n", argv[0]);
  }
}
```
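To make the effect of `InetAddress serverAddr(port)` more concrete, here is a minimal sketch of the equivalent raw `sockaddr_in` setup. The helper name `makeWildcardAddr` is hypothetical and is not part of muduo; the sketch only assumes the behaviour described above (fixed port, address set to `INADDR_ANY`) and IPv4.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <cstdint>
#include <cstring>

// Hypothetical helper (not a muduo API): builds an IPv4 address that
// listens on every local interface at the given port.
sockaddr_in makeWildcardAddr(uint16_t port)
{
  sockaddr_in addr;
  std::memset(&addr, 0, sizeof addr);
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);  // 0.0.0.0: any local interface
  addr.sin_port = htons(port);               // port in network byte order
  return addr;
}
```

This matches the `0.0.0.0:18799` address that appears in the connection names in the log section.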
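1. Initializing the loop

The server runs the loop directly on the main thread, so the network model is a single Reactor. In this mode the acceptor and every `TcpConnection` live in the same thread.

Setting up the loop is essentially the same as on the client side.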
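The single-Reactor idea can be illustrated with a toy loop built on `poll(2)`. This is only a sketch of the pattern, not muduo's `EventLoop`: the class `MiniReactor` and its methods are hypothetical names, and it handles read events only.

```cpp
#include <poll.h>
#include <unistd.h>
#include <functional>
#include <map>
#include <utility>
#include <vector>

// Toy single-threaded reactor (illustration only, not muduo's EventLoop):
// one thread polls every registered fd and runs its read callback.
class MiniReactor
{
 public:
  using ReadCallback = std::function<void(int fd)>;

  void watch(int fd, ReadCallback cb) { callbacks_[fd] = std::move(cb); }
  void unwatch(int fd) { callbacks_.erase(fd); }

  // Runs one poll/dispatch round; returns the number of callbacks invoked.
  int runOnce(int timeoutMs)
  {
    std::vector<pollfd> fds;
    for (const auto& entry : callbacks_)
    {
      pollfd p;
      p.fd = entry.first;
      p.events = POLLIN;
      p.revents = 0;
      fds.push_back(p);
    }
    int ready = ::poll(fds.data(), static_cast<nfds_t>(fds.size()), timeoutMs);
    if (ready <= 0)
    {
      return 0;  // timeout or error: nothing to dispatch
    }
    int handled = 0;
    for (const pollfd& p : fds)
    {
      if (p.revents & (POLLIN | POLLHUP))
      {
        auto it = callbacks_.find(p.fd);
        if (it != callbacks_.end())
        {
          ReadCallback cb = it->second;  // copy: the callback may unwatch itself
          cb(p.fd);
          ++handled;
        }
      }
    }
    return handled;
  }

 private:
  std::map<int, ReadCallback> callbacks_;
};
```

In a single-Reactor server, both the listening socket and every accepted connection would be registered with the same loop object, which is why all callbacks run on one thread.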
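2. How the server establishes a TCP connection

This section is the focus of the article; its main goal is to understand the acceptor.

Instantiating `TcpServer` in turn initializes the acceptor `Acceptor` and the thread pool `EventLoopThreadPool`.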
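The acceptor first calls `sockets::createNonblockingOrDie` to create a non-blocking TCP socket and attaches the resulting sockfd to `acceptChannel_`. It then enables address reuse, sets port reuse according to `reuseport`, and binds the listening address. Finally it sets the read-event callback.

This step in effect completes the `socket` and `bind` calls of the usual socket sequence.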
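For readers who want to see the underlying system calls, the following sketch performs the same `socket` and `bind` steps with plain POSIX calls. The helper `createBoundListener` is a hypothetical name, not a muduo function. It assumes Linux (`SOCK_NONBLOCK`, `SOCK_CLOEXEC`, and `SO_REUSEPORT` are not available everywhere) and IPv4, and unlike the constructor it always turns port reuse on.

```cpp
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstring>

// Hypothetical helper: creates a non-blocking TCP socket, enables address
// and port reuse, and binds it. Returns the fd, or -1 on failure.
int createBoundListener(const sockaddr_in& addr)
{
  int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC,
                    IPPROTO_TCP);
  if (fd < 0)
  {
    return -1;
  }
  int on = 1;
  ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof on);  // address reuse
  ::setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof on);  // port reuse
  if (::bind(fd, reinterpret_cast<const sockaddr*>(&addr), sizeof addr) < 0)
  {
    ::close(fd);
    return -1;
  }
  return fd;  // listen() is deliberately not called yet
}
```

As in the acceptor, `listen` is left for a later step, so the socket is bound but not yet accepting connections.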
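Note that muduo uses `idleFd_` to handle the case where the server has reached its connection limit, that is, the process has run out of file descriptors. It opens `/dev/null` in advance to reserve an fd. When accept fails with `errno == EMFILE`, it first closes the reserved fd, accepts the pending TCP connection, closes that connection at once, and then reopens `/dev/null` to reserve an fd for next time. In this way a connection that the server could not accept is instead accepted and then closed.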
```cpp
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
  : loop_(loop),
    acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
    acceptChannel_(loop, acceptSocket_.fd()),
    listening_(false),
    idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
  assert(idleFd_ >= 0);
  acceptSocket_.setReuseAddr(true);
  acceptSocket_.setReusePort(reuseport);
  acceptSocket_.bindAddress(listenAddr);
  acceptChannel_.setReadCallback(
      std::bind(&Acceptor::handleRead, this));
}
```
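In addition, `TcpServer` registers the callback that handles new connections.

The server application calls `ChatServer::start`, which in turn calls `TcpServer::start`.

`get_pointer(acceptor_)` yields the raw pointer to the acceptor object, which is bound as the target of `Acceptor::listen`.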
```cpp
void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)
  {
    threadPool_->start(threadInitCallback_);

    assert(!acceptor_->listening());
    loop_->runInLoop(
        std::bind(&Acceptor::listen, get_pointer(acceptor_)));
  }
}
```

```cpp
void Acceptor::listen()
{
  loop_->assertInLoopThread();
  listening_ = true;
  acceptSocket_.listen();
  acceptChannel_.enableReading();
}
```
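Two details of `TcpServer::start` are easy to miss: the atomic test-and-set makes repeated calls harmless, and the task handed to the loop is bound to a raw pointer. The sketch below reproduces both with standard library types only. `FakeAcceptor` and `FakeServer` are hypothetical stand-ins, `std::atomic::exchange` plays the role of `getAndSet`, and `acceptor_.get()` plays the role of `get_pointer(acceptor_)`, assuming `acceptor_` is a smart pointer.

```cpp
#include <atomic>
#include <functional>
#include <memory>

// Stand-in for Acceptor; only records whether listen() has been called.
struct FakeAcceptor
{
  bool listening = false;
  void listen() { listening = true; }
};

// Stand-in for the start-once logic: the atomic exchange lets only the
// first caller through, and the task is bound to the raw pointer held by
// the smart pointer.
class FakeServer
{
 public:
  FakeServer() : acceptor_(new FakeAcceptor) {}

  // Returns the task the first call would hand to the loop; later calls
  // return an empty function.
  std::function<void()> start()
  {
    if (started_.exchange(1) == 0)
    {
      return std::bind(&FakeAcceptor::listen, acceptor_.get());
    }
    return std::function<void()>();
  }

  bool listening() const { return acceptor_->listening; }

 private:
  std::atomic<int> started_{0};
  std::unique_ptr<FakeAcceptor> acceptor_;
};
```

Binding a raw pointer is safe here only because the server owns the acceptor and outlives the queued task; the bound task does not extend the acceptor's lifetime.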
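When a client calls `connect`, a read event fires on the server, and it is handled by the acceptor's `handleRead` function. Once the connection has been accepted, `newConnectionCallback_` runs. That callback is set in `TcpServer` and is `TcpServer::newConnection`.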
```cpp
void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr;
  int connfd = acceptSocket_.accept(&peerAddr);
  if (connfd >= 0)
  {
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);
    }
    else
    {
      sockets::close(connfd);
    }
  }
  else
  {
    LOG_SYSERR << "in Acceptor::handleRead";
    if (errno == EMFILE)
    {
      ::close(idleFd_);
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
  }
}
```

```cpp
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      std::bind(&TcpServer::removeConnection, this, _1));
  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
```

```cpp
void TcpConnection::connectEstablished()
{
  loop_->assertInLoopThread();
  assert(state_ == kConnecting);
  setState(kConnected);
  channel_->tie(shared_from_this());
  channel_->enableReading();

  connectionCallback_(shared_from_this());
}
```
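The hand-off from the acceptor to the server is a plain callback registration. The following sketch shows the wiring in isolation; `MiniAcceptor` and `MiniServer` are hypothetical stand-ins, and an integer passed to `handleRead` takes the place of the fd returned by `accept`.

```cpp
#include <functional>
#include <utility>
#include <vector>

// Toy acceptor: forwards each accepted fd to whoever registered a callback.
class MiniAcceptor
{
 public:
  using NewConnectionCallback = std::function<void(int connfd)>;

  void setNewConnectionCallback(NewConnectionCallback cb)
  {
    callback_ = std::move(cb);
  }

  // `connfd` stands in for the result of accept(). Returns false when no
  // callback is registered (the real code closes the fd in that case).
  bool handleRead(int connfd)
  {
    if (callback_)
    {
      callback_(connfd);
      return true;
    }
    return false;
  }

 private:
  NewConnectionCallback callback_;
};

// Toy server: registers its own member function as the acceptor callback.
class MiniServer
{
 public:
  MiniServer()
  {
    acceptor_.setNewConnectionCallback(
        std::bind(&MiniServer::newConnection, this, std::placeholders::_1));
  }

  MiniAcceptor& acceptor() { return acceptor_; }
  const std::vector<int>& connections() const { return connections_; }

 private:
  void newConnection(int connfd) { connections_.push_back(connfd); }

  MiniAcceptor acceptor_;
  std::vector<int> connections_;
};
```

The acceptor knows nothing about connections; it only reports new fds, and the server decides what to do with them.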
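3. How data is sent and received

Sending and receiving data works much the same as on the client. The echo happens in the message handler, which sends each received message back out, and sends it to every client that has an established connection.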
```cpp
void onStringMessage(const TcpConnectionPtr&,
                     const string& message,
                     Timestamp)
{
  for (ConnectionList::iterator it = connections_.begin();
       it != connections_.end();
       ++it)
  {
    codec_.send(get_pointer(*it), message);
  }
}
```
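Messages pass through `codec_` on the way in and on the way out. The codec's source is not shown in this article, so the sketch below rests on an assumption: that `LengthHeaderCodec` frames each message with a 4-byte length in network byte order, as its name suggests. The functions `encodeFrame` and `decodeFrame` are hypothetical and operate on `std::string` instead of muduo's buffer type.

```cpp
#include <arpa/inet.h>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>

// Prepends a 4-byte big-endian length to the payload (assumed framing).
std::string encodeFrame(const std::string& message)
{
  uint32_t be = htonl(static_cast<uint32_t>(message.size()));
  std::string frame(reinterpret_cast<const char*>(&be), sizeof be);
  frame += message;
  return frame;
}

// Extracts one complete frame from the front of `buffer` into `message`.
// Returns false and leaves `buffer` untouched if the frame is incomplete.
bool decodeFrame(std::string& buffer, std::string& message)
{
  const std::size_t kHeaderLen = sizeof(uint32_t);
  if (buffer.size() < kHeaderLen)
  {
    return false;  // header not fully received yet
  }
  uint32_t be = 0;
  std::memcpy(&be, buffer.data(), kHeaderLen);
  const uint32_t len = ntohl(be);
  if (buffer.size() - kHeaderLen < len)
  {
    return false;  // body not fully received yet
  }
  message.assign(buffer, kHeaderLen, len);
  buffer.erase(0, kHeaderLen + len);
  return true;
}
```

Framing of this kind is what lets the handler receive whole messages even though TCP delivers an unstructured byte stream.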
4. Log

```text
fzy@fzy-Lenovo:~/Downloads/03_net_lib/build/release-cpp11/bin$ ./asio_chat_server 18799
20230424 05:52:06.629547Z 17693 INFO pid = 17693 - server.cc:74
20230424 05:52:06.629617Z 17693 TRACE updateChannel fd = 4 events = 3 index = -1 - EPollPoller.cc:111
20230424 05:52:06.629641Z 17693 TRACE update epoll_ctl op = ADD fd = 4 event = { 4: IN PRI } - EPollPoller.cc:179
20230424 05:52:06.629867Z 17693 DEBUG EventLoop EventLoop created 0x7FFC0780DD80 in thread 17693 - EventLoop.cc:79
20230424 05:52:06.629876Z 17693 TRACE updateChannel fd = 5 events = 3 index = -1 - EPollPoller.cc:111
20230424 05:52:06.629882Z 17693 TRACE update epoll_ctl op = ADD fd = 5 event = { 5: IN PRI } - EPollPoller.cc:179
20230424 05:52:06.629975Z 17693 TRACE updateChannel fd = 6 events = 3 index = -1 - EPollPoller.cc:111
20230424 05:52:06.629984Z 17693 TRACE update epoll_ctl op = ADD fd = 6 event = { 6: IN PRI } - EPollPoller.cc:179
20230424 05:52:06.629999Z 17693 TRACE loop EventLoop 0x7FFC0780DD80 start looping - EventLoop.cc:111
20230424 05:52:06.630005Z 17693 TRACE poll fd total count 3 - EPollPoller.cc:57
20230424 05:52:16.631134Z 17693 TRACE poll nothing happened - EPollPoller.cc:75
20230424 05:52:16.631180Z 17693 TRACE poll fd total count 3 - EPollPoller.cc:57
20230424 05:52:26.641613Z 17693 TRACE poll nothing happened - EPollPoller.cc:75
20230424 05:52:26.641659Z 17693 TRACE poll fd total count 3 - EPollPoller.cc:57
20230424 05:52:28.262505Z 17693 TRACE poll 1 events happened - EPollPoller.cc:66
20230424 05:52:28.262555Z 17693 TRACE printActiveChannels {6: IN } - EventLoop.cc:277
20230424 05:52:28.262577Z 17693 TRACE handleEventWithGuard 6: IN - Channel.cc:86
20230424 05:52:28.262619Z 17693 INFO TcpServer::newConnection [ChatServer] - new connection [ChatServer-0.0.0.0:18799
20230424 05:52:28.262659Z 17693 DEBUG TcpConnection TcpConnection::ctor[ChatServer-0.0.0.0:18799
20230424 05:52:28.262677Z 17693 TRACE updateChannel fd = 8 events = 3 index = -1 - EPollPoller.cc:111
20230424 05:52:28.262691Z 17693 TRACE update epoll_ctl op = ADD fd = 8 event = { 8: IN PRI } - EPollPoller.cc:179
20230424 05:52:28.262711Z 17693 INFO 127.0.0.1:33996 -> 127.0.0.1:18799 is UP - server.cc:39
20230424 05:52:28.262723Z 17693 TRACE poll fd total count 4 - EPollPoller.cc:57
20230424 05:52:38.272934Z 17693 TRACE poll nothing happened - EPollPoller.cc:75
20230424 05:52:38.272972Z 17693 TRACE poll fd total count 4 - EPollPoller.cc:57
20230424 05:52:47.287853Z 17693 TRACE poll 1 events happened - EPollPoller.cc:66
20230424 05:52:47.287885Z 17693 TRACE printActiveChannels {8: IN } - EventLoop.cc:277
20230424 05:52:47.287915Z 17693 TRACE handleEventWithGuard 8: IN - Channel.cc:86
20230424 05:52:47.288136Z 17693 TRACE poll fd total count 4 - EPollPoller.cc:57
20230424 05:52:56.887515Z 17693 TRACE poll 1 events happened - EPollPoller.cc:66
20230424 05:52:56.887543Z 17693 TRACE printActiveChannels {8: IN } - EventLoop.cc:277
20230424 05:52:56.887567Z 17693 TRACE handleEventWithGuard 8: IN - Channel.cc:86
20230424 05:52:56.887582Z 17693 TRACE handleClose fd = 8 state = kConnected - TcpConnection.cc:411
20230424 05:52:56.887589Z 17693 TRACE updateChannel fd = 8 events = 0 index = 1 - EPollPoller.cc:111
20230424 05:52:56.887598Z 17693 TRACE update epoll_ctl op = DEL fd = 8 event = { 8: } - EPollPoller.cc:179
20230424 05:52:56.887626Z 17693 INFO 127.0.0.1:33996 -> 127.0.0.1:18799 is DOWN - server.cc:39
20230424 05:52:56.887648Z 17693 INFO TcpServer::removeConnectionInLoop [ChatServer] - connection ChatServer-0.0.0.0:18799
20230424 05:52:56.887665Z 17693 TRACE removeChannel fd = 8 - EPollPoller.cc:155
20230424 05:52:56.887679Z 17693 DEBUG ~TcpConnection TcpConnection::dtor[ChatServer-0.0.0.0:18799
20230424 05:52:56.887802Z 17693 TRACE poll fd total count 3 - EPollPoller.cc:57
```
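In the trace, `events = 3` and `{ 4: IN PRI }` appear to describe the same interest set in two notations. The sketch below decodes such a mask, assuming it is built from the standard `poll(2)` flags; on Linux `POLLIN` is 1 and `POLLPRI` is 2, which would give 3. The function `eventsToString` is a hypothetical helper, not muduo's own formatter.

```cpp
#include <poll.h>
#include <string>

// Hypothetical helper: renders a poll-style event mask as flag names.
std::string eventsToString(int events)
{
  std::string out;
  if (events & POLLIN)  out += "IN ";
  if (events & POLLPRI) out += "PRI ";
  if (events & POLLOUT) out += "OUT ";
  if (events & POLLHUP) out += "HUP ";
  if (events & POLLERR) out += "ERR ";
  return out;
}
```

Under the same assumption, the `events = 0` entry logged just before the `DEL` operation corresponds to an empty mask, meaning the channel is no longer interested in any event.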