streambuf是C++流(iostream)与流实体(文件、标准输入输出等)交互的桥梁
# 文件流
fstream <--> filebuf <--> file
# 字符串流
stringstream <--> stringbuf <--> string
上面的文件流和字符串流是C++标准库已经提供了的,现在我的目标是实现一个使用TCP协议通信的socket流
tstream <--> tcpbuf <--> socket(tcp)
首先来分析一下streambuf的内部实现
streambuf内部实现
术语说明:
- get 相当于 从流中读取数据
- put 相当于 写入数据到流中
- 字符,C/C++中的char,也可以理解为字节
streambuf内部持有三个用于get的指针gfirst,gnext,glast
和三个用于put的指针pfirst,pnext,plast
,这些指针分别可以使用eback(),gptr(),egptr()
和pbase(),pptr(),epptr()
函数获得,在代码中需要使用这些函数获取指针,为了方便描述,我直接使用这些指针变量名
下面是其他几个受保护的成员函数的作用
- gbump(n) : gnext+=n
- setg : setg(gfirst, gnext, glast)
- pbump(n) : pnext+=n
- setp : setp(pfirst, pnext, plast)
小结:
- get缓冲区通过setg()设置,setg的三个参数分别对应gfirst,gnext,glast
- put缓冲区通过setp()设置,setp的两个参数分别对应pfirst,plast
- 如果继承自streambuf的子类不通过setg和setp设置缓冲区,也就是读写缓冲区为空,那么这个流可以说是不带读缓冲和写缓冲的流,这时gfirst = gnext = glast = pfirst = pnext = plast = NULL
子类需要override(覆写)几个虚函数来封装具体的流的实现
虚函数(protected)
这些函数有些需要子类实现,来屏蔽不同的流的具体实现,向上提供统一的接口
缓冲区管理
- setbuf ---------- 设置缓冲区
- seekoff --------- 根据相对位置移动内部指针
- seekpos --------- 根据绝对位置移动内部指针
- sync ------------ 同步缓冲区数据(flush),默认什么都不做
- showmanyc ------- 流中可获取的字符数,默认返回0
输入函数(get)
- underflow(c) ---- 当get缓冲区不可用时调用,用于获取流中当前的字符,注意获取和读入的区别,获取并不使gnext++,默认返回EOF
- uflow ----------- 默认返回underflow(),并使gnext++
- xsgetn(s, n) ---- 从流中读取n个字符到缓冲区s中并返回读到的字符数:默认从当前缓冲区中读取n个字符,若当前缓冲区不可用,则调用一次uflow()
- pbackfail ------- 回写失败时调用
输出函数(put)
- xsputn(s, n) ---- 将缓冲区s的n个字符写入到流中并返回写入的字符数
- overflow(c) ----- 当put缓冲区不可用时调用,向流中写入一个字符;当c==EOF时,流写入结束
缓冲区不可用是指gnext(pnext) == NULL或者gnext(pnext) >= glast(plast)
public函数
缓冲区管理
- pubsetbuf : setbuf()
- pubseekoff : seekoff()
- pubseekpos : seekpos()
- pubsync : sync()
输入函数(get)
NOTE: 下面的缓冲区指的是用于get操作的缓冲区
- in_avail : get缓冲区内还有多少个字符可获取
- snextc :
return sbumpc() == EOF ? EOF : sgetc()
- sbumpc : 缓冲区不可用时返回uflow();否则返回(++gnext)[-1]
- sgetc : 缓冲区不可用时返回underflow();否则返回*gnext
- sgetn : xsgetn()
- sputbackc : 缓冲区不可用时返回pbackfail(c);否则返回*(--gnext)
- sungetc : 类似于sputbackc,不过默认调用pbackfail(EOF)
输出函数(put)
NOTE: 下面的缓冲区指的是用于put操作的缓冲区
- sputc : 缓冲区不可用时,返回overflow(c);否则*pnext++ = c,返回pnext
- sputn : xsputn()
iostream与streambuf的调用关系
下面就iostream常用的几个函数说明他们的调用关系
- read(char *s, int n) -> buf.sgetn(s, n)
- getline() -> buf.sgetc(), buf.snextc(); 首先调用一次sgetc()来判断当前字符是否为EOF,然后不断地调用snextc()读取下一个字符,直到读到
\n
- peek() -> buf.sgetc()
- sync() -> buf.pubsync()
总结
- 除了read这种一次读入多个字符的函数外,一般的函数都是调用snextc()一次读入一个字符
- snextc()当缓冲区不可用时会触发uflow(),uflow()会调用underflow()触发一次读取的操作,如果读到了流的末尾,可以返回EOF
-
我们可以在underflow()函数里重新设置gfirst gnext glast,使得snextc()不会不断的调用uflow(),而可以先读取缓冲区里的数据
- 若缓冲区不为空,此函数需要重新移动gnext到新的位置并返回*gnext
- 流中没有数据时(或者说读到了流的末尾时)返回EOF
-
[gfirst, glast)
永远是已经从流实体里读到的数据如果他们不为空的话
TCP流的实现
class tcpbuf : public std::streambuf {
void initsocklib() {
#ifdef WIN32
static bool inited = false;
WSADATA wsaData;
if (!inited)
WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif
}
public:
enum { BUFSIZE = 1 << 5 };
tcpbuf(SOCKET s) { initsocklib(); sock = s; }
tcpbuf() {
initsocklib();
sock = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
}
~tcpbuf() override { close(); }
bool connect(char *ip, unsigned short port) {
sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_addr.s_addr = inet_addr(ip);
addr.sin_family = AF_INET;
addr.sin_port = ::htons(port);
return !::connect(sock, (sockaddr *)&addr, sizeof(addr));
}
int close() {
#ifdef WIN32
return ::closesocket(sock);
#else
return ::close(sock);
#endif
}
protected:
// Buffered get
int underflow() override {
auto n = ::recv(sock, buf, BUFSIZE, 0);
return n > 0 ? (setg(buf, buf, buf + n), *gptr()) : EOF;
}
// Unbuffered put
int overflow(int c) override {
if (c == EOF) return close();
char b = c;
return ::send(sock, &b, 1, 0) > 0 ? c : EOF;
}
std::streamsize xsputn(const char *s, std::streamsize n) override {
auto x = ::send(sock, s, n, 0);
return x > 0 ? x : 0;
}
// flush
int sync() override {
#ifdef WIN32
return 0;
#else
return flush(sock);
#endif // WIN32
}
//streamsize showmanyc() override { return 1; }
private:
SOCKET sock;
char buf[BUFSIZE];
};
class tstream : public std::iostream {
public:
tstream() : std::iostream(&_buf) {}
tstream(SOCKET sock) : _buf(sock), std::iostream(&_buf) {}
tstream(char *ip, unsigned short port)
: tstream() { connect(ip, port); }
bool connect(char *ip, unsigned short port) {
return _buf.connect(ip, port);
}
private:
tcpbuf _buf;
};