理解muduo之客户端编程

理解muduo之客户端编程

0.前言

本文通过跟踪asio_chat_client程序与muduo网络库,试图回答以下两个问题:

  • 如何建立TCP连接
  • 如何发送与接收数据

以例程为抓手,理解muduo网络库的工作机制与源码。

请时刻记住以下几张图,它有助于理解muduo工作机制以及一次循环中各种回调发生的顺序。

image-20230425141742220

muduo网络编程模型之一(默认):多Reactor,即 one loop per thread

image-20230425141433082

muduo网络编程模型之二:多Reactor + 线程池模型(计算线程)

另外2种为:单Reactor与单Reactor+线程池

image-20230425140356426

三者关系

网络编程之Reactor模型

1. asio_chat_client客户端程序

功能:回显,即建立TCP连接后,客户端发送消息,服务端将数据原封发送回来,并显示。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#include "examples/asio/chat/codec.h"

#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/net/TcpClient.h"

#include "muduo/base/Logging.h"

#include <iostream>
#include <stdio.h>
#include <unistd.h>

using namespace muduo;
using namespace muduo::net;

class ChatClient : noncopyable
{
public:
ChatClient(EventLoop* loop, const InetAddress& serverAddr)
: client_(loop, serverAddr, "ChatClient"),
codec_(std::bind(&ChatClient::onStringMessage, this, _1, _2, _3))
{
client_.setConnectionCallback(
std::bind(&ChatClient::onConnection, this, _1));
client_.setMessageCallback(
std::bind(&LengthHeaderCodec::onMessage, &codec_, _1, _2, _3));
client_.enableRetry();
}

void connect()
{
client_.connect();
}

void disconnect()
{
client_.disconnect();
}

void write(const StringPiece& message)
{
MutexLockGuard lock(mutex_);
if (connection_)
{
codec_.send(get_pointer(connection_), message);
}
}

private:
void onConnection(const TcpConnectionPtr& conn)
{
LOG_INFO << conn->localAddress().toIpPort() << " -> "
<< conn->peerAddress().toIpPort() << " is "
<< (conn->connected() ? "UP" : "DOWN");

MutexLockGuard lock(mutex_);
if (conn->connected())
{
connection_ = conn;
}
else
{
connection_.reset();
}
}

void onStringMessage(const TcpConnectionPtr&,
const string& message,
Timestamp)
{
printf("<<< %s\n", message.c_str());
}

TcpClient client_;
LengthHeaderCodec codec_;
MutexLock mutex_;
TcpConnectionPtr connection_ GUARDED_BY(mutex_);
};

int main(int argc, char* argv[])
{
Logger::setLogLevel(Logger::TRACE);
LOG_INFO << "pid = " << getpid();
if (argc > 2)
{
EventLoopThread loopThread;
uint16_t port = static_cast<uint16_t>(atoi(argv[2]));
InetAddress serverAddr(argv[1], port); // 设置协议族、IP地址与端口, 默认采用AF_INET

ChatClient client(loopThread.startLoop(), serverAddr);
client.connect();
std::string line;
while (std::getline(std::cin, line))
{
client.write(line);
}
client.disconnect();
CurrentThread::sleepUsec(1000*1000); // wait for disconnect, see ace/logging/client.cc
}
else
{
printf("Usage: %s host_ip port\n", argv[0]);
}
}

2. loop循环的建立

主线程:用来读取输入,下述log中的线程ID:17696

IO线程:处理IO事件,下述log中的线程ID:17697

首先创建loopThread对象, EventLoopThread loopThread,该部分完成的工作是初始化以及绑定线程运行函数threadFunc。

在设置协议族、IP地址与端口后,开始实例化一个对象client。在获得loop后,在构造函数中,设置了连接回调函数、消息回调函数与解编码回调,解编码本文不做描述,读者感兴趣可看源码。

这部分传递2个参数,第一个是loop,即loopThread.startLoop()返回的loop, 第二个是服务端地址。

loopThread.startLoop()这个函数做的工作比较多。

  • 首先是创建一个IO线程,然后在IO线程中实例化一个EventLoop,即实现one loop per thread。

    • 在EventLoop构造函数中,会实例化timerQueue_wakeupChannel_
    • 设置timerQueue_的读事件回调处理函数。
    • timerQueue_设置读事件感兴趣,并将channel注册进epoll。
    • wakeupChannel_执行同样操作。
  • 创建完成后,将IO线程中loop对象的地址传递给主线程。在主线程与IO线程中使用了条件变量作为同步原语。

  • 运行loop.loop

该阶段的运行日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fzy@fzy-Lenovo:~/Downloads/03_net_lib/build/release-cpp11/bin$ ./asio_chat_client localhost 18799
20230424 05:52:28.261481Z 17696 INFO pid = 17696 - client.cc:84
20230424 05:52:28.261568Z 17696 ERROR sockets::fromIpPort - SocketsOps.cc:241
# Eventloop构造函数初始化时,:timerQueue_(new TimerQueue(this)),在timerQueue中设置enableRead
# timerfdChannel_.enableReading();
20230424 05:52:28.261943Z 17697 TRACE updateChannel fd = 4 events = 3 index = -1 - EPollPoller.cc:111
20230424 05:52:28.261973Z 17697 TRACE update epoll_ctl op = ADD fd = 4 event = { 4: IN PRI } - EPollPoller.cc:179
# Eventloop构造函数,wakeupChannel_->enableReading();
20230424 05:52:28.262045Z 17697 DEBUG EventLoop EventLoop created 0x7FADC3B27B30 in thread 17697 - EventLoop.cc:79
20230424 05:52:28.262055Z 17697 TRACE updateChannel fd = 5 events = 3 index = -1 - EPollPoller.cc:111
20230424 05:52:28.262062Z 17697 TRACE update epoll_ctl op = ADD fd = 5 event = { 5: IN PRI } - EPollPoller.cc:179
# loop.loop()
20230424 05:52:28.262083Z 17697 TRACE loop EventLoop 0x7FADC3B27B30 start looping - EventLoop.cc:111
20230424 05:52:28.262097Z 17697 TRACE poll fd total count 2 - EPollPoller.cc:57

3. 如何建立TCP连接

一句话描述:

main函数中的client.connect(),怎么执行到ChatClient中的void onConnection(const TcpConnectionPtr& conn)。以下是具体的过程:

  • client.connect()将经过到ChatClient, 调用TcpCient中的connect函数,进而调用start函数。

此时调用runInLoop的线程不是loop线程,因此需要进行唤醒, startInLoop实际在doPendingFunctors中执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// muduo/net/Connector.cc
void Connector::start()
{
connect_ = true;
// 此时调用runInLoop的线程不是loop线程,因此需要进行唤醒, startInLoop实际在doPendingFunctors中执行
loop_->runInLoop(std::bind(&Connector::startInLoop, this)); // FIXME: unsafe
}

void Connector::startInLoop()
{
loop_->assertInLoopThread();
assert(state_ == kDisconnected);
if (connect_)
{
connect(); // 开始准备建立连接
}
else
{
LOG_DEBUG << "do not connect";
}
}
  • connect中,将创建非阻塞socket以及调用socket connect,连接建立成功后,调用connecting函数,并将sockfd传递给connecting函数。
  • connecting中,将建立TCP连接阶段时的写回调,注意,当建立连接后,Channel的回调将转到TCP Connection中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// muduo/net/Connector.cc
void Connector::connecting(int sockfd) // socket连接建立后,处理上层
{
setState(kConnecting);
assert(!channel_);
channel_.reset(new Channel(loop_, sockfd));
channel_->setWriteCallback( // 建立TCP连接阶段时的写回调
std::bind(&Connector::handleWrite, this)); // FIXME: unsafe
channel_->setErrorCallback(
std::bind(&Connector::handleError, this)); // FIXME: unsafe

// channel_->tie(shared_from_this()); is not working,
// as channel_ is not managed by shared_ptr
channel_->enableWriting(); // 最终会把channel添加到epoll中
}
  • 写事件的发生

enableWriting注册写事件,即触发写事件。

  • 写事件的处理以及连接的建立

写事件发生后,在EventLoop::loop()中执行currentActiveChannel_->handleEvent(pollReturnTime_),此时会去处理写事件。而在连接阶段,channel中的写事件的回调函数是由connector负责的,即Connector::handleWrite()函数。

在handleWrite函数中,将经历一次removeAndResetChannel(),完成清除channel事件、将channel从epoll中移除、保存建立连接的sockfd(用于后续重新加入epoll)以及重置channel(释放当前指针所拥有的对象,并将channel_置为nullptr)。

注意,这个channel_.reset()是在handleEvent结束后的doPendingFunctors中处理的。

之后处理socket错误,即sockets::getSocketError,它的作用是再次确认是否成功建立连接。若有自连接事件发生,就处理自连接。getSocketError没有错误的话,就设置状态为kConnected,并执行连接回调,即执行 TcpClient中的 newConnection()

newConnection()中,最重要的事情就是实例化TcpConnection,并设置TcpConnection的回调函数,其中的连接、消息处理以及写完整回调是由上层应用开发者开发的客户端程序中设置的,比如onConnect()。在这个函数的最后,会执行connectEstablished()。它会重新将channel加入epoll,以及执行应用程序onConnect回调。

至此,客户端与服务端的TCP连接就真正建立了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// muduo/net/TcpClient.cc
// 当连接建立后执行, sockfd:已经建立连接后的socket
void TcpClient::newConnection(int sockfd)
{
loop_->assertInLoopThread();
InetAddress peerAddr(sockets::getPeerAddr(sockfd));
char buf[32];
snprintf(buf, sizeof buf, ":%s#%d", peerAddr.toIpPort().c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;

InetAddress localAddr(sockets::getLocalAddr(sockfd));
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
// 实例化 TcpConnection,后续通信将通过TcpConnection进行
TcpConnectionPtr conn(new TcpConnection(loop_,
connName,
sockfd,
localAddr,
peerAddr));

conn->setConnectionCallback(connectionCallback_); // 执行应用程序中的onConnect()
conn->setMessageCallback(messageCallback_); // 这些都是客户端应用程序设置的回调函数
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback(
std::bind(&TcpClient::removeConnection, this, _1)); // FIXME: unsafe
{
MutexLockGuard lock(mutex_);
connection_ = conn;
}
conn->connectEstablished(); // 重新将channel加入epoll,以及执行应用程序onConnect回调
}
1
2
3
4
5
6
7
8
9
10
11
// muduo/net/TcpClient.cc
void TcpConnection::connectEstablished()
{
loop_->assertInLoopThread();
assert(state_ == kConnecting);
setState(kConnected);
channel_->tie(shared_from_this());
channel_->enableReading(); // 将channel重新加入epoll

connectionCallback_(shared_from_this()); // 在这里真正执行client中的onConnect回调
}

TcpConnection的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// muduo/net/TcpConnection.cc
TcpConnection::TcpConnection(EventLoop* loop,
const string& nameArg,
int sockfd,
const InetAddress& localAddr,
const InetAddress& peerAddr)
: loop_(CHECK_NOTNULL(loop)),
name_(nameArg),
state_(kConnecting),
reading_(true),
socket_(new Socket(sockfd)),
channel_(new Channel(loop, sockfd)),
localAddr_(localAddr),
peerAddr_(peerAddr),
highWaterMark_(64*1024*1024)
{
// 在建立TCP连接以后
// 处理在loop中运行的channel中的读、写、关闭以及错误回调处理函数
channel_->setReadCallback(
std::bind(&TcpConnection::handleRead, this, _1));
channel_->setWriteCallback(
std::bind(&TcpConnection::handleWrite, this));
channel_->setCloseCallback(
std::bind(&TcpConnection::handleClose, this));
channel_->setErrorCallback(
std::bind(&TcpConnection::handleError, this));
LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this
<< " fd=" << sockfd;
socket_->setKeepAlive(true); // 设置TCP保活机制
}

该阶段日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Connector构造
20230424 05:52:28.262139Z 17696 DEBUG Connector ctor[0x5603D49F74A0] - Connector.cc:31
# TcpClient构造
20230424 05:52:28.262175Z 17696 INFO TcpClient::TcpClient[ChatClient] - connector 0x5603D49F74A0 - TcpClient.cc:69
# client.connect();
20230424 05:52:28.262187Z 17696 INFO TcpClient::connect[ChatClient] - connecting to 0.0.0.0:18799 - TcpClient.cc:107
# 有上层事件发生,唤醒loop线程
20230424 05:52:28.262227Z 17697 TRACE poll 1 events happened - EPollPoller.cc:66
20230424 05:52:28.262241Z 17697 TRACE printActiveChannels {5: IN } - EventLoop.cc:277
20230424 05:52:28.262262Z 17697 TRACE handleEventWithGuard 5: IN - Channel.cc:86
# 将建立的channel(socket)更新到epoll中
20230424 05:52:28.262454Z 17697 TRACE updateChannel fd = 6 events = 4 index = -1 - EPollPoller.cc:111
20230424 05:52:28.262470Z 17697 TRACE update epoll_ctl op = ADD fd = 6 event = { 6: OUT } - EPollPoller.cc:179
# 继续阻塞在epoll_wait
20230424 05:52:28.262492Z 17697 TRACE poll fd total count 3 - EPollPoller.cc:57
# 写事件发生,刚注册当然可写
20230424 05:52:28.262503Z 17697 TRACE poll 1 events happened - EPollPoller.cc:66
20230424 05:52:28.262508Z 17697 TRACE printActiveChannels {6: OUT } - EventLoop.cc:277
20230424 05:52:28.262516Z 17697 TRACE handleEventWithGuard 6: OUT - Channel.cc:86
# void Connector::handleWrite()
20230424 05:52:28.262541Z 17697 TRACE handleWrite Connector::handleWrite 1 - Connector.cc:160
20230424 05:52:28.262546Z 17697 TRACE updateChannel fd = 6 events = 0 index = 1 - EPollPoller.cc:111
20230424 05:52:28.262559Z 17697 TRACE update epoll_ctl op = DEL fd = 6 event = { 6: } - EPollPoller.cc:179
20230424 05:52:28.262572Z 17697 TRACE removeChannel fd = 6 - EPollPoller.cc:155
# TcpConnection构造函数
20230424 05:52:28.262610Z 17697 DEBUG TcpConnection TcpConnection::ctor[ChatClient:127.0.0.1:18799#1] at 0x7FADBC001120 fd=6 - TcpConnection.cc:61
# void TcpConnection::connectEstablished() enableReading()
20230424 05:52:28.262626Z 17697 TRACE updateChannel fd = 6 events = 3 index = -1 - EPollPoller.cc:111
20230424 05:52:28.262635Z 17697 TRACE update epoll_ctl op = ADD fd = 6 event = { 6: IN PRI } - EPollPoller.cc:179
# client.cc中的onConnect()
20230424 05:52:28.262655Z 17697 INFO 127.0.0.1:33996 -> 127.0.0.1:18799 is UP - client.cc:53

在连接建立的过程中,我们发现channel经历了加入epoll,又移除,后续又加入的过程。即为什么存在removeAndResetChannel这个函数。

==TODO 解答==

4. 如何发送与接收数据

一句话描述:

main函数中的client.write(line),怎么执行到ChatClient中的void onStringMessage

4.1 发送过程

  • 主线程读取一行用户输入,调用ChatClientwrite方法。最终调用TcpConnection::send()发送数据。关于buffer的处理将在后续专题叙述。
  • 在send方法中,由于此时在主线程上,因此需要唤醒IO线程,并在IO线程中执行发送任务。唤醒与执行如前所述。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// muduo/net/TcpConnection.cc
void TcpConnection::send(Buffer* buf)
{
if (state_ == kConnected)
{
if (loop_->isInLoopThread())
{
sendInLoop(buf->peek(), buf->readableBytes());
buf->retrieveAll();
}
else
{ // 定义一个指向sendInLoop的函数指针sendInLoop
void (TcpConnection::*fp)(const StringPiece& message) = &TcpConnection::sendInLoop;
loop_->runInLoop( // 唤醒后在IO线程中执行
std::bind(fp,
this, // FIXME
buf->retrieveAllAsString()));
//std::forward<string>(message)));
}
}
}
  • TcpConnection::sendInLoop(const void* data, size_t len)函数中,如果输出队列上没有数据,将直接将消息发送出去,反之则将数据追加到outputBuffer_。若应用程序设置了写完整回调函数,则会在loop中执行写完整回调函数。
  • 发送结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// muduo/net/TcpConnection.cc
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
// if no thing in output queue, try writing directly
// 直接写
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
// 返回共发出去多少字节,或者返回-1(出错)
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_) // // 全发出去了
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
// ...
}

4.2 接收过程

  • epoll触发读回调
  • 执行 Channel::handleEventWithGuard,并根据事件回调类型选择具体回调函数,本次Wie读回调处理函数,实际处理函数为TcpConnection::handleRead
  • 将数据放回到inputBuffer_后,执行消息回调messageCallback_。改回调函数经TcpClient,最终实际执行的是应用程序的消息回调函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// muduo/net/TcpConnection.cc
// 处理在loop中handleEvent事件之读事件
void TcpConnection::handleRead(Timestamp receiveTime)
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); // 消息回调
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}
  • 在本例程中,上述代码中的messageCallback_LengthHeaderCodec中的onMessage, 经过处理后最终回调到ChatClient中的onStringMessage函数。
  • 接收消息并显示,在结束后,loop将继续阻塞在poll处,等待下次事件发生。

该阶段日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 写入数据
helloworld
# 写事件发生,唤醒
20230424 05:52:47.287685Z 17697 TRACE poll 1 events happened - EPollPoller.cc:66
20230424 05:52:47.287713Z 17697 TRACE printActiveChannels {5: IN } - EventLoop.cc:277
20230424 05:52:47.287737Z 17697 TRACE handleEventWithGuard 5: IN - Channel.cc:86
# -------------------------------------------------------------------------------
# 接收阶段
# 读事件发生
20230424 05:52:47.287839Z 17697 TRACE poll fd total count 3 - EPollPoller.cc:57
20230424 05:52:47.288177Z 17697 TRACE poll 1 events happened - EPollPoller.cc:66
20230424 05:52:47.288197Z 17697 TRACE printActiveChannels {6: IN } - EventLoop.cc:277
20230424 05:52:47.288213Z 17697 TRACE handleEventWithGuard 6: IN - Channel.cc:86
<<< helloworld
# 继续阻塞在epoll_wait
20230424 05:52:47.288315Z 17697 TRACE poll fd total count 3 - EPollPoller.cc:57

5. 总结

本文的主要内容,在陈硕的《Linux多线程服务端编程》一书中均有体现。陈书全面之余,有些许零散,初次阅读可能会很难从整体把握muduo工作流。因此尝试以应用开发者的角度,跟踪源码,将整个过程串起来。

本文可能有错误之处,敬请批评指正。


理解muduo之客户端编程
http://ziyangfu.github.io/2023/04/25/理解muduo之客户端编程/
作者
FZY
发布于
2023年4月25日
许可协议