使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱

使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱

在大数据处理和分析的场景中,Hadoop MapReduce是一种常见且高效的工具。本文将展示如何使用Hadoop MapReduce来分析邮件日志,提取邮件的发送状态(成功、失败或退回)和目标邮箱。

项目结构

我们将创建一个Java项目,该项目包含三个主要部分:

**Mapper类:**解析邮件日志,提取ID、状态和目标邮箱。

**Reducer类:**汇总Mapper输出的数据,生成最终结果。

*Driver类:**配置和运行MapReduce作业。

数据格式

我们将处理的邮件日志示例如下:


在这些日志中,我们需要提取邮件的ID、发送状态(成功、失败或退回)和目标邮箱。

代码实现

以下是完整的Java代码,包含Mapper、Reducer和Driver类:

package org.example.mapReduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MailLogAnalysis {

    public static class MailLogMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            if (line.contains("starting delivery")) {
                String[] parts = line.split(" ");
                String id = parts[3].replace(":", "");
                String targetEmail = parts[8];
                context.write(new Text(id), new Text("email," + targetEmail));
            }

            if (line.contains("success") || line.contains("failure") || line.contains("bounce")) {
                String status = "success";
                if (line.contains("failure")) {
                    status = "failure";
                }
                if (line.contains("bounce")) {
                    status = "bounce";
                }
                String[] parts = line.split(" ");
                String id = parts[2].replace(":", "");
                context.write(new Text(id), new Text("status," + status));
            }
        }
    }

    public static class MailLogReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String email = "";
            String status = "failure";
            for (Text val : values) {
                String[] parts = val.toString().split(",", 2);
                if (parts[0].equals("email")) {
                    email = parts[1];
                } else if (parts[0].equals("status")) {
                    status = parts[1];
                }
            }
            context.write(key, new Text(status + "," + email));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Mail Log Analysis");
        job.setJarByClass(MailLogAnalysis.class);
        job.setMapperClass(MailLogMapper.class);
        job.setReducerClass(MailLogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

使用Hadoop MapReduce分析邮件日志

在大数据处理和分析的场景中,Hadoop MapReduce是一种常见且高效的工具。本文将展示如何使用Hadoop MapReduce来分析邮件日志,提取邮件的发送状态(成功、失败或退回)和目标邮箱。我们将通过一个具体的例子来实现这一目标。


项目结构

我们将创建一个Java项目,该项目包含三个主要部分:


Mapper类:解析邮件日志,提取ID、状态和目标邮箱。

Reducer类:汇总Mapper输出的数据,生成最终结果。

Driver类:配置和运行MapReduce作业。

数据格式

我们将处理的邮件日志示例如下:


less
复制代码
@400000004faa61e21e8e3e24 starting delivery 1820: msg 850901 to remote sunkang@189.cn
@400000004faa61e536864a44 delivery 1820: success: 121.14.53.136_accepted_message./Remote_host_said:_250_Ok:_queued_as_43A2222C006/
@400000004faa61e70a73c60c delivery 1823: deferral: 210.32.157.174_failed_after_I_sent_the_message./Remote_host_said:_450_Requested_action_not_taken:_AQAAf5CrT+qlYqpPamRUAA–.7571S2,_please_try_again/
@400000004faa61e70a73c60c bounce 1824: 550 Mailbox not found

在这些日志中,我们需要提取邮件的ID、发送状态(成功、失败或退回)和目标邮箱。


代码实现

以下是完整的Java代码,包含Mapper、Reducer和Driver类:


java
复制代码
package org.example.mapReduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MailLogAnalysis {
public static class MailLogMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();

        if (line.contains("starting delivery")) {
            String[] parts = line.split(" ");
            String id = parts[3].replace(":", "");
            String targetEmail = parts[8];
            context.write(new Text(id), new Text("email," + targetEmail));
        }

        if (line.contains("success") || line.contains("failure") || line.contains("bounce")) {
            String status = "success";
            if (line.contains("failure")) {
                status = "failure";
            }
            if (line.contains("bounce")) {
                status = "bounce";
            }
            String[] parts = line.split(" ");
            String id = parts[2].replace(":", "");
            context.write(new Text(id), new Text("status," + status));
        }
    }
}

public static class MailLogReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String email = "";
        String status = "failure";
        for (Text val : values) {
            String[] parts = val.toString().split(",", 2);
            if (parts[0].equals("email")) {
                email = parts[1];
            } else if (parts[0].equals("status")) {
                status = parts[1];
            }
        }
        context.write(key, new Text(status + "," + email));
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Mail Log Analysis");
    job.setJarByClass(MailLogAnalysis.class);
    job.setMapperClass(MailLogMapper.class);
    job.setReducerClass(MailLogReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

代码解释

Mapper类

MailLogMapper类从日志中提取邮件的ID、目标邮箱和发送状态,并将这些信息作为键值对输出:


如果行包含"starting delivery",则提取邮件的ID和目标邮箱,并输出键值对<ID, email, 目标邮箱>。

如果行包含"success"、“failure"或"bounce”,则提取邮件的ID和发送状态,并输出键值对<ID, status, 发送状态>。

Reducer类

MailLogReducer类汇总Mapper输出的数据,生成最终的结果:


对于每个邮件ID,汇总对应的目标邮箱和发送状态。

输出包含ID、发送状态和目标邮箱的最终结果。

Driver类

MailLogAnalysis类配置和运行MapReduce作业:


设置作业名称、Mapper类和Reducer类。

设置输入路径和输出路径。

提交作业并等待完成。

MapReduce运行结果

总结

通过本文的示例,我们展示了如何使用Hadoop MapReduce来分析邮件日志,提取邮件的发送状态和目标邮箱。希望本文能为您的大数据处理和分析工作提供一些帮助。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
6天前
|
SQL 监控 中间件
【应急响应】拒绝服务&钓鱼指南&DDOS压力测试&邮件反制分析&应用日志
【应急响应】拒绝服务&钓鱼指南&DDOS压力测试&邮件反制分析&应用日志
|
7天前
|
存储 监控 安全
《SelectDB 新一代日志存储分析平台解决方案》白皮书重磅发布|立即下载
作为基于 Apache Doris 打造的现代化数据仓库,SelectDB 不拘泥于传统数仓的限制,针对日志数据的特点引入了多项创新性技术,使用户可基于 SelectDB 构建开放、高性能、低成本、统一的日志存储分析平台, 截至目前已在近百家行业内知名企业中落地。
《SelectDB 新一代日志存储分析平台解决方案》白皮书重磅发布|立即下载
|
12天前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
16 1
|
12天前
|
存储 分布式计算 Hadoop
Hadoop生态系统详解:HDFS与MapReduce编程
Apache Hadoop是大数据处理的关键,其核心包括HDFS(分布式文件系统)和MapReduce(并行计算框架)。HDFS为大数据存储提供高容错性和高吞吐量,采用主从结构,通过数据复制保证可靠性。MapReduce将任务分解为Map和Reduce阶段,适合大规模数据集的处理。通过代码示例展示了如何使用MapReduce实现Word Count功能。HDFS和MapReduce的结合,加上YARN的资源管理,构成处理和分析大数据的强大力量。了解和掌握这些基础对于有效管理大数据至关重要。【6月更文挑战第12天】
31 0
|
13天前
|
分布式计算 Java Hadoop
简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行
简单的java Hadoop MapReduce程序(计算平均成绩)从打包到提交及运行
13 0
|
22天前
|
SQL 监控 关系型数据库
|
17天前
|
SQL 数据采集 DataWorks
DataWorks产品使用合集之pyodps的线程限制是什么意思
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
17天前
|
DataWorks 数据可视化 安全
DataWorks产品使用合集之SLS日志中新增了存在iotId这个字段,同步的时候怎么手动增加
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
22天前
|
SQL 关系型数据库 MySQL
|
22天前
|
SQL 监控 关系型数据库