使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱

使用Hadoop MapReduce分析邮件日志提取 id、状态 和 目标邮箱

在大数据处理和分析的场景中,Hadoop MapReduce是一种常见且高效的工具。本文将展示如何使用Hadoop MapReduce来分析邮件日志,提取邮件的发送状态(成功、失败或退回)和目标邮箱。

项目结构

我们将创建一个Java项目,该项目包含三个主要部分:

**Mapper类:**解析邮件日志,提取ID、状态和目标邮箱。

**Reducer类:**汇总Mapper输出的数据,生成最终结果。

*Driver类:**配置和运行MapReduce作业。

数据格式

我们将处理的邮件日志示例如下:


在这些日志中,我们需要提取邮件的ID、发送状态(成功、失败或退回)和目标邮箱。

代码实现

以下是完整的Java代码,包含Mapper、Reducer和Driver类:

package org.example.mapReduce;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MailLogAnalysis {

    public static class MailLogMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            if (line.contains("starting delivery")) {
                String[] parts = line.split(" ");
                String id = parts[3].replace(":", "");
                String targetEmail = parts[8];
                context.write(new Text(id), new Text("email," + targetEmail));
            }

            if (line.contains("success") || line.contains("failure") || line.contains("bounce")) {
                String status = "success";
                if (line.contains("failure")) {
                    status = "failure";
                }
                if (line.contains("bounce")) {
                    status = "bounce";
                }
                String[] parts = line.split(" ");
                String id = parts[2].replace(":", "");
                context.write(new Text(id), new Text("status," + status));
            }
        }
    }

    public static class MailLogReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String email = "";
            String status = "failure";
            for (Text val : values) {
                String[] parts = val.toString().split(",", 2);
                if (parts[0].equals("email")) {
                    email = parts[1];
                } else if (parts[0].equals("status")) {
                    status = parts[1];
                }
            }
            context.write(key, new Text(status + "," + email));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Mail Log Analysis");
        job.setJarByClass(MailLogAnalysis.class);
        job.setMapperClass(MailLogMapper.class);
        job.setReducerClass(MailLogReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

使用Hadoop MapReduce分析邮件日志

在大数据处理和分析的场景中,Hadoop MapReduce是一种常见且高效的工具。本文将展示如何使用Hadoop MapReduce来分析邮件日志,提取邮件的发送状态(成功、失败或退回)和目标邮箱。我们将通过一个具体的例子来实现这一目标。


项目结构

我们将创建一个Java项目,该项目包含三个主要部分:


Mapper类:解析邮件日志,提取ID、状态和目标邮箱。

Reducer类:汇总Mapper输出的数据,生成最终结果。

Driver类:配置和运行MapReduce作业。

数据格式

我们将处理的邮件日志示例如下:


less
复制代码
@400000004faa61e21e8e3e24 starting delivery 1820: msg 850901 to remote sunkang@189.cn
@400000004faa61e536864a44 delivery 1820: success: 121.14.53.136_accepted_message./Remote_host_said:_250_Ok:_queued_as_43A2222C006/
@400000004faa61e70a73c60c delivery 1823: deferral: 210.32.157.174_failed_after_I_sent_the_message./Remote_host_said:_450_Requested_action_not_taken:_AQAAf5CrT+qlYqpPamRUAA–.7571S2,_please_try_again/
@400000004faa61e70a73c60c bounce 1824: 550 Mailbox not found

在这些日志中,我们需要提取邮件的ID、发送状态(成功、失败或退回)和目标邮箱。


代码实现

以下是完整的Java代码,包含Mapper、Reducer和Driver类:


java
复制代码
package org.example.mapReduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MailLogAnalysis {
public static class MailLogMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();

        if (line.contains("starting delivery")) {
            String[] parts = line.split(" ");
            String id = parts[3].replace(":", "");
            String targetEmail = parts[8];
            context.write(new Text(id), new Text("email," + targetEmail));
        }

        if (line.contains("success") || line.contains("failure") || line.contains("bounce")) {
            String status = "success";
            if (line.contains("failure")) {
                status = "failure";
            }
            if (line.contains("bounce")) {
                status = "bounce";
            }
            String[] parts = line.split(" ");
            String id = parts[2].replace(":", "");
            context.write(new Text(id), new Text("status," + status));
        }
    }
}

public static class MailLogReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String email = "";
        String status = "failure";
        for (Text val : values) {
            String[] parts = val.toString().split(",", 2);
            if (parts[0].equals("email")) {
                email = parts[1];
            } else if (parts[0].equals("status")) {
                status = parts[1];
            }
        }
        context.write(key, new Text(status + "," + email));
    }
}

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "Mail Log Analysis");
    job.setJarByClass(MailLogAnalysis.class);
    job.setMapperClass(MailLogMapper.class);
    job.setReducerClass(MailLogReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

代码解释

Mapper类

MailLogMapper类从日志中提取邮件的ID、目标邮箱和发送状态,并将这些信息作为键值对输出:


如果行包含"starting delivery",则提取邮件的ID和目标邮箱,并输出键值对<ID, email, 目标邮箱>。

如果行包含"success"、“failure"或"bounce”,则提取邮件的ID和发送状态,并输出键值对<ID, status, 发送状态>。

Reducer类

MailLogReducer类汇总Mapper输出的数据,生成最终的结果:


对于每个邮件ID,汇总对应的目标邮箱和发送状态。

输出包含ID、发送状态和目标邮箱的最终结果。

Driver类

MailLogAnalysis类配置和运行MapReduce作业:


设置作业名称、Mapper类和Reducer类。

设置输入路径和输出路径。

提交作业并等待完成。

MapReduce运行结果

总结

通过本文的示例,我们展示了如何使用Hadoop MapReduce来分析邮件日志,提取邮件的发送状态和目标邮箱。希望本文能为您的大数据处理和分析工作提供一些帮助。

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
|
1月前
|
分布式计算 资源调度 Hadoop
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
大数据-80 Spark 简要概述 系统架构 部署模式 与Hadoop MapReduce对比
64 2
|
10天前
|
存储 SQL 监控
|
10天前
|
运维 监控 安全
|
13天前
|
监控 关系型数据库 MySQL
分析慢查询日志
【10月更文挑战第29天】分析慢查询日志
34 3
|
13天前
|
监控 关系型数据库 数据库
怎样分析慢查询日志?
【10月更文挑战第29天】怎样分析慢查询日志?
32 2
|
1月前
|
存储 缓存 关系型数据库
MySQL事务日志-Redo Log工作原理分析
事务的隔离性和原子性分别通过锁和事务日志实现,而持久性则依赖于事务日志中的`Redo Log`。在MySQL中,`Redo Log`确保已提交事务的数据能持久保存,即使系统崩溃也能通过重做日志恢复数据。其工作原理是记录数据在内存中的更改,待事务提交时写入磁盘。此外,`Redo Log`采用简单的物理日志格式和高效的顺序IO,确保快速提交。通过不同的落盘策略,可在性能和安全性之间做出权衡。
1625 14
|
1月前
|
存储 消息中间件 大数据
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
大数据-69 Kafka 高级特性 物理存储 实机查看分析 日志存储一篇详解
35 4
|
1月前
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
148 6
|
15天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第27天】在大数据时代,数据湖技术凭借其灵活性和成本效益成为企业存储和分析大规模异构数据的首选。Hadoop和Spark作为数据湖技术的核心组件,通过HDFS存储数据和Spark进行高效计算,实现了数据处理的优化。本文探讨了Hadoop与Spark的最佳实践,包括数据存储、处理、安全和可视化等方面,展示了它们在实际应用中的协同效应。
59 2
|
16天前
|
存储 分布式计算 Hadoop
数据湖技术:Hadoop与Spark在大数据处理中的协同作用
【10月更文挑战第26天】本文详细探讨了Hadoop与Spark在大数据处理中的协同作用,通过具体案例展示了两者的最佳实践。Hadoop的HDFS和MapReduce负责数据存储和预处理,确保高可靠性和容错性;Spark则凭借其高性能和丰富的API,进行深度分析和机器学习,实现高效的批处理和实时处理。
56 1