高并发简单解决方案————redis队列缓存+mysql 批量入库(ThinkPhP)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 源码地址:https://github.com/Tinywan/PHP_Experience问题分析问题一:要求日志最好入库;但是,直接入库mysql确实扛不住,批量入库没有问题,done。【批量入库和直接入库性能差异】问题二:批量入库就需要有高并发的消息队列,决定采用redis list 仿真实现,而且方便回滚。

源码地址:https://github.com/Tinywan/PHP_Experience

问题分析

  • 问题一:要求日志最好入库;但是,直接入库mysql确实扛不住,批量入库没有问题,done。【批量入库和直接入库性能差异】
  • 问题二:批量入库就需要有高并发的消息队列,决定采用redis list 仿真实现,而且方便回滚。
  • 问题三:日志量毕竟大,保存最近30条足矣,决定用php写个离线统计和清理脚本。

一、设计数据库表和存储

  • 考虑到log系统对数据库的性能更多一些,稳定性和安全性没有那么高,存储引擎自然是只支持select insert 没有索引的archive。如果确实有update需求,也可以采用myISAM。
  • 考虑到log是实时记录的所有数据,数量可能巨大,主键采用bigint,自增即可
  • 考虑到log系统以写为主,统计采用离线计算,字段均不要出现索引,因为一方面可能会影响插入数据效率,另外读时候会造成死锁,影响写数据。

二、redis存储数据形成消息队列

 /**
     * 使用队列生成reids测试数据
     * 成功:执行 RPUSH操作后,返回列表的长度:8
     */
    public function createRedisList($listKey = 'message01')
    {
        $redis = RedisInstance::MasterInstance();
        $redis->select(1);
        $message = [
            'type' => 'say',
            'userId' => $redis->incr('user_id'),
            'userName' => 'Tinywan' . mt_rand(100, 9999), //是否正在录像
            'userImage' => '/res/pub/user-default-w.png', //是否正在录像
            'openId' => 'openId' . mt_rand(100000, 9999999999999999),
            'roomId' => 'openId' . mt_rand(30, 50),
            'createTime' => date('Y-m-d H:i:s', time()),
            'content' => $redis->incr('content') //当前是否正在打流状态
        ];
        $rPushResul = $redis->rPush($listKey, json_encode($message)); //执行成功后返回当前列表的长度 9
        return $rPushResul;
    }

三、读取redis消息队列里面的数据,批量入库

第一种思路:

 /**
     * 消息Redis方法保存到Mysql数据库
     * @param string $liveKey
     */
    public function RedisSaveToMysql($listKey = 'message01')
    {
        if (empty($listKey)) {
            $result = ["errcode" => 500, "errmsg" => "this parameter is empty!"];
            exit(json_encode($result));
        }
        $redis = RedisInstance::MasterInstance();
        $redis->select(1);
        $redisInfo = $redis->lRange($listKey, 0, 5);
        $dataLength = $redis->lLen($listKey);
        $model = M("User");
        while ($dataLength > 65970) {
            try {
                $model->startTrans();
                $redis->watch($listKey);
                $arrList = [];
                foreach ($redisInfo as $key => $val) {
                    $arrList[] = array(
                        'username' => json_decode($val, true)['userName'],
                        'logintime' => json_decode($val, true)['createTime'],
                        'description' => json_decode($val, true)['content'],
                        'pido' => json_decode($val, true)['content']
                    );
                }
                $insertResult = $model->addAll($arrList);
                if (!$insertResult) {
                    $model->rollback();
                    $result = array("errcode" => 500, "errmsg" => "Data Insert into Fail!", 'data' => 'dataLength:' . $dataLength);
                    exit(json_encode($result));
                }
                $model->commit();
                $redis->lTrim($listKey, 6, -1);
                $redisInfo = $redis->lRange($listKey, 0, 5);
                $dataLength = $redis->lLen($listKey);
            } catch (Exception $e) {
                $model->rollback();
                $result = array("errcode" => 500, "errmsg" => "Data Insert into Fail!");
                exit(json_encode($result));
            }
        }
        $result = array("errcode" => 200, "errmsg" => "Data Insert into Success!", 'data' => 'dataLength:' . $dataLength . 'liveKey:' . $listKey);
        exit(json_encode($result));
    }

第二种思路(供参考,非框架) 

<?php
$redis_xx = new Redis();
$redis_xx->connect('ip', port);
$redis_xx->auth("password");

// 获取现有消息队列的长度
$count = 0;
$max = $redis_xx->lLen("call_log");

// 获取消息队列的内容,拼接sql
$insert_sql = "insert into fb_call_log (`interface_name`, `createtime`) values ";

// 回滚数组
$roll_back_arr = array();

while ($count < $max) {
    $log_info = $redis_cq01->lPop("call_log");
    $roll_back_arr = $log_info;
    if ($log_info == 'nil' || !isset($log_info)) {
        $insert_sql .= ";";
        break;
    }

    // 切割出时间和info
    $log_info_arr = explode("%", $log_info);
    $insert_sql .= " ('" . $log_info_arr[0] . "','" . $log_info_arr[1] . "'),";
    $count++;
}

// 判定存在数据,批量入库
if ($count != 0) {
    $link_2004 = mysql_connect('ip:port', 'user', 'password');
    if (!$link_2004) {
        die("Could not connect:" . mysql_error());
    }

    $crowd_db = mysql_select_db('fb_log', $link_2004);
    $insert_sql = rtrim($insert_sql, ",") . ";";
    $res = mysql_query($insert_sql);

    // 输出入库log和入库结果;
    echo date("Y-m-d H:i:s") . "insert " . $count . " log info result:";
    echo json_encode($res);
    echo "</br>\n";

    // 数据库插入失败回滚
    if (!$res) {
        foreach ($roll_back_arr as $k) {
            $redis_xx->rPush("call_log", $k);
        }
    }
    // 释放连接
    mysql_free_result($res);
    mysql_close($link_2004);
}
$redis_cq01->close();
?>

四、获取Redis数据缓存数据

 

 /**
     * [0]检查当前Redis是否连接成功
     * [1]获取数据,首先从Redis中去获取,没有的话再从数据库中去获取
     */
    public function findDataRedisOrMysql($listKey = 'message01')
    {
        //Check the current connection status 查看服务是否运行
        if (RedisInstance::MasterInstance() != false) {
            $redis = RedisInstance::MasterInstance();
            $redis->select(2);
            /**
             * 首先从Redis中去获取数据
             * lRange 获取为空的话,则表示没有数据,否则返回一个非空数组
             */
            $redisData = $redis->lRange($listKey, 0, 9);
            $resultData = [];
            if (!empty($redisData)) {
                $resultData['status_code'] = 200;
                $resultData['msg'] = 'Data Source from Redis Cache';
                foreach ($redisData as $key => $val) {
                    $resultData['listData'][] = json_decode($val, true);
                }
            } else {
                $resultData['redis_msg'] = 'Redis is Expire';
                $conditions = array('status' => ':status');
                $mysqlData = M('User')->where($conditions)->bind(':status', 1, \PDO::PARAM_STR)->select();
                if ($mysqlData) {
                    $resultData['status_code'] = 200;
                    $resultData['mysql_msg'] = 'Data Source from Mysql is Success';
                    $redis->select(2);
                    foreach ($mysqlData as $key => $val) {
                        $resultData['listData'][] = $val;
                        //写入Redis作为缓存
                        $redis->rPush($listKey, json_encode($val));
                    }
                    //同时设置一个过期时间
                    $redis->expire($listKey,30);
                } else {
                    $resultData['status_code'] = 500;
                    $resultData['mysql_msg'] = 'Data Source from Mysql is Fail';
                }
            }
        } else {
            $resultData['redis_msg'] = 'Redis server went away';
            $resultData['mysql_msg'] = 'Mysql Data2';
            $conditions = array('status' => ':status');
            $mysqlData = M('User')->where($conditions)->bind(':status', 1, \PDO::PARAM_STR)->select();
            foreach ($mysqlData as $key => $val) {
                $resultData['listData'][] = $val;
            }
        }
        homePrint($resultData);
    }

 

四、离线天级统计和清理数据脚本

<?php
/**
* static log :每天离线统计代码日志和删除五天前的日志
* */

// 离线统计
$link_2004 = mysql_connect('ip:port', 'user', 'pwd');
if (!$link_2004) {
    die("Could not connect:" . mysql_error());
}

$crowd_db = mysql_select_db('fb_log', $link_2004);

// 统计昨天的数据
$day_time = date("Y-m-d", time() - 60 * 60 * 24 * 1);
$static_sql = "get sql";

$res = mysql_query($static_sql, $link_2004);

// 获取结果入库略

// 清理15天之前的数据
$before_15_day = date("Y-m-d", time() - 60 * 60 * 24 * 15);
$delete_sql = "delete from xxx where createtime < '" . $before_15_day . "'";
try {
    $res = mysql_query($delete_sql);
}catch(Exception $e){
    echo json_encode($e)."\n";
    echo "delete result:".json_encode($res)."\n";
}

mysql_close($link_2004);
?>

五:代码部署

主要是部署,批量入库脚本的调用和天级统计脚本,crontab例行运行。

# 批量入库脚本
*/2 * * * * /home/cuihuan/xxx/lamp/php5/bin/php /home/cuihuan/xxx/batchLog.php >>/home/cuihuan/xxx/batchlog.log

# 天级统计脚本
0 5 * * * /home/cuihuan/xxx/php5/bin/php /home/cuihuan/xxx/staticLog.php >>/home/cuihuan/xxx/staticLog.log

总结:相对于其他复杂的方式处理高并发,这个解决方案简单有效:通过redis缓存抗压,mysql批量入库解决数据库瓶颈,离线计算解决统计数据,通过定期清理保证库的大小。

 

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
10天前
|
缓存 NoSQL 关系型数据库
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
本文详解缓存雪崩、缓存穿透、缓存并发及缓存预热等问题,提供高可用解决方案,帮助你在大厂面试和实际工作中应对这些常见并发场景。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
|
11天前
|
存储 缓存 NoSQL
【赵渝强老师】基于Redis的旁路缓存架构
本文介绍了引入缓存后的系统架构,通过缓存可以提升访问性能、降低网络拥堵、减轻服务负载和增强可扩展性。文中提供了相关图片和视频讲解,并讨论了数据库读写分离、分库分表等方法来减轻数据库压力。同时,文章也指出了缓存可能带来的复杂度增加、成本提高和数据一致性问题。
【赵渝强老师】基于Redis的旁路缓存架构
|
16天前
|
消息中间件 缓存 NoSQL
Redis 高并发竞争 key ,如何解决这个难点?
本文主要探讨 Redis 在高并发场景下的并发竞争 Key 问题,以及较为常用的两种解决方案(分布式锁+时间戳、利用消息队列)。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Redis 高并发竞争 key ,如何解决这个难点?
|
19天前
|
缓存 NoSQL Redis
Redis 缓存使用的实践
《Redis缓存最佳实践指南》涵盖缓存更新策略、缓存击穿防护、大key处理和性能优化。包括Cache Aside Pattern、Write Through、分布式锁、大key拆分和批量操作等技术,帮助你在项目中高效使用Redis缓存。
103 22
|
18天前
|
缓存 NoSQL 中间件
redis高并发缓存中间件总结!
本文档详细介绍了高并发缓存中间件Redis的原理、高级操作及其在电商架构中的应用。通过阿里云的角度,分析了Redis与架构的关系,并展示了无Redis和使用Redis缓存的架构图。文档还涵盖了Redis的基本特性、应用场景、安装部署步骤、配置文件详解、启动和关闭方法、systemctl管理脚本的生成以及日志警告处理等内容。适合初学者和有一定经验的技术人员参考学习。
110 7
|
6月前
|
消息中间件 Java Linux
2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
2024年最全BATJ真题突击:Java基础+JVM+分布式高并发+网络编程+Linux(1),2024年最新意外的惊喜
|
5月前
|
缓存 NoSQL Java
Java高并发实战:利用线程池和Redis实现高效数据入库
Java高并发实战:利用线程池和Redis实现高效数据入库
489 0
|
3月前
|
监控 算法 Java
企业应用面临高并发等挑战,优化Java后台系统性能至关重要
随着互联网技术的发展,企业应用面临高并发等挑战,优化Java后台系统性能至关重要。本文提供三大技巧:1)优化JVM,如选用合适版本(如OpenJDK 11)、调整参数(如使用G1垃圾收集器)及监控性能;2)优化代码与算法,减少对象创建、合理使用集合及采用高效算法(如快速排序);3)数据库优化,包括索引、查询及分页策略改进,全面提升系统效能。
48 0
|
5月前
|
存储 NoSQL Java
探索Java分布式锁:在高并发环境下的同步访问实现与优化
【6月更文挑战第30天】Java分布式锁在高并发下确保数据一致性,通过Redis的SETNX、ZooKeeper的临时节点、数据库操作等方式实现。优化策略包括锁超时重试、续期、公平性及性能提升,关键在于平衡同步与效率,适应大规模分布式系统的需求。
172 1
|
4月前
|
算法 Java 调度
高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决
高并发架构设计三大利器:缓存、限流和降级问题之使用Java代码实现令牌桶算法问题如何解决