0%

muduo库源码Buffer

Buffer的功能需求

设计buffer可以从易用性和性能两方面考虑,muduo的buffer更偏向于易用性。

  • 对外表现是一块连续的内存(char* p, int len)
  • size()可以自动增长,不是固定大小的数组
  • 内部以std::vector<int>来保存数据
  • buffer更像一个queue,从尾部写入数据,从头部读取数据
  • input buffer:连接从socket中读取数据,写入input buffer;客户代码从中读取数据
  • output buffer:客户代码把数据写入output buffer,连接从output buffer中读数据并写入socket

muduo/base基础库

在学习muduo如何实现Buffer之前,先阅读Buffer.h头文件中所需要的依赖。

types.h

基本类型的声明,如muduo::string
具体有:

1
2
3
4
5
6
7
8
9
10
11
12
inline void memZero(void* p, size_t n);

//作为static_cast<>或const_cast<>的安全版本
template<typename To, typename From>
inline To implicit_cast(From const &f)

//当upcast使用implicit_cast<>,downcast时static_cast<>不再安全,这里使用
//dynamic_cast<>来检查downcast是否合法
//不支持RTTI,如
//if(dynamic_cast<subclass>(foo)) HandleSubclassObj(foo);
template<typename To, typename From>
inline To down_cast(From* f)

StringPiece.h

传递C风格的字符串参数给函数,知乎有个相关的提问与回答点击查看StringPiece是某公司使用的一个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class StringArg {
public:
StringArg(const char* str)
: str_(str)
{ }

StringArg(const string& str)
: str_(str.c_str())
{ }

const char* c_str() const { return str_; }

private:
const char* str_;

提供non-explicit构造函数给使用者,可以传递const char*string

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#ifndef MUDUO_BASE_STRINGPIECE_H
#define MUDUO_BASE_STRINGPIECE_H
namespace muduo {
{
class StringPiece {
private:
const char* ptr_;
int length_;

public:
//default
StringPiece()
: ptr_(NULL), length_(0) { }
//参数为字符指针
StringPiece(const char* str)
: ptr_(str), length_(static_cast<int>(strlen(ptr_))) { }
//参数为无符号字符指针; reinterpret_cast临时隐藏类型,使用时要将其转换为原型
//具有非常强的转换能力
StringPiece(const unsigned char* str)
: ptr_(reinterpret_cast<const char*>(str)),
length_(static_cast<int>(strlen(ptr_))) { }

StringPiece(const string& str)
: ptr_(str.data()), length_(static_cast<int>(str.size())) { }

StringPiece(const char* offset, int len)
: ptr_(offset), length_(len) { }

//string类常见成员
const char* data() const { return ptr_; }
int size() const { return length_; }
bool empty() const { return length_ == 0; }
const char* begin() const { return ptr_; }
const char* end() const { return ptr_ + length_; }

void clear() { ptr_ = NULL; length_ = 0; }
void set(const char* buffer, int len) { ptr_ = buffer; length = len; }
void set(const char* str) {
ptr = str;
length_ = static_cast<int>(strlen(str));
}

//运算符成员函数[], ==,
char operator[](int i) const { return _ptr[i]; }
bool operator==(const StringPiece& x) const {
return (x.length_ == length_) &&
(memcmp(x.ptr_, ptr_, length_) == 0);
}
bool operator!=(const StringPiece& x) const {
return !(*this == x);
}

//比较成员函数
int compare(const StringPiece& x) const { ... }
//转换成string类型
string as_string() const {...}
void CopyToString(string* target) { ... }
//判断起始字符
bool start_with(const StringPiece& x) const { ... }
};
} // namespace muduo
//允许String Pieces被记录到日志中
std::ostream& operator<<(std::ostream o&, const muduo::StringPieces piece);

#endif

muduo/net网络库

Buffer.cc和Buffer.h

在这里插入图片描述
1)特点:封装vector作为缓冲区,因为vector为一块连续空间,且其本身具有自动增长的性质,迭代器为原始指针,使用起来较为方便
2)分布:缓冲区分为三个部分
prependable:大小为readerIndex
readable:大小为writerIndex - readerIndex
writeable:大小为size() - writerIndex
初始时,readable == 0,readIndex = writeIndex = 8;
3)检索:Buffer类通过findCRLFfindEOL成员函数在readerable区域检索\r\nEOL
4)调整区域

  • 通过成员函数hasWrittenunwrite调整可读/可写区域的大小,如果向Buffer写入200字节,那么writeIndex += 200readable == 200writable == 824
  • 成员函数read()retrieve()读入50字节,readIndex += 50readable == 150writable == 824;当一次性读完时,调用retrieveall()Buffer重置

5)自动增长:如果客户代码一次性写入1000字节,而当前可写字节小于这个数(prependable+writable),那么buffer就会自动增长以容纳全部数据。
6)内部挪腾:当可写字节满足需求,但是writable大小不足时,移动readable,腾出位置
7)前方添加:提供prependable,让程序能以很小的代价在数据前面添加几个字节。比如说要序列化一个消息,但是并不知道消息长度,于是不断append()直到序列化完成,此时可以通过readable区域变化求出消息长度,在prependable中用4个字节存储。
8)读取内容:利用readfd(),在栈上开辟一块65536字节额外缓冲区,利用readv()来读。注意在muduo中只调用了一次readv来读取数据是因为它采用的是level trigger,而不是edge trigger。如果采用边缘触发,就需要使用一个while循环来不断读取,直到把缓冲区的内容读取完。边缘触发相比水平触发会进行更多次的系统调用,对追求低延迟的程序而言不一定更高效。边缘触发更适合高并发,比如系统中存在很多不需要读写的就绪文件描述符,水平触发每次都会返回不断通知,边缘触发只通知一次,直到有第二次的读写事件发生。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class Buffer : public muduo::copyable 
{
private:
//1) 用一个vector来维护一个缓冲区
std::vector<char> buffer_;
size_t readerIndex_;
size_t writerIndex_;
static const char kCRLF[];

private:
char* begin() { return &*buffer_.begin(); } //使用vector自身的迭代器,begin()返回缓冲区起始元素
char* beginWrite() { return begin() + writeIndex_; } //获得writeable区的起始元素

public:
static const size_t kCheapPrepend = 8; //定义prepenable初始大小为8
static const size_t kInitialSize = 1024; //定义writeable初始大小为1024

//2) 求缓冲区三个部分的大小
size_t readableBytes() const
{ return writerIndex_ - readerIndex_; }
size_t writableBytes() const
{ return buffer_.size() - writerIndex_; }
size_t prependableBytes() const
{ return readerIndex_; }

const char* peek() const { return begin() + readerIndex_; } //求readerable的头部指针

//3) 检索"/r/n"和EOL
const char* findCRLF() const
{
const char* crlf = std::search(peek(), beginWrite(), kCRLF, kCRLF+2) //检索范围是readable区域,搜索的元素范围是[kCRLF,kCRLF+2]
return crlf == beginWrite() ? NULL : crlf; //若search返回尾部迭代器则没有找到
}
//重载findCRLF,可以自定义检索起始位置
const char* findCRLF(const char* start) const
{
assert(peek() <= start);
assert(start <= beginWrite());
...
}
const char* findEOL() const { ... } //在readerable中搜索换行符
const char* findEOL(const char* start) const { ... }

private:
//4) 写入和未写入移动writerIndex来调整可读/可写区域的大小
void hasWritten(size_t len)
{
assert(len <= writableBytes());
writerIndex += len;
}
void unwrite(size_t len)
{
assert(len <= readableBytes());
writerIndex_ -= len;
}

//从缓冲区中读取数据
void retrieve(size_t len) { ... }
void retrieveAll()
{
readerIndex_ = kCheapPrepend;
writerIndex_ = kCheapPrepend;
}
void append(const char* data, size_t len)
{
ensureWritableBytes(len);
std::copy(data, data+len, beginWrite());
hasWritten(len);
}

//5) 自动增长
void ensureWritableBytes(size_t len)
{
if (writableBytes() < len)
{
makeSpace(len);
}
assert(writableBytes() >= len);
}
//6) 内部腾挪
void makeSpace(size_t len)
{
if (writableBytes() + prependableBytes() < len + kCheapPrepend)
{
// FIXME: move readable data
buffer_.resize(writerIndex_+len);
}
else
{
// move readable data to the front, make space inside buffer
assert(kCheapPrepend < readerIndex_);
size_t readable = readableBytes();
//std::copy(first, last, destfirst) 要拷贝元素的首地址、尾地址和拷贝目的地首地址
std::copy(begin()+readerIndex_,
begin()+writerIndex_,
begin()+kCheapPrepend);
readerIndex_ = kCheapPrepend;
writerIndex_ = readerIndex_ + readable;
assert(readable == readableBytes());
}
}
};

//8) 读取内容
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
char extrabuf[65536]; //在栈上开辟一块额外的缓冲区
struct iovec vec[2];
const size_t writable = writableBytes();
//iovec第一块指向buffer中的writable,第二块指向extrabuf
vec[0].iov_base = begin()+writerIndex_;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
//当空间足够时,即 n < writable,不使用extrabuf
//否则读到extrabuf,然后再把extrabuf中的数据append()到Buffer中
const int iovcnt = (writable < sizeof extrabuf) ? 2 : 1;
const ssize_t n = sockets::readv(fd, vec, iovcnt); //readv返回读到的总字节数n
if (n < 0)
{
*savedErrno = errno; //存储异常
}
else if (implicit_cast<size_t>(n) <= writable)
{
writerIndex_ += n;
}
else
{
writerIndex_ = buffer_.size();
append(extrabuf, n - writable);
}
return n;
}