Slony-I的 RemoteWorker重试调查

简介:

客户的问题是:

向Slony-I运行环境中,增加新的slaveDB节点的时候发生错误。

log中反复出现错误,然后再重新开始(重新开始部分的log省略):

复制代码
CONFIG remoteWorkerThread_1: connected to provider DB
CONFIG remoteWorkerThread_1: prepare to copy table "tst"."a_tbl"
CONFIG remoteWorkerThread_1: prepare to copy table "tst"."b_tbl"
CONFIG remoteWorkerThread_1: prepare to copy table "tst"."c_tbl"
CONFIG remoteWorkerThread_1: all tables for set 1 found on subscriber
CONFIG remoteWorkerThread_1: copy sequence "tst"."a_no_seq"
CONFIG remoteWorkerThread_1: copy sequence "tst"."b_no_seq"
CONFIG remoteWorkerThread_1: copy sequence "tst"."c_no_seq"

CONFIG remoteWorkerThread_1: copy table "tst"."a_tbl"
CONFIG remoteWorkerThread_1: Begin COPY of table "tst"."a_tbl"
NOTICE:  truncate of "tst"."a_tbl" succeeded
CONFIG remoteWorkerThread_1: 33778 bytes copied for table "tst"."a_tbl"
CONFIG remoteWorkerThread_1: 27.97 seconds to copy table "tst"."a_tbl"

CONFIG remoteWorkerThread_1: copy table "tst"."b_tbl"
CONFIG remoteWorkerThread_1: Begin COPY of table "tst"."b_tbl"
ERROR  remoteWorkerThread_1: "select "_mycluster".copyFields(2);" 

WARN   remoteWorkerThread_1: data copy for set 1 failed 1 times - sleep 15 seconds
NOTICE:  Slony-I: Logswitch to sl_log_2 initiated
CONTEXT:  SQL statement "SELECT "_mycluster".logswitch_start()"
复制代码

经过查阅资料,并且和客户沟通,发现是他们的网络环境有问题:原有节点所在网段和新增节点不在一个网段。而他们又使用了网络工具来监控网络,在某些特定情况下,网络工具会切点网络连接。

正式此原因,导致出错。然后我进行了代码分析,发现remoteworker是很勤劳的,如果发生了通讯错误,它会反复重试的:

remoteWorkerThread_main函数的while循环,就会完成这个工作。

复制代码
/* ----------                                                    
 * slon_remoteWorkerThread                                                    
 *                                                    
 * Listen for events on the local database connection. This means, events                                                    
 * generated by the local node only.                                                    
 * ----------                                                    
 */                                                    
void *                                                    
remoteWorkerThread_main(void *cdata)                                                    
{                                                    
    …                                                
    /*                                                
     * Work until shutdown or node destruction                                                
     */                                                
    while (true)                                                
      {                                                
        …                                            
        /*                                            
         * Event type specific processing                                            
         */                                            
        if (strcmp(event->ev_type, "SYNC") == 0)                                            
        {                                            
            …                                        
        }                                            
        else    /* not SYNC */                                        
        {                                            
            …                                        
            /*                                        
             * Simple configuration events. Call the corresponding runtime                                        
             * config function, add the query to call the configuration event                                        
             * specific stored procedure.                                        
             */                                        
            if (strcmp(event->ev_type, "STORE_NODE") == 0)                                        
            {                                        
                …                                    
            }                                        
            …                                        
            else if (strcmp(event->ev_type, "ENABLE_SUBSCRIPTION") == 0)                                        
            {                                        
                …                                    
                int         copy_set_retries = 0;                                    
                …                                    
                                                    
                                                    
                if (sub_receiver == rtcfg_nodeid &&                                    
                    event->ev_origin == node->no_id)                                
                {                                    
                    ScheduleStatus            sched_rc;                    
                    int            sleeptime = 15;                    
                    …                                
                    while (true)                                
                    {                                
                        …                            
                        /*                            
                         * If the copy succeeds, exit the loop and let the                            
                         * transaction commit.                            
                         */                            
                        if (copy_set(node, local_conn, sub_set, event) == 0)                            
                        {                            
                            …                        
                            copy_set_retries = 0;                        
                            break;                        
                        }                            
                        copy_set_retries++;                            
                                                    
                        /*                            
                         * Data copy for new enabled set has failed. Rollback                            
                         * the transaction, sleep and try again.                            
                         */                            
                        slon_log(SLON_WARN, "remoteWorkerThread_%d: "                            
                                 "data copy for set %d failed %d times - "                    
                                 "sleep %d seconds\n",                    
                                 node->no_id, sub_set, copy_set_retries,                    
                                 sleeptime);                    
                        …                            
                    }                                
                }                                    
                else                                    
                {                                    
                    …                                
                }                                    
                …                                    
            }                                        
            …                                        
            else                                        
            {                                        
                …                                    
            }                                        
                                                    
            /*                                        
             * All simple configuration events fall through here. Commit the                                        
             * transaction.                                        
             */                                        
            …                                        
        }                                            
        …                                            
    }                                                
    …                                                
}                                                    
                                                    
                                                    
/* ----------                                                    
 * copy_set                                                    
 * ----------                                                    
 */                                                    
static int                                                    
copy_set(SlonNode *node, SlonConn *local_conn, int set_id,                                                    
         SlonWorkMsg_event *event)                                            
{                                                    
    …                                                
    /*                                                
     * Connect to the provider DB                                                
     */                                                
    …                                                
    slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                                
             "connected to provider DB\n",                                        
             node->no_id);                                        
    …                                                
    /*                                                
     * For each table in the set                                                
     */                                                
    for (tupno1 = 0; tupno1 < ntuples1; tupno1++)                                                
    {                                                
        char       *tab_fqname = PQgetvalue(res1, tupno1, 1);                                        
                                                    
        gettimeofday(&tv_start2, NULL);                                            
        slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                            
                 "prepare to copy table %s\n",                                    
                 node->no_id, tab_fqname);                                    
                                                    
        (void) slon_mkquery(&query3, "select * from %s limit 0;",                                            
                            tab_fqname);                        
        res2 = PQexec(loc_dbconn, dstring_data(&query3));                                            
        …                                            
    }                                                
    …                                                
    slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                                
             "all tables for set %d found on subscriber\n",                                        
             node->no_id, set_id);                                        
    …                                                
    for (tupno1 = 0; tupno1 < ntuples1; tupno1++)                                                
    {                                                
        …                                            
        slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                            
                 "copy sequence %s\n",                                    
                 node->no_id, seq_fqname);                                    
        …                                            
    }                                                
    …                                                
                                                    
    /*                                                
     * For each table in the set                                                
     */                                                
    for (tupno1 = 0; tupno1 < ntuples1; tupno1++)                                                
    {                                                
        …                                            
        slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                            
                 "copy table %s\n",                                    
                 node->no_id, tab_fqname);                                    
        …                                            
        if (omit_copy) {                                            
            …                                        
        } else {                                            
            slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                        
                 "Begin COPY of table %s\n",                                    
                 node->no_id, tab_fqname);                                    
                                                    
            (void) slon_mkquery(&query2, "select %s.copyFields(%d);",                                        
                            rtcfg_namespace, tab_id);                        
                                                    
            res3 = PQexec(pro_dbconn, dstring_data(&query2));                                        
                                                    
            if (PQresultStatus(res3) != PGRES_TUPLES_OK)                                        
            {                                        
                slon_log(SLON_ERROR, "remoteWorkerThread_%d: \"%s\" %s\n",                                    
                         node->no_id, dstring_data(&query2),                            
                         PQresultErrorMessage(res3));                            
                …                                    
                return -1;                                    
            }                                        
                                                    
        …                                            
        slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                            
                 INT64_FORMAT " bytes copied for table %s\n",                                    
                 node->no_id, copysize, tab_fqname);                                    
        …                                            
        slon_log(SLON_CONFIG, "remoteWorkerThread_%d: "                                            
                 "%.3f seconds to copy table %s\n",                                    
                 node->no_id,                                    
                 TIMEVAL_DIFF(&tv_start2, &tv_now), tab_fqname);                                    
    }                                                
    …                                                
    return 0;                                                
}                                                    
复制代码




目录
相关文章
|
9月前
有几百万消息持续积压几小时,怎么解决
有几百万消息持续积压几小时,怎么解决
50 0
|
负载均衡 Dubbo 应用服务中间件
我叫你不要重试,你非得重试。这下玩坏了吧? (下)
我叫你不要重试,你非得重试。这下玩坏了吧? (下)
214 0
我叫你不要重试,你非得重试。这下玩坏了吧? (下)
|
XML Dubbo 应用服务中间件
我叫你不要重试,你非得重试。这下玩坏了吧? (上)
我叫你不要重试,你非得重试。这下玩坏了吧? (上)
92 0
我叫你不要重试,你非得重试。这下玩坏了吧? (上)
|
负载均衡 Dubbo 应用服务中间件
我叫你不要重试,你非得重试。这下玩坏了吧? (中)
我叫你不要重试,你非得重试。这下玩坏了吧? (中)
70 0
我叫你不要重试,你非得重试。这下玩坏了吧? (中)
|
机器学习/深度学习 网络协议 算法
设置默认的超时和重试是一个基础设施的基本素养
设置默认的超时和重试是一个基础设施的基本素养
设置默认的超时和重试是一个基础设施的基本素养
|
SQL 安全 中间件
网站收到网络安全监督检查限期整改通知书
我们来看下网络安全监督检查限期整改通知书: 根据《中华人民共和国网络安全法》《中华人民共和国人民警察法》《中华人民共和国计算机信息系统安全保护条例》等法律法规规定,我单位于近日对你单位网络安全保护工作进行了监督检查,现将有关情况反馈如下:详见附件,根据《中华人民共和国网络安全法》《中华人民共和国计算机信息系统安全保护条例》《信息安全等级保护管理办法》《公安机关互联网安全监督检查规定》,请你单位于2022年1月18日前完成整改,并在期限届满前将整改情况函告我单位。
1028 0
网站收到网络安全监督检查限期整改通知书
|
监控 安全
黑客网站拒绝删除微软机密文件 被迫关停
Cryptome.org是一个知名的提供网络盗版和黑客情报的黑客网站,不久前它提供了微软的“全球犯罪调查手册”(Global Criminal Compliance Handbook,GCCH),这份22页的指南介绍了微软为执法机构提供的对各类在线平台的监控服务,包括提取IP地址的详细说明。
1063 0
|
Linux
记一次审核时间异常引发的惊天...
某用户反馈系统时间在某一个时刻跳变到2019年10月1号,导致程序异常,并提供了相关的截图佐证
1739 0
|
算法 编解码 内存技术
|
安全
工控系统网络应急响应小组:超过半数的攻击为APT
本文讲的是工控系统网络应急响应小组:超过半数的攻击为APT,工控系统网络应急响应小组(ICS-CERT)的报告显示,工控系统已经成为各种恶意攻击的目标,其中高级持续威胁攻击在所有报告的安全事件中占半数以上。
1598 0