Understanding muduo: Server-Side Programming

The example in this post walks through single-Reactor server-side programming with muduo.

0. The application-level server program

InetAddress serverAddr(port) binds the server to a fixed port and accepts connections on any local address, i.e. the IP address is set to INADDR_ANY.
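
For readers less familiar with INADDR_ANY, here is a rough plain-BSD-socket equivalent of what this wildcard binding means. It is only an illustrative sketch, not muduo code; the function name bindAnyAddress is made up for this example.

// Rough plain-socket equivalent of InetAddress serverAddr(port): bind the
// listening socket to 0.0.0.0:port, i.e. every local interface.
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>

int bindAnyAddress(int listenfd, uint16_t port)
{
  struct sockaddr_in addr;
  memset(&addr, 0, sizeof addr);
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);  // wildcard: accept on any local IP
  addr.sin_port = htons(port);
  return ::bind(listenfd, reinterpret_cast<const struct sockaddr*>(&addr), sizeof addr);
}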

The following is the code of the muduo example asio_chat_server.

// examples/asio/chat/server.cc
#include "examples/asio/chat/codec.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"

#include <set>
#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

class ChatServer : noncopyable
{
 public:
  ChatServer(EventLoop* loop,
             const InetAddress& listenAddr)
    : server_(loop, listenAddr, "ChatServer"),
      codec_(std::bind(&ChatServer::onStringMessage, this, _1, _2, _3))
  {
    server_.setConnectionCallback(
        std::bind(&ChatServer::onConnection, this, _1));
    server_.setMessageCallback(
        std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
  }

  void start()
  {
    server_.start();
  }

 private:
  void onConnection(const TcpConnectionPtr& conn)
  {
    LOG_INFO << conn->peerAddress().toIpPort() << " -> "
             << conn->localAddress().toIpPort() << " is "
             << (conn->connected() ? "UP" : "DOWN");

    if (conn->connected())
    {
      connections_.insert(conn);  // add the new connection to the set
    }
    else
    {
      connections_.erase(conn);
    }
  }

  void onStringMessage(const TcpConnectionPtr&,
                       const string& message,
                       Timestamp)
  {
    for (ConnectionList::iterator it = connections_.begin();
         it != connections_.end();
         ++it)
    {
      codec_.send(get_pointer(*it), message);
    }
  }

  typedef std::set<TcpConnectionPtr> ConnectionList;
  TcpServer server_;
  LengthHeaderCodec codec_;
  ConnectionList connections_;
};

int main(int argc, char* argv[])
{
  Logger::setLogLevel(Logger::TRACE);
  LOG_INFO << "pid = " << getpid();
  if (argc > 1)
  {
    EventLoop loop;  // the main Reactor
    uint16_t port = static_cast<uint16_t>(atoi(argv[1]));
    InetAddress serverAddr(port);
    ChatServer server(&loop, serverAddr);
    server.start();
    loop.loop();
  }
  else
  {
    printf("Usage: %s port\n", argv[0]);
  }
}

1. Initializing the loop

The server runs its event loop directly on the main thread, so the networking model is a single-Reactor model. In this mode the Acceptor and the TcpConnections live in the same thread.

Setting up the loop is essentially the same as on the client side.
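
As a side note, moving from this single-Reactor model to a multi-Reactor ("one loop per thread") model only takes one extra call before start(): TcpServer::setThreadNum. A minimal sketch, assuming we add a forwarding method to ChatServer (the muduo example server_threaded.cc does something similar):

// Hypothetical addition to ChatServer: forward the thread count to TcpServer.
// With numThreads > 0, the acceptor stays on the main loop while new
// connections are dispatched round-robin to the sub-loops (sub-Reactors).
void setThreadNum(int numThreads)
{
  server_.setThreadNum(numThreads);
}

// In main(), before server.start():
//   server.setThreadNum(3);  // 3 I/O threads, i.e. 3 sub-Reactors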

2. How the server establishes a TCP connection

This section is the focus of this post; the main goal is to understand the Acceptor.

  • Instantiating the objects

Instantiating TcpServer in turn initializes the Acceptor and the EventLoopThreadPool.
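
The relevant part of the TcpServer constructor looks roughly like this (a condensed sketch; see muduo/net/TcpServer.cc for the exact code):

// muduo/net/TcpServer.cc (abridged)
TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),
    ipPort_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),  // the acceptor
    threadPool_(new EventLoopThreadPool(loop, name_)),                // the I/O thread pool
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    nextConnId_(1)
{
  // ... (the new-connection callback is registered here; see below)
}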

In the Acceptor, sockets::createNonblockingOrDie first creates a non-blocking TCP socket, and the resulting sockfd is attached to acceptChannel_. It then enables address reuse and port reuse, binds the listening address with bind, and finally sets the read-event callback.

This step effectively performs the socket() and bind() calls of the classic server setup.

Note that muduo uses idleFd_ to handle the case where the server hits its file-descriptor limit. Concretely, it opens a fd on /dev/null in advance; when accept fails with errno == EMFILE, it closes that fd, accepts the pending TCP connection, immediately closes the accepted connection, and then reopens /dev/null to keep a spare fd for next time (see the EMFILE branch in Acceptor::handleRead below). This turns "the server cannot accept at all" into "accept the connection, then close it gracefully".

// muduo/net/Acceptor.cc
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
  : loop_(loop),
    acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
    acceptChannel_(loop, acceptSocket_.fd()),
    listening_(false),
    idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
  assert(idleFd_ >= 0);
  acceptSocket_.setReuseAddr(true);
  acceptSocket_.setReusePort(reuseport);
  acceptSocket_.bindAddress(listenAddr);
  acceptChannel_.setReadCallback(
      std::bind(&Acceptor::handleRead, this));
}

In addition, the TcpServer constructor registers the new-connection callback on the Acceptor, as sketched below.
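
Roughly, the constructor body does the following (abridged; see muduo/net/TcpServer.cc for the exact code):

// muduo/net/TcpServer.cc (constructor body, abridged)
// When the Acceptor accepts a new fd, it hands it to TcpServer::newConnection.
acceptor_->setNewConnectionCallback(
    std::bind(&TcpServer::newConnection, this, _1, _2));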

  • listen: turning the sockfd into a passive socket

The server application calls ChatServer::start, which in turn calls TcpServer::start.

get_pointer(acceptor_) passes the raw pointer held by the smart pointer.
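
Conceptually, get_pointer just extracts the raw pointer without transferring or sharing ownership; muduo provides similar overloads in muduo/base/Types.h. A minimal sketch of what it does:

// Minimal sketch of get_pointer: return the raw pointer held by a smart pointer.
#include <memory>

template<typename T>
inline T* get_pointer(const std::shared_ptr<T>& ptr) { return ptr.get(); }

template<typename T>
inline T* get_pointer(const std::unique_ptr<T>& ptr) { return ptr.get(); }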

// muduo/net/TcpServer.cc
void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)
  {
    threadPool_->start(threadInitCallback_);  // what if threadInitCallback_ was never set? (the pool simply skips an empty callback)

    assert(!acceptor_->listening());
    loop_->runInLoop(
        std::bind(&Acceptor::listen, get_pointer(acceptor_)));  // start listening
  }
}
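
loop_->runInLoop guarantees that Acceptor::listen is executed in the loop's own thread: if the caller is already in that thread, the functor runs immediately; otherwise it is queued and the loop is woken up. An abridged sketch of the relevant muduo code (see muduo/net/EventLoop.cc for the exact version):

// muduo/net/EventLoop.cc (abridged)
void EventLoop::runInLoop(Functor cb)
{
  if (isInLoopThread())
  {
    cb();                        // already in the loop thread: run it right away
  }
  else
  {
    queueInLoop(std::move(cb));  // otherwise queue it and wake the loop up
  }
}
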
// muduo/net/Acceptor.cc
void Acceptor::listen()
{
  loop_->assertInLoopThread();
  listening_ = true;
  acceptSocket_.listen();          // turn the socket into a passive (listening) socket
  acceptChannel_.enableReading();  // register the read event with the poller
}
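
Under the hood, acceptSocket_.listen() is a thin wrapper over the listen(2) system call, roughly as follows (abridged; see muduo/net/Socket.cc and muduo/net/SocketsOps.cc):

// muduo/net/Socket.cc and muduo/net/SocketsOps.cc (abridged)
void Socket::listen()
{
  sockets::listenOrDie(sockfd_);
}

void sockets::listenOrDie(int sockfd)
{
  int ret = ::listen(sockfd, SOMAXCONN);  // kernel backlog capped at SOMAXCONN
  if (ret < 0)
  {
    LOG_SYSFATAL << "sockets::listenOrDie";
  }
}
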
  • accept: establishing the connection

When a client calls connect, a read event fires on the server's listening fd, and the read-event handler is the Acceptor's handleRead. After the connection is accepted, newConnectionCallback_ is executed; this callback is set by TcpServer and points to TcpServer::newConnection.

// muduo/net/Acceptor.cc
void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr;
  //FIXME loop until no more
  int connfd = acceptSocket_.accept(&peerAddr);  // accept the connection
  if (connfd >= 0)
  {
    // string hostport = peerAddr.toIpPort();
    // LOG_TRACE << "Accepts of " << hostport;
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);  // hand the new fd to TcpServer
    }
    else
    {
      sockets::close(connfd);
    }
  }
  else
  {
    LOG_SYSERR << "in Acceptor::handleRead";
    // Read the section named "The special problem of
    // accept()ing when you can't" in libev's doc.
    // By Marc Lehmann, author of libev.
    if (errno == EMFILE)
    {
      ::close(idleFd_);
      idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
      ::close(idleFd_);
      idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
  }
}

  • Connection established

// muduo/net/TcpServer.cc
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();  // with a single Reactor there is only one loop
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
  ++nextConnId_;
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd));
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(ioLoop,
                                          connName,
                                          sockfd,
                                          localAddr,
                                          peerAddr));
  connections_[connName] = conn;
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      std::bind(&TcpServer::removeConnection, this, _1));  // FIXME: unsafe
  // finish setting up the connection in its own I/O loop
  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

// muduo/net/TcpConnection.cc
void TcpConnection::connectEstablished()
{
  loop_->assertInLoopThread();
  assert(state_ == kConnecting);
  setState(kConnected);
  channel_->tie(shared_from_this());
  channel_->enableReading();  // register the connection's channel with epoll

  connectionCallback_(shared_from_this());  // this is where the application's onConnection callback (ChatServer::onConnection here) actually runs
}
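
The channel_->tie(shared_from_this()) call deserves a note: the Channel keeps a weak_ptr to the TcpConnection and promotes it to a shared_ptr while handling events, so the connection object cannot be destroyed in the middle of its own event handling. Abridged sketch (see muduo/net/Channel.cc for the exact code):

// muduo/net/Channel.cc (abridged)
void Channel::tie(const std::shared_ptr<void>& obj)
{
  tie_ = obj;    // stored as a weak_ptr<void>
  tied_ = true;
}

void Channel::handleEvent(Timestamp receiveTime)
{
  std::shared_ptr<void> guard;
  if (tied_)
  {
    guard = tie_.lock();  // keep the owner (the TcpConnection) alive during handling
    if (guard)
    {
      handleEventWithGuard(receiveTime);
    }
  }
  else
  {
    handleEventWithGuard(receiveTime);
  }
}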

3. Sending and receiving data

Sending and receiving data works much the same as on the client side. The echo behavior lives in the message handler: the received message is sent back out, and it is broadcast to every connected client.

// examples/asio/chat/server.cc
void onStringMessage(const TcpConnectionPtr&,
                     const string& message,
                     Timestamp)
{
  for (ConnectionList::iterator it = connections_.begin();
       it != connections_.end();
       ++it)
  {
    codec_.send(get_pointer(*it), message);
  }
}
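
For contrast, if the server were meant to echo only to the sender rather than broadcast, a minimal (hypothetical) variation of the handler would be:

// Hypothetical variation: reply only to the originating connection.
void onStringMessage(const TcpConnectionPtr& conn,
                     const string& message,
                     Timestamp)
{
  codec_.send(get_pointer(conn), message);  // send back through the same codec framing
}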

4. Log output

fzy@fzy-Lenovo:~/Downloads/03_net_lib/build/release-cpp11/bin$ ./asio_chat_server 18799
20230424 05:52:06.629547Z 17693 INFO pid = 17693 - server.cc:74
# timerQueue
20230424 05:52:06.629617Z 17693 TRACE updateChannel fd = 4 events = 3 index = -1 - EPollPoller.cc:111
20230424 05:52:06.629641Z 17693 TRACE update epoll_ctl op = ADD fd = 4 event = { 4: IN PRI } - EPollPoller.cc:179
20230424 05:52:06.629867Z 17693 DEBUG EventLoop EventLoop created 0x7FFC0780DD80 in thread 17693 - EventLoop.cc:79
# wakeupChannel_->enableReading()
20230424 05:52:06.629876Z 17693 TRACE updateChannel fd = 5 events = 3 index = -1 - EPollPoller.cc:111
20230424 05:52:06.629882Z 17693 TRACE update epoll_ctl op = ADD fd = 5 event = { 5: IN PRI } - EPollPoller.cc:179
# acceptChannel_.enableReading();
20230424 05:52:06.629975Z 17693 TRACE updateChannel fd = 6 events = 3 index = -1 - EPollPoller.cc:111
20230424 05:52:06.629984Z 17693 TRACE update epoll_ctl op = ADD fd = 6 event = { 6: IN PRI } - EPollPoller.cc:179
# loop.loop()
20230424 05:52:06.629999Z 17693 TRACE loop EventLoop 0x7FFC0780DD80 start looping - EventLoop.cc:111
20230424 05:52:06.630005Z 17693 TRACE poll fd total count 3 - EPollPoller.cc:57
20230424 05:52:16.631134Z 17693 TRACE poll nothing happened - EPollPoller.cc:75
20230424 05:52:16.631180Z 17693 TRACE poll fd total count 3 - EPollPoller.cc:57
20230424 05:52:26.641613Z 17693 TRACE poll nothing happened - EPollPoller.cc:75
20230424 05:52:26.641659Z 17693 TRACE poll fd total count 3 - EPollPoller.cc:57
# a read event fires on the listening fd
20230424 05:52:28.262505Z 17693 TRACE poll 1 events happened - EPollPoller.cc:66
20230424 05:52:28.262555Z 17693 TRACE printActiveChannels {6: IN } - EventLoop.cc:277
20230424 05:52:28.262577Z 17693 TRACE handleEventWithGuard 6: IN - Channel.cc:86
20230424 05:52:28.262619Z 17693 INFO TcpServer::newConnection [ChatServer] - new connection [ChatServer-0.0.0.0:18799#1] from 127.0.0.1:33996 - TcpServer.cc:80
20230424 05:52:28.262659Z 17693 DEBUG TcpConnection TcpConnection::ctor[ChatServer-0.0.0.0:18799#1] at 0x563CE5C6F9B0 fd=8 - TcpConnection.cc:61
# the accepted fd's channel is registered with epoll
20230424 05:52:28.262677Z 17693 TRACE updateChannel fd = 8 events = 3 index = -1 - EPollPoller.cc:111
20230424 05:52:28.262691Z 17693 TRACE update epoll_ctl op = ADD fd = 8 event = { 8: IN PRI } - EPollPoller.cc:179
# connection established
20230424 05:52:28.262711Z 17693 INFO 127.0.0.1:33996 -> 127.0.0.1:18799 is UP - server.cc:39
# waiting for events
20230424 05:52:28.262723Z 17693 TRACE poll fd total count 4 - EPollPoller.cc:57
20230424 05:52:38.272934Z 17693 TRACE poll nothing happened - EPollPoller.cc:75
20230424 05:52:38.272972Z 17693 TRACE poll fd total count 4 - EPollPoller.cc:57
# data arrives from the client: a read event fires on the connection fd
20230424 05:52:47.287853Z 17693 TRACE poll 1 events happened - EPollPoller.cc:66
20230424 05:52:47.287885Z 17693 TRACE printActiveChannels {8: IN } - EventLoop.cc:277
20230424 05:52:47.287915Z 17693 TRACE handleEventWithGuard 8: IN - Channel.cc:86
20230424 05:52:47.288136Z 17693 TRACE poll fd total count 4 - EPollPoller.cc:57
20230424 05:52:56.887515Z 17693 TRACE poll 1 events happened - EPollPoller.cc:66
20230424 05:52:56.887543Z 17693 TRACE printActiveChannels {8: IN } - EventLoop.cc:277
20230424 05:52:56.887567Z 17693 TRACE handleEventWithGuard 8: IN - Channel.cc:86
20230424 05:52:56.887582Z 17693 TRACE handleClose fd = 8 state = kConnected - TcpConnection.cc:411
20230424 05:52:56.887589Z 17693 TRACE updateChannel fd = 8 events = 0 index = 1 - EPollPoller.cc:111
20230424 05:52:56.887598Z 17693 TRACE update epoll_ctl op = DEL fd = 8 event = { 8: } - EPollPoller.cc:179
20230424 05:52:56.887626Z 17693 INFO 127.0.0.1:33996 -> 127.0.0.1:18799 is DOWN - server.cc:39
20230424 05:52:56.887648Z 17693 INFO TcpServer::removeConnectionInLoop [ChatServer] - connection ChatServer-0.0.0.0:18799#1 - TcpServer.cc:109
20230424 05:52:56.887665Z 17693 TRACE removeChannel fd = 8 - EPollPoller.cc:155
20230424 05:52:56.887679Z 17693 DEBUG ~TcpConnection TcpConnection::dtor[ChatServer-0.0.0.0:18799#1] at 0x563CE5C6F9B0 fd=8 state=kDisconnected - TcpConnection.cc:68
20230424 05:52:56.887802Z 17693 TRACE poll fd total count 3 - EPollPoller.cc:57

