Discuz!NT跨站缓存同步

简介:

 在之前的文章中,提到了在Discuz!NT中进行缓存分层的概念。之前在产品中也实现了其中的构想,但该方案有一个问题,就是如果将产品进行分布式布署 之后,如果某一站点发生数据变化时,只能更新本地缓存和Memcached缓存信息,而其它分布式布署的站点则无法收到缓存数据已修改的‘通知’,导致数 据不同步而成为‘脏数据’。
      虽然在之前的文章中提到通过将本地缓存失效时间‘缩短’(比如15秒后即失效),以便在相对较短的时间内让本地数据失效从而再次从Memcached读取 最新的数据,但这必定不符合我们设计的基本思路,并且导致程序的运行效率低,同时会造成过于频繁的访问Memcached,无形中增加了与 Memcached的socket开销。所以才有了今天的这篇文章。

      首先要说明的是,这个方案只有Discuz!NT的企业版(EntLib)中提供,所以在普通的版本中是找不到它的影子的,下面我就简要说明一下其实现思 路。
      因为在传统的WEB应用开发中,一般都是采用get的方式来获得所需要的数据,也就是通过从客户端向服务端发送get请求来获得数据。而如果要实现下面的 流程:     

     当本地缓存数据变化-->更新memcached-->(通知notify)其它分布式应用   

      这里我借助主动发送的模式来实现,为了便于理解,我这里将memcached变成服务端,将分布式布署的应用看成是一个个‘客户端’,而当‘客户端’将数 据更新到memcached时,通过发送http通知的方式来通知其它‘客户端’,所以我们要实现的代码包括两部分,一部分实现上面流程中的‘将本地数据 变化告之到memcached’。这块代码已在之前的文章中被实现了,而我们只要在相应的‘RemoveObject’方法后跟上一行‘通知其它分布式应 用’即可(代码位于Discuz.EntLib\Memcached\MemCachedStrategy.cs),如下面所示:  

代码

         ///   <summary>
        
///  移除指 定ID的对象
        
///   </summary>
        
///   <param name="objId"></param>
         public   override   void  RemoveObject( string  objId)
        {
            
// 先移除本地cached,然后再移除memcached中的相应数据
             if  ( base .RetrieveObject(objId)  !=   null )
                
base .RemoveObject(objId);

            
if  (MemCachedManager.CacheClient.KeyExists(objId))
                MemCachedManager.CacheClient.Delete(objId);

            Discuz.EntLib.SyncCache.SyncRemoteCache(objId);
// 通知其它分布式应用
        }    

 

      下面就是‘同步其它分布式应用缓存数据’的代码了。在介绍代码之前,先要将‘发送缓存数据修改通知’的设计思想介绍一下:     

1.首先我们需要一下记录着分布式 布署应用的网站列表,它主要是一链接串,比如下面这个格式(用逗号分割):      

< SiteUrl > http://10.0.2.137:8088/tools/,http://10.0.2.150:8088/tools/,http://10.0.2.136:8088/tools/ </ SiteUrl >

        我们需要将上面的链接串分割之后加上相应的更新缓存工具页面(稍后介绍)来实现移除(相当时同步)的功能。    

2.为了安全起见,在发送通知的请求时,需 要对请求进行加密,以免该功能被其它恶意代码利用,从而造成系统安全性和效率受到影响,所以我这里提供了认证码,即:

< AuthCode > 123123 </ AuthCode >

       这样,验证码加密的请求只有在被同步工具正确解析后,才会更新相应的缓存数据。

      了解这些内容之后,我们看一下相应的实现代码以验证一下所说的设计思想(Discuz.EntLib\SyncLocalCache \SyncCache.cs):     

 

代码
///   <summary>
///  同步缓存类
///   </summary>
public   class  SyncCache
{
    
///   <summary>
    
///  除本站 之外的负载均衡站点列表
    
///   </summary>
     static  List < string >  syncCacheUrlList  =   null ;

    
static  LoadBalanceConfigInfo loadBalanceConfigInfo  =  LoadBalanceConfigs.GetConfig();

    
static  SyncCache()
    {
        syncCacheUrlList 
=   new  List < string > ();
        syncCacheUrlList.AddRange(loadBalanceConfigInfo.SiteUrl.
            Replace(
" tools/ " " tools/SyncLocalCache.ashx " ).Split( ' , ' ));
     
        
int  port  =  HttpContext.Current.Request.Url.Port;
        
string  localUrl  =   string .Format( " {0}://{1}{2}{3} " ,
                                         HttpContext.Current.Request.Url.Scheme,
                                         HttpContext.Current.Request.Url.Host,
                                         (port 
==   80   ||  port  ==   0 ?   ""  :  " : "   +  port,
                                         BaseConfigs.GetForumPath);

        Predicate
< string >  matchUrl  =   new  Predicate < string >
        (
            
delegate ( string  webUrl)
            {
                
return  webUrl.IndexOf(localUrl)  >=   0 // 移除本地站点链接,因为当前站点缓存已被移除。
            }
        );

        syncCacheUrlList.RemoveAll(matchUrl);
    }

      首先我们在静态构造方法中读取相应url链接列表(loadBalanceConfigInfo配置文件),然后将其中的本地应用链接去掉,这样就不会造 成反复更新本地缓存数据(从而造成死循环)的问题了。接着就是使用一个线程来发送相应的同步数据请求到各个分布式应用上,如下(包括使用认证码加密链接信 息):  

 代码

   ///   <summary>
    
///  同步远 程缓存信息
    
///   </summary>
    
///   <param name="cacheKey"></param>
     public   static   void  SyncRemoteCache( string  cacheKey)
    {
        
foreach  ( string  webSite  in  syncCacheUrlList)
        {
            
string  url  =   string .Format( " {0}?cacheKey={1}&passKey={2} " ,
                                       webSite,
                                       cacheKey,
                                       Discuz.Common.Utils.UrlEncode(Discuz.Common.DES.Encode(cacheKey, loadBalanceConfigInfo.AuthCode)));

            ThreadSyncRemoteCache src 
=   new  ThreadSyncRemoteCache(url);
            
new  Thread( new  ThreadStart(src.Send)).Start();
        }
    }

 

      这里我们使用线程方式来更新相应的分布式应用,思路是: 

      对一个分布式应用发送三次请求,如果其中某一次返回结果为ok时,则不再向其发送其余请求了。如果上一次请求不成功,则当前线程暂停五秒后再次发送请求, 直到三次请求用完为止。这样主要是考虑到远程应用上的主机可能某一时刻处于忙碌状态而无法响应,所以采用发送三次(每次间隔五秒)的方式。

      下面就是它的主要实现代码:       

 

代码
     ///   <summary>
    
///  多线程 更新远程缓存
    
///   </summary>
     public   class  ThreadSyncRemoteCache
    {
        
public   string  _url;

        
public  ThreadSyncRemoteCache( string  url)
        {
            _url 
=  url;
        }

        
public   void  Send()
        {
            
try
            {
                
// 设置循环三次,如果 某一次更新成功("OK"),则跳出循环
                 for  ( int  count  =   0 ; count  <   3 ; count ++ )
                {
                    
if  ( this .SendWebRequest(_url)  ==   " OK " )
                        
break ;
                    
else
                        Thread.Sleep(
5000 ); // 如果更新不成功,则暂停5秒后再次更新
                }
            }
            
catch  { }
            
finally
            {
                
if  (Thread.CurrentThread.IsAlive)
                    Thread.CurrentThread.Abort();                  
            }
       }

        
///   <summary>
        
///  发送 web请求
        
///   </summary>
        
///   <param name="url"></param>
        
///   <returns></returns>
         public   string  SendWebRequest( string  url)
        {
            StringBuilder builder 
=   new  StringBuilder();
            
try
            {
                WebRequest request 
=  WebRequest.Create( new  Uri(url));
                request.Method 
=   " GET " ;
                request.Timeout 
=   15000 ;
                request.ContentType 
=   " Text/XML " ;
                
using  (WebResponse response  =  request.GetResponse())
                {
                    
using  (StreamReader reader  =   new  StreamReader(response.GetResponseStream(), Encoding.UTF8))
                    {
                        builder.Append(reader.ReadToEnd());
                    }
                }
            }
            
catch
            {
                builder.Append(
" Process Failed! " );
            }
            
return  builder.ToString();
        }
    }
 

 

     现在发送请求的功能介绍完了,下面简要介绍一下在‘分布式应用’那一方如何对上面发送的请求进行解析操作的。请看下面的代码段:    

 

代码
///   <summary>
    
///  同步本 地缓存
    
///   </summary>
    [WebService(Namespace  =   " http://tempuri.org/ " )]
    [WebServiceBinding(ConformsTo 
=  WsiProfiles.BasicProfile1_1)]
    
public   class  SyncLocalCache : IHttpHandler
    {
        
public   void  ProcessRequest(HttpContext context)
        {
            context.Response.ContentType 
=   " text/plain " ;
            
string  cacheKey  =  context.Request.QueryString[ " cacheKey " ];
            
string  passKey  =  context.Request.QueryString[ " passKey " ];

            
if  (Utils.StrIsNullOrEmpty(cacheKey))
            {
                context.Response.Write(
" CacheKey is not null! " );
                
return ;
            }
            
if  ( ! cacheKey.StartsWith( " /Forum " ))
            {
                context.Response.Write(
" CacheKey is not valid! " );
                
return ;
            }
            
if  (passKey  !=  Discuz.Common.DES.Encode(cacheKey, Discuz.Config.LoadBalanceConfigs.GetConfig().AuthCode))
            {
                context.Response.Write(
" AuthCode is not valid! " );
                
return ;
            }

            
// 更新本地缓存 (注:此处不可使用MemCachedStrategy的RemoveObject方法,因为该方法中有SyncRemoteCache的调用,会造成循 环调用)
            Discuz.Cache.DNTCache cache  =  Discuz.Cache.DNTCache.GetCacheService();
            cache.LoadCacheStrategy(
new  DefaultCacheStrategy());
            cache.RemoveObject(cacheKey);
            cache.LoadDefaultCacheStrategy();    

            context.Response.Write(
" OK " );
        }

        
public   bool  IsReusable
        {
            
get
            {
                
return   false ;
            }
        }
    }
 

          上面代码首先会获取请求过来的缓存键值和passKey(即认证码加密后的链接),然后在本地进行数据有效性校验,如果认证通过的 话,就可以对其要移除的缓存数据进行操作了,并在操作成功之后返回ok信息。该页面采用synclocalcache.ashx文件进行声明,

如 下: 

 

< %@ WebHandler  Language ="C#"   Class ="Discuz.EntLib.SyncLocalCache"  % >

 

     到这里,只要将该ashx文件放到站点的tools/文件夹下,就可以实现跨站同步缓存数据的功能了。目前考虑的场景还是比较单一的,所以实现的 代码也相对简单,不排除随着业务逻辑复杂度不断提升而做重新设计的可能性。

     为了便于购买我们商业服务的客户进行管理操作,我们还提供了一个企业级的监控管理工具,该工具基本asp.net mvc框架开发,提供了监视负 载均衡,同步缓存,读写分离检查和远程服务器运行状态(CPU,内存等使用情况)。下面是该工具所提供的同步缓存数据的功能界面:  

      该工具的开发思想和实现原理会在后面章节中加以详细说明,敬请关注:)



本文转自 daizhenjun 51CTO博客,原文链接:http://blog.51cto.com/daizhj/341288,如需转载请自行联系原作者

相关文章
|
2月前
|
缓存
详解CentOS8更换yum源后出现同步仓库缓存失败的问题
详解CentOS8更换yum源后出现同步仓库缓存失败的问题
107 0
|
canal 存储 SQL
MySQL 与 Redis 缓存的同步方案
本文介绍MySQL与Redis缓存的同步的两种方案 方案1:通过MySQL自动同步刷新Redis,MySQL触发器+UDF函数实现 方案2:解析MySQL的binlog实现,将数据库中的数据同步到Redis 一、方案1(UDF)
MySQL 与 Redis 缓存的同步方案
|
canal 存储 SQL
Mysql与Redis缓存同步方案详解
Mysql与Redis缓存同步方案详解
1570 1
Mysql与Redis缓存同步方案详解
|
9月前
|
消息中间件 缓存 NoSQL
redis做为缓存,mysql的数据如何与redis进行同步呢?(双写一致性)
redis做为缓存,mysql的数据如何与redis进行同步呢?(双写一致性)
617 0
|
9月前
|
canal 消息中间件 缓存
Canal 实战 | 第一篇:SpringBoot 整合 Canal + RabbitMQ 实现监听 MySQL 数据库同步更新 Redis 缓存
Canal 实战 | 第一篇:SpringBoot 整合 Canal + RabbitMQ 实现监听 MySQL 数据库同步更新 Redis 缓存
|
缓存 前端开发
ehcache jgroups同步,节点重启初始化缓存bug
ehcache jgroups同步,节点重启初始化缓存bug
114 0
|
缓存 算法 NoSQL
缓存和数据库同步|学习笔记
快速学习缓存和数据库同步
|
缓存 算法 NoSQL
缓存和数据库同步|学习笔记
快速学习缓存和数据库同步
|
缓存
Confluence 6 从外部目录中同步数据手动同步缓存
你可以通过单击用户目录(User Directories)界面中的同步(Synchronize)按钮,手动进行同步。如果一个同步进程已经正在同步的过程中的话,你就不能在上一个同步进程完成之前重新进行同步。
742 0

热门文章

最新文章