Kafka-HBase-MapReduce-MySQL Integration in Practice: Call Records


1. Project Overview

This project works with call-record data. For example, the record `张三 李四 2021-4-23 12:32:13 2942` means that 张三 called 李四 at 2021-4-23 12:32:13 and the call lasted 2942 seconds.
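A record is a single space-separated line, so parsing one is a plain split. A minimal sketch (the `CallRecord` class below is hypothetical, added here only to illustrate the format):

```java
// Minimal sketch: parsing one call-record line into its fields.
// CallRecord is hypothetical; it only illustrates the record format.
public class CallRecord {
    public final String caller;
    public final String callee;
    public final String date;          // e.g. 2021-4-23
    public final String time;          // e.g. 12:32:13
    public final int durationSeconds;  // e.g. 2942

    public CallRecord(String line) {
        // the five fields are separated by single spaces
        String[] parts = line.split(" ");
        caller = parts[0];
        callee = parts[1];
        date = parts[2];
        time = parts[3];
        durationSeconds = Integer.parseInt(parts[4]);
    }

    public static void main(String[] args) {
        CallRecord r = new CallRecord("张三 李四 2021-4-23 12:32:13 2942");
        System.out.println(r.caller + " -> " + r.callee + ", " + r.durationSeconds + "s");
    }
}
```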

  1. Data source: the program simulates the records itself and hands them to a Kafka producer
  2. On the consumer side, Kafka feeds the data into the HBase database
  3. MapReduce reads the data from HBase and analyzes it
  4. The analysis results are then loaded into MySQL

2. Class Overview

Produce module

  • DataProduce: generates the data
  • Main: program entry point
  • testAPI: functional testing
  • KafkaUtils: sends the data to the topic

Consumer module

  • Main: program entry point
  • HBaseConsumer: the consumer that pulls the data
  • HBaseDao: HBase client object; creates the table and loads the data
  • HBaseUtils: builds the RowKey and provides table-creation and namespace helpers

Analysis module

  • HashUtils: stores the data of each Cell into a HashMap
  • MysqlUtils: handles the MySQL connection
  • CountMap: counts the calls between each pair of users
  • DBWrite: implements Writable and DBWritable, for serialization and for writing to the database

3. Project Modules

The project is split into three modules: produce, consumer, and analysis.

  • produce: generates the data
  • consumer: consumes from Kafka and writes the data into HBase
  • analysis: analyzes the data with MapReduce and loads the results into MySQL

3.1 produce

3.1.1 entry

public class Main {
    public static void main(String[] args) throws ParseException, InterruptedException {
        // generate the data and send it to Kafka
        KafkaUtils.writeDataIntoKafka();
    }
}

3.1.2 dataProduce

public String produce(String startTime, String endTime) throws ParseException {
        // e.g. 张三 李四 2021-2-3 13:43:25 1204
        initMetaData();
        // pick a random index for the caller
        int callerIndex = (int) (Math.random() * telePhone.size());
        String callerName = phoneToName.get(telePhone.get(callerIndex));
        // pick the callee, making sure it differs from the caller
        int calleeIndex;
        do {
            calleeIndex = (int) (Math.random() * telePhone.size());
        } while (callerIndex == calleeIndex);
        String calleeName = phoneToName.get(telePhone.get(calleeIndex));
        // date parser
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        // parse the given start and end time strings into Date objects
        Date startDate = sdf.parse(startTime);
        Date endDate = sdf.parse(endTime);
        // random timestamp between the two dates: the day the call was placed
        long randomTs = startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
        Date resultDate = new Date(randomTs);
        // format the random date back into a string
        String resultTimeString = sdf.format(resultDate);
        // random hour, minute and second
        int hour = (int) (Math.random() * 24);
        int minute = (int) (Math.random() * 60);
        int second = (int) (Math.random() * 60);
        // time of day, down to the second
        String specificTime = String.format("%02d:%02d:%02d", hour, minute, second);
        // call duration in seconds, up to one hour
        int duration = (int) (Math.random() * 3600);
        // assemble the record, e.g. 张三 李四 2021-2-3 13:43:25 1204
        return callerName + " " + calleeName + " " + resultTimeString + " " + specificTime + " " + duration;
    }
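The heart of `produce` is drawing a uniformly random instant between the two parsed boundary dates. That one step can be run in isolation; the `RandomDateSketch` class below is illustrative, not part of the project:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

// Standalone sketch of the random-date step used by produce():
// pick a uniformly random timestamp between startTime and endTime.
public class RandomDateSketch {
    public static long randomTsBetween(String startTime, String endTime) throws ParseException {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Date startDate = sdf.parse(startTime);
        Date endDate = sdf.parse(endTime);
        // startDate.getTime() <= result < endDate.getTime(), since Math.random() < 1
        return startDate.getTime() + (long) ((endDate.getTime() - startDate.getTime()) * Math.random());
    }

    public static void main(String[] args) throws ParseException {
        long ts = randomTsBetween("2021-1-1", "2021-5-1");
        System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(ts)));
    }
}
```

Note that `SimpleDateFormat` is lenient by default, which is why unpadded inputs such as `2021-1-1` parse without error.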

3.1.3 kafkaUtils

public static void writeDataIntoKafka() throws ParseException, InterruptedException {
        // producer configuration
        Properties properties = new Properties();
        // broker address
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key/value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // Kafka topic
        String topic = "telecom";
        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // send records to Kafka in a loop
        for (int i = 0; i < 1000; i++) {
            // generate a record within the given time range
            String value = dataProduce.produce("2021-1-1", "2021-5-1");
            // sleep for one second
            Thread.sleep(1000);
            // build the ProducerRecord
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
            // send it
            producer.send(record);
        }
        // release resources
        producer.close();
    }

3.2 consumer

3.2.1 entry

public class Main {
    public static void main(String[] args) throws IOException, InterruptedException, ParseException {
        // create the HBase consumer
        HBaseConsumer hBaseConsumer = new HBaseConsumer();
        // pull data from Kafka and write it into HBase
        hBaseConsumer.getDataFromKafka();
    }
}

3.2.2 hbase

3.2.2.1 HBaseConsumer

public class HBaseConsumer {
    public void getDataFromKafka() throws InterruptedException, IOException, ParseException {
        // consumer configuration
        Properties properties = new Properties();
        // broker address
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // commit offsets automatically
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // auto-commit interval in milliseconds
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        // consumer group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test3");
        // key/value deserializers
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // create the consumer
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
        // subscribe to the topic
        consumer.subscribe(Arrays.asList("telecom"));
        // DAO object used to upload data to HBase
        HBaseDao hBaseDao = new HBaseDao();
        // keep polling Kafka
        while (true) {
            // poll with a 100 ms timeout
            ConsumerRecords<String,String> records = consumer.poll(100);
            // write every fetched record into HBase
            for (ConsumerRecord<String,String> record : records) {
                String value = record.value();
                System.out.println(value);
                Thread.sleep(1000);
                // upload the record
                hBaseDao.put(value);
            }
        }
    }
}

3.2.2.2 HBaseDao

public class HBaseDao {
    // namespace
    private String nameSpace;
    // table name
    private String tableName;
    // configuration object
    public static Configuration conf;
    // table handle
    private Table table;
    // HBase connection
    private Connection connection;
    // sdf1 parses the full call time (date and time of day); sdf2 compacts it for the rowkey
    private SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private SimpleDateFormat sdf2 = new SimpleDateFormat("yyyyMMddHHmmss");
    // initialize the configuration object
    static {
        conf = HBaseConfiguration.create();
    }
    public HBaseDao() throws IOException {
        nameSpace = "telecom";
        tableName = "teleRecord";
        connection = ConnectionFactory.createConnection(conf);
        if (!HBaseUtils.isExistTable(conf, tableName)) {
            HBaseUtils.initNamespace(conf, nameSpace);
            HBaseUtils.createTable(conf, tableName, "f1", "f2");
        }
        table = connection.getTable(TableName.valueOf(tableName));
    }
    // write one record into HBase
    public void put(String value) throws ParseException, IOException {
        // split the record pulled from Kafka
        String[] splitValues = value.split(" ");
        String caller = splitValues[0];
        String callee = splitValues[1];
        String buildTime = splitValues[2];
        String specificTime = splitValues[3];
        String duration = splitValues[4];
        // e.g. 2021-03-21 12:23:04
        buildTime = buildTime + " " + specificTime;
        // e.g. 20210321122304, used for the rowkey
        String buildTimeReplace = sdf2.format(sdf1.parse(buildTime));
        // epoch timestamp in milliseconds
        String buildTimeTs = String.valueOf(sdf1.parse(buildTime).getTime());
        // build the rowkey
        String rowKey = HBaseUtils.createRowKey(caller, callee, buildTimeReplace, "1", duration);
        // create the Put object
        Put put = new Put(Bytes.toBytes(rowKey));
        // add each column
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time"), Bytes.toBytes(buildTime));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("build_time_ts"), Bytes.toBytes(buildTimeTs));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("flag"), Bytes.toBytes("1"));
        put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("duration"), Bytes.toBytes(duration));
        // write the Put
        table.put(put);
    }
}

3.2.3 hbaseUtils

public class HBaseUtils {
    // check whether a table exists
    public static boolean isExistTable(Configuration conf, String tableName) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        boolean result = admin.tableExists(TableName.valueOf(tableName));
        admin.close();
        connection.close();
        return result;
    }
    // check whether a namespace exists
    public static boolean isExistTableSpace(Configuration conf, String nameSpace) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        boolean result = false;
        for (NamespaceDescriptor descriptor : admin.listNamespaceDescriptors()) {
            if (descriptor.getName().equals(nameSpace)) {
                result = true;
                break;
            }
        }
        admin.close();
        connection.close();
        return result;
    }
    // create a namespace
    public static void initNamespace(Configuration conf, String nameSpace) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        try {
            NamespaceDescriptor descriptor = NamespaceDescriptor.create(nameSpace).build();
            admin.createNamespace(descriptor);
        } catch (NamespaceExistException e) {
            // the namespace already exists: nothing to do
        } finally {
            admin.close();
            connection.close();
        }
    }
    // create a table with the given column families
    public static void createTable(Configuration conf, String tableName, String... cfs) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        for (String cf : cfs) {
            hTableDescriptor.addFamily(new HColumnDescriptor(cf));
        }
        admin.createTable(hTableDescriptor);
        admin.close();
        connection.close();
    }
    // build the rowkey: caller_buildTime_callee_flag_duration
    public static String createRowKey(String caller, String callee, String buildTime, String flag, String duration) {
        return caller + "_" + buildTime + "_" + callee + "_" + flag + "_" + duration;
    }
}
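For the sample record from the introduction, `createRowKey` yields a key of the form caller_buildTime_callee_flag_duration. A standalone copy of just that method (duplicated here so the example runs without HBase on the classpath):

```java
// Standalone copy of HBaseUtils.createRowKey, so the example runs without HBase.
public class RowKeyDemo {
    public static String createRowKey(String caller, String callee, String buildTime, String flag, String duration) {
        // rowkey layout: caller_buildTime_callee_flag_duration
        return caller + "_" + buildTime + "_" + callee + "_" + flag + "_" + duration;
    }

    public static void main(String[] args) {
        // the sample record from section 1
        System.out.println(createRowKey("张三", "李四", "20210423123213", "1", "2942"));
        // prints 张三_20210423123213_李四_1_2942
    }
}
```

Putting the caller first keeps all calls made by one user adjacent in the table, which is what the MapReduce jobs below group by.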

3.3 analysis

3.3.1 hashUtils

public class HashUtils {
    public static void putValue(Cell cell, HashMap<String, String> hashMap) {
        // column qualifier of the cell
        String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
        // value of the cell
        String value = Bytes.toString(CellUtil.cloneValue(cell));
        // store qualifier -> value in the map
        hashMap.put(qualifier, value);
    }
}

3.3.2 mysqlUtils

public class MysqlUtils {
    public static Connection connection;
    public static String userName = "root";
    public static String passwd = "123456";
    public static PreparedStatement ps = null;
    // obtain the JDBC Connection
    static {
        try {
            Class.forName("com.mysql.cj.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        try {
            connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb" +
                            "?useSSL=false" +
                            "&allowPublicKeyRetrieval=true" +
                            "&serverTimezone=UTC",
                    userName,
                    passwd);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    // clear all rows from the given table
    public static void deleteData(String tableName) throws SQLException {
        String sql = String.format("delete from %s", tableName);
        ps = connection.prepareStatement(sql);
        ps.executeUpdate();
    }
}
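The original does not show the MySQL schemas. The analysis drivers below write to three tables (`callcounts`, `callerdurations`, `daydurations`) whose column names appear in their `fieldNames` arrays; a plausible DDL, with assumed column types, could be generated like this (`ResultTablesDdl` is hypothetical helper code, and the statements would be executed with `connection.createStatement().execute(...)`):

```java
// Hypothetical DDL for the three result tables used by the analysis drivers.
// Column names come from the drivers' fieldNames arrays; the types are assumptions.
public class ResultTablesDdl {
    public static String ddl(String table, String keyCol, String keyType, String valCol) {
        // two-column layout matching DBOutputFormat.setOutput(job, table, fieldNames)
        return String.format("CREATE TABLE IF NOT EXISTS %s (%s %s, %s BIGINT)",
                table, keyCol, keyType, valCol);
    }

    public static void main(String[] args) {
        System.out.println(ddl("callcounts", "callercallee", "VARCHAR(64)", "counts"));
        System.out.println(ddl("callerdurations", "caller", "VARCHAR(32)", "durations"));
        System.out.println(ddl("daydurations", "date", "VARCHAR(10)", "durations"));
    }
}
```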

3.3.3 hbaseToMR

3.3.3.1 callCount

3.3.3.1.1 Map
public class CountMap extends TableMapper<Text, IntWritable> {
    // emits e.g.  张三-李四    1
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        HashMap<String, String> hashMap = new HashMap<>();
        for (Cell cell : value.rawCells()) {
            HashUtils.putValue(cell, hashMap);
        }
        String caller = hashMap.get("caller");
        String callee = hashMap.get("callee");
        context.write(new Text(caller + "-" + callee), new IntWritable(1));
    }
}
3.3.3.1.2 Reduce
public class CountReduce extends Reducer<Text, IntWritable, DBWrite, NullWritable> {
    // emits e.g.  张三-李四    23
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(new DBWrite(key.toString(), count), NullWritable.get());
    }
}
3.3.3.1.3 Driver
public class CountDriver implements Tool {
    // configuration object
    public static Configuration conf = null;
    // MySQL table name
    public static String mysqlTableName = "callcounts";
    // MySQL column names
    public static String[] fieldNames = {"callercallee", "counts"};
    // MySQL driver class
    public static String driverClass = "com.mysql.cj.jdbc.Driver";
    // MySQL URL
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";
    // MySQL user name
    public static String userName = "root";
    // MySQL password
    public static String passwd = "123456";
    @Override
    public int run(String[] strings) throws Exception {
        // configure the MySQL connection
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        // clear the target table
        MysqlUtils.deleteData(mysqlTableName);
        // obtain the job object
        Job job = Job.getInstance(conf);
        // set the jar
        job.setJarByClass(CountDriver.class);
        // configure the mapper against the HBase table
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                CountMap.class,
                Text.class,
                IntWritable.class,
                job);
        // set the reducer
        job.setReducerClass(CountReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);
        // output goes to the database
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
        // submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }
    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }
    @Override
    public Configuration getConf() {
        return conf;
    }
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int run = ToolRunner.run(conf, new CountDriver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
3.3.3.1.4 DBWrite
public class DBWrite implements Writable, DBWritable {
    String caller_callee = "";
    int count = 0;
    public DBWrite() {}
    public DBWrite(String caller_callee, int count) {
        this.caller_callee = caller_callee;
        this.count = count;
    }
    // Writable: serialization between map and reduce tasks
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(caller_callee);
        out.writeInt(count);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.caller_callee = in.readUTF();
        this.count = in.readInt();
    }
    // DBWritable: used by DBOutputFormat to fill the INSERT statement
    @Override
    public void write(PreparedStatement preparedStatement) throws SQLException {
        preparedStatement.setString(1, caller_callee);
        preparedStatement.setInt(2, count);
    }
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.caller_callee = resultSet.getString(1);
        this.count = resultSet.getInt(2);
    }
}
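Hadoop calls `write(DataOutput)`/`readFields(DataInput)` when shuttling `DBWrite` between map and reduce tasks, while `DBOutputFormat` uses `write(PreparedStatement)` to fill the INSERT. The byte-level round trip can be demonstrated with plain `java.io` streams; `WritableRoundTrip` below is a self-contained sketch, not project code:

```java
import java.io.*;

// Sketch of the writeUTF/writeInt round trip that the Writable side of DBWrite relies on.
public class WritableRoundTrip {
    public static String[] roundTrip(String callerCallee, int count) throws IOException {
        // serialize, as DBWrite.write(DataOutput) does
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeUTF(callerCallee);
        out.writeInt(count);
        // deserialize, as DBWrite.readFields(DataInput) does
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        return new String[]{in.readUTF(), String.valueOf(in.readInt())};
    }

    public static void main(String[] args) throws IOException {
        String[] fields = roundTrip("张三-李四", 23);
        System.out.println(fields[0] + " " + fields[1]);
        // prints 张三-李四 23
    }
}
```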

3.3.3.2 callerDuration

3.3.3.2.1 Map
public class DurationMap extends TableMapper<Text, LongWritable> {
    // emits e.g.  张三    2041
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // map used to look up the column values below
        HashMap<String, String> hashMap = new HashMap<>();
        // iterate over every cell of this rowkey
        for (Cell cell : value.rawCells()) {
            HashUtils.putValue(cell, hashMap);
        }
        // the caller
        String caller = hashMap.get("caller");
        // the duration of this call
        String duration = hashMap.get("duration");
        // write out
        context.write(new Text(caller), new LongWritable(Long.valueOf(duration)));
    }
}
3.3.3.2.2 Reduce
public class DurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> {
    // emits e.g.  张三    4204
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // total call duration of this caller
        long sum = 0;
        // add up the individual durations
        for (LongWritable value : values) {
            sum += value.get();
        }
        // write the result; DBWrite stores the value as an int
        context.write(new DBWrite(key.toString(), (int) sum), NullWritable.get());
    }
}
3.3.3.2.3 Driver
public class DurationDriver implements Tool {
    // configuration object
    public static Configuration conf = null;
    // MySQL table name
    public static String mysqlTableName = "callerdurations";
    // MySQL column names
    public static String[] fieldNames = {"caller", "durations"};
    // MySQL driver class
    public static String driverClass = "com.mysql.cj.jdbc.Driver";
    // MySQL URL
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";
    // MySQL user name
    public static String userName = "root";
    // MySQL password
    public static String passwd = "123456";
    @Override
    public int run(String[] strings) throws Exception {
        // configure the MySQL connection
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        // clear the target table
        MysqlUtils.deleteData(mysqlTableName);
        // obtain the job object
        Job job = Job.getInstance(conf);
        // set the jar
        job.setJarByClass(DurationDriver.class);
        // configure the mapper against the HBase table
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                DurationMap.class,
                Text.class,
                LongWritable.class,
                job);
        // set the reducer
        job.setReducerClass(DurationReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);
        // output goes to the database
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
        // submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }
    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }
    @Override
    public Configuration getConf() {
        return conf;
    }
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int run = ToolRunner.run(conf, new DurationDriver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.3.3.3 dayCountDuration

3.3.3.3.1 Map
public class dayCountDurationMap extends TableMapper<Text, LongWritable> {
    // emits e.g.  2021-01-13    3042
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        HashMap<String, String> hashmap = new HashMap<>();
        for (Cell cell : value.rawCells()) {
            HashUtils.putValue(cell, hashmap);
        }
        // the date part of build_time, e.g. 2021-01-13
        String date = hashmap.get("build_time").substring(0, 10);
        String duration = hashmap.get("duration");
        context.write(new Text(date), new LongWritable(Long.valueOf(duration)));
    }
}
3.3.3.3.2 Reduce
public class dayCountDurationReduce extends Reducer<Text, LongWritable, DBWrite, NullWritable> {
    // emits e.g.  2021-01-13    2042
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long durations = 0;
        for (LongWritable value : values) {
            durations += value.get();
        }
        // write the result; DBWrite stores the value as an int
        context.write(new DBWrite(key.toString(), (int) durations), NullWritable.get());
    }
}
3.3.3.3.3 Driver
public class dayCountDurationDriver implements Tool {
    // configuration object
    public static Configuration conf = null;
    // MySQL table name
    public static String mysqlTableName = "daydurations";
    // MySQL column names
    public static String[] fieldNames = {"date", "durations"};
    // MySQL driver class
    public static String driverClass = "com.mysql.cj.jdbc.Driver";
    // MySQL URL
    public static String dbUrl = "jdbc:mysql://localhost:3306/mydb" +
            "?useSSL=false" +
            "&allowPublicKeyRetrieval=true" +
            "&serverTimezone=UTC";
    // MySQL user name
    public static String userName = "root";
    // MySQL password
    public static String passwd = "123456";
    @Override
    public int run(String[] strings) throws Exception {
        // configure the MySQL connection
        DBConfiguration.configureDB(conf, driverClass, dbUrl, userName, passwd);
        // clear the target table
        MysqlUtils.deleteData(mysqlTableName);
        // obtain the job object
        Job job = Job.getInstance(conf);
        // set the jar
        job.setJarByClass(dayCountDurationDriver.class);
        // configure the mapper against the HBase table
        TableMapReduceUtil.initTableMapperJob("teleRecord",
                new Scan(),
                dayCountDurationMap.class,
                Text.class,
                LongWritable.class,
                job);
        // set the reducer
        job.setReducerClass(dayCountDurationReduce.class);
        job.setOutputKeyClass(DBWrite.class);
        job.setOutputValueClass(NullWritable.class);
        // output goes to the database
        job.setOutputFormatClass(DBOutputFormat.class);
        DBOutputFormat.setOutput(job, mysqlTableName, fieldNames);
        // submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }
    @Override
    public void setConf(Configuration configuration) {
        conf = configuration;
        conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104");
    }
    @Override
    public Configuration getConf() {
        return conf;
    }
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            int run = ToolRunner.run(conf, new dayCountDurationDriver(), args);
            System.exit(run);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4. Project Source Code

GitHub repository

