首先,通过代码,查看调用关系:
libpqwalreceiver.c _PG_init 関数
/*
* Module load callback
*/
void
_PG_init(void)
{
/* Tell walreceiver how to reach us */
if (walrcv_connect != NULL || walrcv_receive != NULL ||
walrcv_send != NULL || walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
walrcv_receive = libpqrcv_receive;
walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
}
再看
walreceiver.c WalReceiverMain 関数
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
…
/* Main entry point for walreceiver process */
void
WalReceiverMain(void)
{
…
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
walrcv_send == NULL || walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
…
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
walrcv_connect(conninfo, startpoint);
DisableWalRcvImmediateExit();
/* Initialize LogstreamResult, reply_message and feedback_message */
LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
MemSet(&reply_message, 0, sizeof(reply_message));
MemSet(&feedback_message, 0, sizeof(feedback_message));
/* Loop until end-of-streaming or error */
for (;;)
{
…
/* Wait a while for data to arrive */
if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
{
/* Accept the received data, and process it */
XLogWalRcvProcessMsg(type, buf, len);
/* Receive any more data we can without sleeping */
while (walrcv_receive(0, &type, &buf, &len))
XLogWalRcvProcessMsg(type, buf, len);
/* Let the master know that we received some data. */
XLogWalRcvSendReply();
/*
* If we've written some records, flush them to disk and let the
* startup process and primary server know about them.
*/
XLogWalRcvFlush(false);
}
else
{
/*
* We didn't receive anything new, but send a status update to the
* master anyway, to report any progress in applying WAL.
*/
XLogWalRcvSendReply();
XLogWalRcvSendHSFeedback();
}
}
}
再看
libpqwalreceiver.c libpqrcv_receive 関数
/*
* Receive a message available from XLOG stream, blocking for
* maximum of 'timeout' ms.
*
* Returns:
*
* True if data was received. *type, *buffer and *len are set to
* the type of the received data, buffer holding it, and length,
* respectively.
*
* False if no data was available within timeout, or wait was interrupted
* by signal.
*
* The buffer returned is only valid until the next call of this function or
* libpq_connect/disconnect.
*
* ereports on error.
*/
static bool
libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
{
int rawlen;
if (recvBuf != NULL)
PQfreemem(recvBuf);
recvBuf = NULL;
/* Try to receive a CopyData message */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
{
/*
* No data available yet. If the caller requested to block, wait for
* more data to arrive.
*/
if (timeout > 0)
{
if (!libpq_select(timeout))
return false;
}
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
PQerrorMessage(streamConn))));
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
return false;
}
if (rawlen == -1) /* end-of-streaming or error */
{
PGresult *res;
res = PQgetResult(streamConn);
if (PQresultStatus(res) == PGRES_COMMAND_OK)
{
PQclear(res);
ereport(ERROR,
(errmsg("replication terminated by primary server")));
}
PQclear(res);
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
PQerrorMessage(streamConn))));
}
if (rawlen < -1)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
PQerrorMessage(streamConn))));
/* Return received messages to caller */
*type = *((unsigned char *) recvBuf);
*buffer = recvBuf + sizeof(*type);
*len = rawlen - sizeof(*type);
return true;
}
再看:
fe-exec.c PQgetCopyData 関数
/*
* PQgetCopyData - read a row of data from the backend during COPY OUT
* or COPY BOTH
*
* If successful, sets *buffer to point to a malloc'd row of data, and
* returns row length (always > 0) as result.
* Returns 0 if no row available yet (only possible if async is true),
* -1 if end of copy (consult PQgetResult), or -2 if error (consult
* PQerrorMessage).
*/
int
PQgetCopyData(PGconn *conn, char **buffer, int async)
{
*buffer = NULL; /* for all failure cases */
if (!conn)
return -2;
if (conn->asyncStatus != PGASYNC_COPY_OUT &&
conn->asyncStatus != PGASYNC_COPY_BOTH)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
return -2;
}
if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
return pqGetCopyData3(conn, buffer, async);
else
return pqGetCopyData2(conn, buffer, async);
}
还有这个:
fe-exec.c PQgetCopyData 関数
/*
* PQgetCopyData - read a row of data from the backend during COPY OUT
* or COPY BOTH
*
* If successful, sets *buffer to point to a malloc'd row of data, and
* returns row length (always > 0) as result.
* Returns 0 if no row available yet (only possible if async is true),
* -1 if end of copy (consult PQgetResult), or -2 if error (consult
* PQerrorMessage).
*/
int
PQgetCopyData(PGconn *conn, char **buffer, int async)
{
*buffer = NULL; /* for all failure cases */
if (!conn)
return -2;
if (conn->asyncStatus != PGASYNC_COPY_OUT &&
conn->asyncStatus != PGASYNC_COPY_BOTH)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
return -2;
}
if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
return pqGetCopyData3(conn, buffer, async);
else
return pqGetCopyData2(conn, buffer, async);
}
事实上,从上面的逻辑,可以看到:
如果slave端读取wal,发生了错误,那么它会在循环中再次试图读取,知道成功为止。
所以,出错了不要紧。当然这种 FATAL ERROR出现,肯定是要引起重视的。
经过试验,发现有几种可能会出现错误:
错误发生原因1:如果没有采用 archive log模式,那么当master端事务任务过重,导致在线wal log很快被删除覆盖,那么slave端就会找不到相应的wal log,于是在master端和客户端都出现:
FATAL: could not receive data from WAL stream: FATAL: requested WAL segment 0000000100000000000000XX has already been removed
错误发生原因2:
如果在master端的postgresql.conf文件中,设置了 replication_timeout,但wal_receiver_status_interval 却等于零,
则经过 replication_timeout秒后,如果master和slave之间因为master不忙等原因没有通信,则master会主动把这个连接干掉。
所以此时
master端出现:LOG:terminating walsender process due to replication timeout
slave端出现: FATAL: could not receive data from WAL stream:
可能的错误发生原因3:
这可能是和 recovery.conf 中的primary_conninfo有关:
例如:
primary_conninfo = 'host=master port=5432 application_name=mypg user=postgres connect_timeout=10 keepalives_idle=10 keepalives_interval=1 keepalives_count=3'
这样,每隔10秒,为了看看当前连接是否已经失效,就要发送3个keepalive数据包,如果在1秒的时间里没有得到对方响应,那么就认为连接已经死掉。
这样,如果master端的通讯比较繁忙,可能来不及应答,这样就可能发生 FATAL: could not receive data from WAL stream: could not receive data from server: connection timeout error,目前此种情况尚未再现出来,尚需验证。
下面这段话,说明了hot-standby 的中间过程:
http://www.postgresql.org/docs/9.2/static/warm-standby.html
In standby mode, the server continuously applies WAL received from the master server. The standby server can read WAL from a WAL archive (see restore_command) or directly from the master over a TCP connection (streaming replication). The standby server will also attempt to restore any WAL found in the standby cluster's pg_xlog directory. That typically happens after a server restart, when the standby replays again WAL that was streamed from the master before the restart, but you can also manually copy files to pg_xlog at any time to have them replayed.
At startup, the standby begins by restoring all WAL available in the archive location, calling restore_command. Once it reaches the end of WAL available there and restore_command fails, it tries to restore any WAL available in the pg_xlog directory. If that fails, and streaming replication has been configured, the standby tries to connect to the primary server and start streaming WAL from the last valid record found in archive or pg_xlog. If that fails or streaming replication is not configured, or if the connection is later disconnected, the standby goes back to step 1 and tries to restore the file from the archive again. This loop of retries from the archive, pg_xlog, and via streaming replication goes on until the server is stopped or failover is triggered by a trigger file.
就是说: standby server一旦启动,就会按照 archive directory --> pg_xlog directory ---> streaming replication 的顺序来应用 wal log。
所以,单纯由于网络环境造成出错的可能性比较大。
本文转自健哥的数据花园博客园博客,原文链接:http://www.cnblogs.com/gaojian/p/3349179.html,如需转载请自行联系原作者