Kafka-HBase-MapReduce-Mysql 连接实践 通话记录

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Kafka-HBase-MapReduce-Mysql 连接实践 通话记录

1.项目介绍

本项目采用的数据为通话记录数据,例(张三 李四 2021-4-23 12:32:13 2942)意思是张三在2021-4-23 12:32:13这个时间给李四通话,通话时长为2942秒

  1. 数据来源【程序自己模拟数据的产生,交给Kafka的生产者】
  2. Kafka的消费者端接的是HBase数据库
  3. MapReduce读取HBase中的数据进行分析
  4. 再将分析的数据导入MySQL

2.各类介绍

Produce模块

  • DataProduce:主要负责生产数据
  • Main:函数的入口
  • testAPI:进行功能测试
  • KafkaUtils:将数据发送到topic

Consumer模块

  • Main:程序的入口
  • HBaseConsumer:消费者拉取数据
  • HBaseDao:HBase的客户端对象,创建表导入数据
  • HBaseUtils:主要是创建RowKey,还有一些建表和命名空间的操作

Analysis模块

  • HashUtils:将每个Cell中的数据存入到HashMap中
  • MysqlUtils:主要是Mysql的连接操作
  • CountMap:计算每个用户之间的通话记录次数
  • DBWrite:实现了Writable、DBWritable,用于序列化以及写数据库操作

3.项目各模块

项目分为三个模块,分别是produce、consumer、analysis

  • produce:实现数据生产
  • consumer:Kafka将数据写入HBase
  • analysis:利用MapReduce分析数据将结果导入Mysql

2.1 produce

2.1.1 entry

public class Main {
    public static void main(String[] args) throws ParseException, InterruptedException {
        //生产数据,发到Kafka
        KafkaUtils.writeDataIntoKafka();
    }
}

2.1.2 dataProduce

public String produce(String startTime, String endTime) throws ParseException {
        // 张三 李四 2021-2-3 13:43:25 1204
        initMetaData();
        //获得随机下标来获得拨打电话者
        int callerIndex = (int) (Math.random() * telePhone.size());
        String callerName = phoneToName.get(telePhone.get(callerIndex));
        //获得被拔打电话者
        int calleeIndex;
        do {
            calleeIndex = (int) (Math.random() * telePhone.size());
        } while (callerIndex == calleeIndex);
        String calleeName = phoneToName.get(telePhone.get(calleeIndex));
        //定义解析时间的对象
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        //定义起止时间
        Date startDate = null;
        Date endDate = null;
        //解析传入的时间字符串,将其转化成Date格式
        startDate = sdf.parse(startTime);
        endDate = sdf.parse(endTime);
        //获得一个时间戳,来初始打电话的时间
        long randomTs = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
        Date resultDate = new Date(randomTs);
        //将初始化好的Date时间,转化成字符串
        String resultTimeString = sdf.format(resultDate);
        //随机初始化小时、分钟、秒
        int hour = (int) (Math.random() * 24);
        int minute = (int) (Math.random() * 60);
        int second = (int) (Math.random() * 60);
        //初始化具体时间,精确到小时、分钟、秒
        String specificTime = String.format(String.format("%02d", hour) + ":"
                + String.format("%02d", minute) + ":"
                + String.format("%02d", second));
        //定义时间跨度,表明电话的拨打时长
        int duration = (int) (Math.random() * 3600);
        //拼接结果 张三 李四 2021-2-3 13:43:25 1204
        String result = callerName + " " + calleeName + " " + resultTimeString + " " + specificTime + " " + duration;
        return result;
    }

2.1.3 kafkaUtils

public static void writeDataIntoKafka() throws ParseException, InterruptedException {
        //定义配置对象
        Properties properties = new Properties();
        //定义主机名
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        //字符串序列化的类
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //Kafka的主题
        String topic = "telecom";
        //定义一个生产者对象
        KafkaProducer producer = new KafkaProducer<String, String>(properties);
        //循环发送数据到Kafka
        for (int i = 0; i < 1000; i++) {
            //按给定起止时间生成数据
            String value = dataProduce.produce("2021-1-1", "2021-5-1");
            //睡眠1秒
            Thread.sleep(1000);
            //创建ProducerRecord对象
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
            //发送数据
            producer.send(record);
        }
        //关闭资源
        producer.close();
    }

2.2 consumer

2.2.1 entry

public class Main {
    public static void main(String[] args) throws IOException, InterruptedException, ParseException {
        //创建HBase消费者
        HBaseConsumer hBaseConsumer = new HBaseConsumer();
        //从Kafka中获取数据输到HBase
        hBaseConsumer.getDataFromKafka();
    }
}

2.2.2 hbase

2.2.2.1 HBaseConsumer

public class HBaseConsumer {
    public void getDataFromKafka() throws InterruptedException, IOException, ParseException {
        //定义配置对象
        Properties properties = new Properties();
        //连接主机名
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        //是否自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //自动提交的时间间隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        //消费者组名
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");
        //字符串序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //创建消费者对象
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
        //消费者订阅主题
        consumer.subscribe(Arrays.asList("telecom"));
        //创建一个Dao对象,用于上传数据到HBase
        HBaseDao hBaseDao = new HBaseDao();
        //从Kafka拉取数据
        while (true) {
            //拉取的时间间隔
            ConsumerRecords<String,String> records = consumer.poll(100);
            //拉取数据输到HBase
            for (ConsumerRecord<String,String> record : records) {
                String value = record.value();
                System.out.println(value);
                Thread.sleep(1000);
                //上传数据
                hBaseDao.put(value);
            }
        }
    }
}

2.2.2.2 HBaseDao

public class HBaseDao {
    //命名空间
    private String nameSpace;
    //表名
    private String tableName;
    //配置对象
    public static Configuration conf;
    //表对象
    private Table table;
    //连接HBase对象
    private Connection connection;
    //解析日期对象
    private SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");
    private SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmmss");
    //初始化配置对象
    static {
        conf = HBaseConfiguration.create();
    }
    public HBaseDao() throws IOException {
        nameSpace = "telecom";
        tableName = "teleRecord";
        connection = ConnectionFactory.createConnection(conf);
        if (!HBaseUtils.isExistTable(conf, tableName)) {
            HBaseUtils.initNamespace(conf, nameSpace);
            HBaseUtils.createTable(conf, tableName, "f1", "f2");
        }
        table = connection.getTable(TableName.valueOf(tableName));
    }
    //将数据导入HBase
    public void put(String value) throws ParseException, IOException {
        //将Kafka拉取的数据切分
        String[] splitValues = value.split(" ");
        String caller = splitValues[0];
        String callee = splitValues[1];
        String buildTime = splitValues[2];
        String specificTime = splitValues[3];
        String duration = splitValues[4];
        //2021-03-21 12:23:04
        buildTime = buildTime + " " + specificTime;
        //20210321122304   用于创建rowKey
        String buildTimeReplace = sdf2.format(sdf1.parse(buildTime));
        //时间戳
        String buildTimeTs = String.valueOf(sdf1.parse(buildTime).getTime());
        //获得rowKey
        String rowKey = HBaseUtils.createRowKey(caller, callee, buildTimeReplace, "1", duration);
        //创建put对象
        Put put = new Put(Bytes.toBytes(rowKey));
        //添加各列属性
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time"), Bytes.toBytes(buildTime));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time_ts"), Bytes.toBytes(buildTimeTs));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes("1"));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration));
        //添加put
        table.put(put);
    }
}

2.2.3 hbaseUtils

public class HBaseUtils {
    //判断表是否存在
    public static boolean isExistTable(Configuration conf, String tableName) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        boolean result = admin.tableExists(TableName.valueOf(tableName));
        admin.close();
        connection.close();
        return result;
    }
    //判断命名空间是否存在
    public static boolean isExistTableSpace(Configuration conf, String nameSpace) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        boolean result = false;
        admin.close();
        connection.close();
        return result;
    }
    //创建命名空间
    public static void initNamespace(Configuration conf, String nameSpace) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        try {
            NamespaceDescriptor descriptor = NamespaceDescriptor.create(nameSpace).build();
            admin.createNamespace(descriptor);
        } catch (NamespaceExistException e) {
        } finally {
            admin.close();
            connection.close();
        }
    }
    //创建表
    public static void createTable(Configuration conf, String tableName, String... cfs) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        for (String cf : cfs) {
            hTableDescriptor.addFamily(new HColumnDescriptor(cf));
        }
        admin.createTable(hTableDescriptor);
        admin.close();
        connection.close();
    }
    //创建RowKey
    public static String createRowKey(String caller, String callee, String buildTime, String flag, String duration) {
        StringBuilder rowKey = new StringBuilder();
        rowKey.append(caller + "_")
                .append(buildTime + "_")
                .append(callee + "_")
                .append(flag + "_")
                .append(duration);
        return rowKey.toString();
    }
}

2.3 analysis

2.3.1 hashUtils

public class HashUtils {
    public static void putValue(Cell cell, HashMap<String, String> hashMap) {
        //获取cell中的列名
        String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
        //获取每列的值
        String value = Bytes.toString(CellUtil.cloneValue(cell));
        //存入map
        hashMap.put(qualifier, value);
    }
}

2.3.2 mysqlUtils

public class MysqlUtils {
    public static Connection connection;
    public static String userName = "root";
    public static String passwd = "123456";
    public static PreparedStatement ps = null;
    //获得Connection对象
    static {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        try {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb" +
                            "?useSSL=false" +
                            "&allowPublicKeyRetrieval=true" +
                            "&serverTimezone=UTC",
                    userName,
                    passwd);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    //清空表数据
    public static void deleteData(String tableName) throws SQLException {
        String sql = String.format("delete from %s", tableName);
        ps = connection.prepareStatement(sql);
        ps.executeUpdate();
    }
}

2.3.3 hbaseToMR

2.3.3.1 callCount

2.3.3.1.1 Map
public class CountMap extends TableMapper<Text, IntWritable> {
    //输出    张三      1
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        HashMap<String, String> hashMap = new HashMap<>();
        for (Cell cell : value.rawCells()) {
            HashUtils.putValue(cell, hashMap);
        }
        String caller = hashMap.get("caller");
        String callee = hashMap.get("callee");
        context.write(new Text(caller + "-" + callee), new IntWritable(1));
    }
}
2.3.3.1.2 Reduce
public class CountReduce extends Reducer<Text, IntWritable, DBWrite, NullWritable> {
    //输出  张三    23
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += 1;
        }
        context.write(new DBWrite(key.toString(), count), NullWritable.get());
    }
}
2.3.3.1.3 Driver
public class CountDriver implements Tool {
    //配置对象
    public static Configuration conf = null;
    //Mysql数据库表名
    public static String mysqlTableName = "callcounts";
    //Mysql表中列名
    public static String[] fieldNames = {"callercallee", "counts"};
    //Mysql驱动类
    public static String driverClass = "com.mysql.cj.jdbc.Driver";
    //Mysql的URL
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";
    //Mysql的用户名
    public static String userName = "root";
    //Mysql的用户密码
    public static String passwd = "123456";
    @Override
    public int run(String[] strings) throws Exception {
        //配置Mysql
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        //清空表
        MysqlUtils.deleteData(mysqlTableName);
        //获得job对象
        Job job = Job.getInstance(conf);
        //关联Jar
        job.setJarByClass(CountDriver.class);
        //配置MapperJob
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                CountMap.class,
                Text.class,
                IntWritable.class,
                job);
        //关联Reduce类
        job.setReducerClass(CountReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);
        //设置输出类型
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
        //提交job任务
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }
    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }
    @Override
    public Configuration getConf() {
        return conf;
    }
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int run = ToolRunner.run(conf, new CountDriver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2.3.3.1.4 DBWrite
public class DBWrite implements Writable, DBWritable {
    String caller_callee = "";
    int count = 0;
    public DBWrite(){}
    public DBWrite(String caller_callee, int count){
        this.caller_callee=caller_callee;
        this.count=count;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(caller_callee);
        out.writeInt(count);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.caller_callee = in.readUTF();
        this.count = in.readInt();
    }
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, caller_callee);
        preparedStatement.setInt(2, count);
    }
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.caller_callee = resultSet.getString(1);
        this.count = resultSet.getInt(2);
    }
}

2.3.3.2 callerDuration

2.3.3.2.1 Map
public class DurationMap extends TableMapper<Text, LongWritable> {
    //输出    张三     2041
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        //创建HashMap对象,为了下面取出对应值用
        HashMap<String, String> hashMap = new HashMap<>();
        //迭代rowkey对应的每个单元
        for (Cell cell : value.rawCells()) {
            HashUtils.putValue(cell, hashMap);
        }
        //获得电话发起人
        String caller = hashMap.get("caller");
        //获得每次电话时长
        String duration = hashMap.get("duration");
        //写出
        context.write(new Text(caller), new LongWritable(Long.valueOf(duration)));
    }
}
2.3.3.2.2 Reduce
public class DurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> {
    //输出    张三     4204
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        //存储每个人拨打电话的总时长
        long sum = 0;
        //迭代每个时长
        for (LongWritable value : values) {
            sum += value.get();
        }
        //将结果写出
        context.write(new DBWrite(key.toString(), String.valueOf(sum)), NullWritable.get());
    }
}
2.3.3.2.3 Driver
public class DurationDriver implements Tool {
    //配置对象
    public static Configuration conf = null;
    //Mysql数据库表名
    public static String mysqlTableName = "callerdurations";
    //Mysql表中列名
    public static String[] fieldNames = {"caller", "durations"};
    //Mysql驱动类
    public static String driverClass = "com.mysql.cj.jdbc.Driver";
    //Mysql的URL
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";
    //Mysql的用户名
    public static String userName = "root";
    //Mysql的用户密码
    public static String passwd = "123456";
    @Override
    public int run(String[] strings) throws Exception {
        //配置Mysql
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        //清空表
        MysqlUtils.deleteData(mysqlTableName);
        //获得job对象
        Job job = Job.getInstance(conf);
        //关联Jar
        job.setJarByClass(DurationDriver.class);
        //配置MapperJob
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                DurationMap.class,
                Text.class,
                LongWritable.class,
                job);
        //关联Reduce类
        job.setReducerClass(DurationReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);
        //设置输出类型
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
        //提交job任务
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }
    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }
    @Override
    public Configuration getConf() {
        return conf;
    }
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int run = ToolRunner.run(conf, new DurationDriver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.3.3.3 dayCountDuration

2.3.3.3.1 Map
public class dayCountDurationMap extends TableMapper<Text, LongWritable> {
    //2021-01-13    3042
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        HashMap<String, String> hashmap = new HashMap<>();
        for (Cell cell : value.rawCells()) {
            HashUtils.putValue(cell, hashmap);
        }
        String date = hashmap.get("build_time").substring(0, 10);
        String duration = hashmap.get("duration");
        context.write(new Text(date), new LongWritable(Long.valueOf(duration)));
    }
}
2.3.3.3.2 Reduce
public class dayCountDurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> {
    //输出 2021-01-13  2042
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long durations = 0;
        for (LongWritable value : values) {
            durations += value.get();
        }
        context.write(new DBWrite(key.toString(), durations), NullWritable.get());
    }
}
2.3.3.3.3 Driver
public class dayCountDurationDriver implements Tool {
    //配置对象
    public static Configuration conf = null;
    //Mysql数据库表名
    public static String mysqlTableName = "daydurations";
    //Mysql表中列名
    public static String[] fieldNames = {"date", "durations"};
    //Mysql驱动类
    public static String driverClass = "com.mysql.cj.jdbc.Driver";
    //Mysql的URL
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";
    //Mysql的用户名
    public static String userName = "root";
    //Mysql的用户密码
    public static String passwd = "123456";
    @Override
    public int run(String[] strings) throws Exception {
        //配置Mysql
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        //清空表
        MysqlUtils.deleteData(mysqlTableName);
        //获得job对象
        Job job = Job.getInstance(conf);
        //关联Jar
        job.setJarByClass(dayCountDurationDriver.class);
        //配置MapperJob
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                dayCountDurationMap.class,
                Text.class,
                LongWritable.class,
                job);
        //关联Reduce类
        job.setReducerClass(dayCountDurationReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);
        //设置输出类型
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
        //提交job任务
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }
    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }
    @Override
    public Configuration getConf() {
        return conf;
    }
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int run = ToolRunner.run(conf, new dayCountDurationDriver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4 项目源码

Github地址


目录
相关文章
|
1月前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
88 4
|
12天前
|
缓存 关系型数据库 MySQL
MySQL 索引优化与慢查询优化:原理与实践
通过本文的介绍,希望您能够深入理解MySQL索引优化与慢查询优化的原理和实践方法,并在实际项目中灵活运用这些技术,提升数据库的整体性能。
42 5
|
25天前
|
存储 关系型数据库 MySQL
PHP与MySQL动态网站开发:从基础到实践####
本文将深入探讨PHP与MySQL的结合使用,展示如何构建一个动态网站。通过一系列实例和代码片段,我们将逐步了解数据库连接、数据操作、用户输入处理及安全防护等关键技术点。无论您是初学者还是有经验的开发者,都能从中获益匪浅。 ####
|
27天前
|
关系型数据库 MySQL 网络安全
DBeaver连接MySQL提示Access denied for user ‘‘@‘ip‘ (using password: YES)
“Access denied for user ''@'ip' (using password: YES)”错误通常与MySQL用户权限配置或网络设置有关。通过检查并正确配置用户名和密码、用户权限、MySQL配置文件及防火墙设置,可以有效解决此问题。希望本文能帮助您成功连接MySQL数据库。
41 4
|
1月前
|
关系型数据库 MySQL Java
MySQL索引优化与Java应用实践
【11月更文挑战第25天】在大数据量和高并发的业务场景下,MySQL数据库的索引优化是提升查询性能的关键。本文将深入探讨MySQL索引的多种类型、优化策略及其在Java应用中的实践,通过历史背景、业务场景、底层原理的介绍,并结合Java示例代码,帮助Java架构师更好地理解并应用这些技术。
33 2
|
1月前
|
关系型数据库 MySQL Linux
Linux环境下MySQL数据库自动定时备份实践
数据库备份是确保数据安全的重要措施。在Linux环境下,实现MySQL数据库的自动定时备份可以通过多种方式完成。本文将介绍如何使用`cron`定时任务和`mysqldump`工具来实现MySQL数据库的每日自动备份。
109 3
|
1月前
|
存储 监控 关系型数据库
MySQL自增ID耗尽解决方案:应对策略与实践技巧
在MySQL数据库中,自增ID(AUTO_INCREMENT)是一种特殊的属性,用于自动为新插入的行生成唯一的标识符。然而,当自增ID达到其最大值时,会发生什么?又该如何解决?本文将探讨MySQL自增ID耗尽的问题,并提供一些实用的解决方案。
43 1
|
1月前
|
安全 关系型数据库 MySQL
【赵渝强老师】MySQL的连接方式
本文介绍了MySQL数据库服务器启动后的三种连接方式:本地连接、远程连接和安全连接。详细步骤包括使用root用户登录、修改密码、创建新用户、授权及配置SSL等。并附有视频讲解,帮助读者更好地理解和操作。
168 1
|
20天前
|
SQL 关系型数据库 MySQL
PHP与MySQL的高效交互:从基础到实践####
本文深入探讨了PHP与MySQL数据库之间的高效交互技术,涵盖了从基础连接到高级查询优化的全过程。不同于传统的摘要概述,这里我们直接以一段精简代码示例作为引子,展示如何在PHP中实现与MySQL的快速连接与简单查询,随后文章将围绕这一核心,逐步展开详细讲解,旨在为读者提供一个从入门到精通的实战指南。 ```php <?php // 数据库配置信息 $servername = "localhost"; $username = "root"; $password = "password"; $dbname = "test_db"; // 创建连接 $conn = new mysqli($se
23 0
|
2月前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:百万级数据统计优化实践
【10月更文挑战第21天】 在处理大规模数据集时,传统的单体数据库解决方案往往力不从心。MySQL和Redis的组合提供了一种高效的解决方案,通过将数据库操作与高速缓存相结合,可以显著提升数据处理的性能。本文将分享一次实际的优化案例,探讨如何利用MySQL和Redis共同实现百万级数据统计的优化。
115 9