Hadoop MapReduce编程 API入门系列之Crime数据分析(二十五)(未完)

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介:

  一共12列,我们只需提取有用的列:第二列(犯罪类型)、第四列(一周的哪一天)、第五列(具体时间)和第七列(犯罪场所)。

 

 

思路分析

        基于项目的需求,我们通过以下几步完成:

1、首先根据数据集,分别统计出不同犯罪类别在周时段内发生犯罪次数和不同区域在周时段内发生犯罪的次数。

2、然后根据第一步的输出结果,再按日期统计出每天每种犯罪类别在每个区域发生的犯罪次数。

3、将前两步的输出结果,按需求插入数据库,便于对犯罪数据的分析。

 

 

程序开发

        我们要编写5个文件:

编写基类,MapReduceJobBase.java

数据处理类,DataFile.java

编写第一个任务类,SanFranciscoCrime.java

编写第二个任务类,SanFranciscoCrimePrepOlap.java

编写第三个任务,插入数据库类,LoadStarDB.java

 

 

 

 

 

    Hive那边的 数据库首先需要创建4个表,

分别为:category(name,cid)、

district(name,did)、

fact(fid,district_id,category_id,time_id,crimes)和

timeperiod(tpid,year,month,week,day)。

 

 

 

 

编译和执行MapReduce作业

  1、myclipse将项目编译和打包为crime.jar,使用SSH将crime.jar上传至hadoop的/home/hadoop/目录下。

  2、使用cd /home/hadoop/djt 切换到当前目录,通过命令行执行任务。

         2.1 首先执行第一个作业 SanFranciscoCrime.java。

hadoop    jar    crime.jar     zhouls.bigdata.myMapReduce.SanFranciscoCrime.SanFranciscoCrime

     2.2    然后执行第二个作业SanFranciscoCrimePrepOlap.java。

hadoop    jar    crime.jar    zhouls.bigdata.myMapReduce.SanFranciscoCrime.SanFranciscoCrimePrepOlap

       2.3      最后执行第三个作业LoadStarDB.java,将数据插入数据库。

hadoop     jar     crime.jar     zhouls.bigdata.myMapReduce.SanFranciscoCrime.LoadStarDB

 

 

 

 

 

运行结果

        任务的最终结果插入数据库,数据结果如下图所示。字段分别为:区域主键district_id、类别主键category_id、时间主键time_id、犯罪次数crimes和主键fid。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;
import java.text.DateFormat;


import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configured;

/**

* @function 在 MapReduce 基类中,定义基础成员变量,减少 MapReduce 主类的工作量


*/
public class MapReduceJobBase extends Configured
{

/**
* 犯罪类型在犯罪数据数组的下标为1的位置
*/
protected static final int CATEGORY_COLUMN_INDEX = 1;

/**
* 礼拜几在犯罪数据数组的下标为3的位置
*/
protected static final int DAY_OF_WEEK_COLUMN_INDEX = 3;

/**
* 日期在犯罪数据数组的下标为4的位置
*/
protected static final int DATE_COLUMN_INDEX = 4;

/**
* 犯罪区域在犯罪数据数组的下标为6的位置
*/
protected static final int DISTRICT_COLUMN_INDEX = 6;

/**
* 定义日期的数据格式
*/
protected static final DateFormat df = new SimpleDateFormat("MM/dd/yyyy");

/**
* 定义 map/reduce job结果中,日期的输出格式
*/
protected static final DateFormat outputDateFormat = new SimpleDateFormat("yyyy/MM/dd");

/**
* @function 将字符串格式的日期转换为自定义Date类型的日期
* @param value 包含完整的日期字符串
* @return Date类型的日期
* @throws ParseException
*/
protected static Date getDate(String value) throws ParseException 
{
Date retVal = null;
String[] dp = value.split(" ");
if (dp.length > 0) {
retVal = df.parse(dp[0]);
}
return retVal;
}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.BufferedReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.opencsv.CSVReader;

 

/**

* @function 从 map/reduce的输出结果中读取并提取数据


*/
public abstract class DataFile 
{

/**
* @function 从 map/reduce job 的输出结果,提取key值集合
* @param fn HDFS上的文件路径
* @return list key值的集合
* @throws IOException
*/
public static List<String> extractKeys(String fn,FileSystem fs) throws IOException 
{
FSDataInputStream in = fs.open(new Path(fn));//打开文件
List<String> retVal = new ArrayList<String>();//新建存储key值的集合list
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String line = br.readLine();//按行读取数据
while (line != null) 
{
String[] lp = line.split("\t");
if (lp.length > 0) 
{
retVal.add(lp[0]);//提取每行的第一个字段key
}
line = br.readLine();
}
br.close();
Collections.sort(retVal);//对key值进行排序
return retVal;
}

/**
* @function 将 csv文件格式的每行内容转换为数组返回
* @param 读取的一行数据
* @return array 数组
* @throws IOException
*/
public static String[] getColumns(String line) throws IOException
{
CSVReader reader = new CSVReader(new InputStreamReader(new ByteArrayInputStream(line.getBytes())));
String[] retVal = reader.readNext();
reader.close();
return retVal;
}

}

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.IOException;

import java.text.MessageFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**

* 时段系统(bucketed system),在物料需求计划(MRP)、配销资源规划(DRP)或其他时程化(time-phased)的系统里,
* 所有时程化的资料都累积在同一时期,或称时段(buchet)。如果累积的时间是以周为时间单位,此系统就称为周时段(weekly buckets)。
* 周时段(weekly buckets)即是一种以周为单位的统计方式
* @function 统计每个事件在每个周时段内发生的次数

*
*/
public class SanFranciscoCrime extends MapReduceJobBase implements Tool 
{

private static Logger log = Logger
.getLogger(SanFranciscoCrime.class.getCanonicalName());

/**
* CrimeMapper是一个公共的父类
*/
public static class CrimeMapper extends Mapper<LongWritable, Text, Text, Text>
{

protected int keyID = 0;

protected int valueID = 0;

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
{
String line = value.toString();
try {
String[] col = DataFile.getColumns(line);
if (col != null)
{
// 防止数组超界
if (col.length >= (DISTRICT_COLUMN_INDEX + 1))
{
//过滤文件第一行头部名称
if (!"date".equalsIgnoreCase(col[valueID])) 
{
Text tk = new Text();
tk.set(col[keyID]);
Text tv = new Text();
tv.set(col[valueID]);
context.write(tk, tv);
}
} else 
{
log.warning(MessageFormat.format(
"Data {0} did not parse into columns.",
new Object[] { line }));
}
} else 
{
log.warning(MessageFormat.format(
"Data {0} did not parse into columns.",
new Object[] { line }));
}
} catch (NumberFormatException nfe)
{
log.log(Level.WARNING, MessageFormat
.format("Expected {0} to be a number.\n",
new Object[] { line }), nfe);
} catch (IOException e) {
log.log(Level.WARNING, MessageFormat.format(
"Cannot parse {0} into columns.\n",
new Object[] { line }), e);
}
}
}

/**
* 输出key为犯罪类别,value为日期
*/
public static class CategoryMapByDate extends CrimeMapper 
{
public CategoryMapByDate() 
{
keyID = CATEGORY_COLUMN_INDEX;//key为犯罪类别
valueID = DATE_COLUMN_INDEX;//value为日期
}
}

/**
* 输出key为犯罪区域,value为日期
*/
public static class DistrictMapByDate extends CrimeMapper
{
public DistrictMapByDate()
{
keyID = DISTRICT_COLUMN_INDEX;//key为犯罪区域
valueID = DATE_COLUMN_INDEX;//value为日期
}
}

/**
* 统计并解析 Mapper 端的输出结果
*/
public static class CrimeReducerByWeek extends Reducer<Text, Text, Text, Text>
{

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{

List<String> incidents = new ArrayList<String>();
// 将values放入incidents列表中
for (Text value : values)
{
incidents.add(value.toString());
}
if (incidents.size() > 0)
{
//对incidents列表排序
Collections.sort(incidents);
java.util.Map<Integer, Integer> weekSummary = new HashMap<Integer, Integer>();
//因为是对1-3月数据分析,周时段(weekly buckets)最大为15,所以weekSummary长度为15即可
for (int i = 0; i < 16; i++)
{
weekSummary.put(i, 0);
}
//统计每个周时段(weekly buckets)内,该事件发生的次数
for (String incidentDay : incidents) 
{
try 
{
Date d = getDate(incidentDay);
Calendar cal = Calendar.getInstance();
cal.setTime(d);
int week = cal.get(Calendar.WEEK_OF_MONTH);//这个月的第几周
int month = cal.get(Calendar.MONTH);//第几个月,从0开始
//如果累积的时间是以周为时间单位,此系统就称为周时段(weekly buckets)。
//周时段的计算公式,最大为15,它只是一种统计方式,不必深究
int bucket = (month * 5) + week;
//统计每个周时段内,该事件发生的次数
if (weekSummary.containsKey(bucket)) 
{
weekSummary.put(bucket, new Integer(weekSummary
.get(bucket).intValue() + 1));
} else
{
weekSummary.put(bucket, new Integer(1));
}
} catch (ParseException pe)
{
log.warning(MessageFormat.format("Invalid date {0}",
new Object[] { incidentDay }));
}
}
// 将该事件在每个周时段内发生的次数生成字符串输出
StringBuffer rpt = new StringBuffer();
boolean first = true;
for (int week : weekSummary.keySet())
{
if (first) 
{
first = false;
} else 
{
rpt.append(",");
}
rpt.append(new Integer(weekSummary.get(week)).toString());
}
String list = rpt.toString();
Text tv = new Text();
tv.set(list);
//value为0-15周时段内,该事件发生的次数
context.write(key, tv);
}
}
}
@Override
public int run(String[] args) throws Exception 
{
Configuration conf1 = new Configuration();

Path out1 = new Path(args[1]);

FileSystem hdfs1 = out1.getFileSystem(conf1);
if (hdfs1.isDirectory(out1)) 
{
hdfs1.delete(out1, true);
}

// 任务1
Job job1 = new Job(conf1, "crime");
job1.setJarByClass(SanFranciscoCrime.class);

job1.setMapperClass(CategoryMapByDate.class);
job1.setReducerClass(CrimeReducerByWeek.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job1, new Path(args[0]));
FileOutputFormat.setOutputPath(job1, new Path(args[1]));
// 任务2
Configuration conf2 = new Configuration();
Path out2 = new Path(args[2]);
FileSystem hdfs2 = out2.getFileSystem(conf2);
if (hdfs2.isDirectory(out2))
{
hdfs2.delete(out2, true);
}
Job job2 = new Job(conf2, "crime");
job2.setJarByClass(SanFranciscoCrime.class);

job2.setMapperClass(DistrictMapByDate.class);
job2.setReducerClass(CrimeReducerByWeek.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job2, new Path(args[0]));
FileOutputFormat.setOutputPath(job2, new Path(args[2]));

// 构造一个 cJob1
ControlledJob cJob1 = new ControlledJob(conf1);
//设置 MapReduce job1
cJob1.setJob(job1);

// 构造一个 cJob2
ControlledJob cJob2 = new ControlledJob(conf2);
//设置 MapReduce job2
cJob2.setJob(job2);

//cJob2.addDependingJob(cJob1);// cjob2依赖cjob1

// 定义job管理对象
JobControl jobControl = new JobControl("12");

//把两个构造的job加入到JobControl中
jobControl.addJob(cJob1);
jobControl.addJob(cJob2);

//启动线程运行任务
Thread t = new Thread(jobControl);
t.start();
while (true)
{
if (jobControl.allFinished())
{
jobControl.stop();
break;
}

}
return 0;

}

public static void main(String[] args) throws Exception 
{
String[] args0 = 
{
"hdfs://HadoopMaster:9000/middle/crime/crime.csv",
"hdfs://HadoopMaster:9000/middle/test/out1/",
"hdfs://HadoopMaster:9000/middle/test/out2/" };
int ec = ToolRunner.run(new Configuration(), new SanFranciscoCrime(), args0);
System.exit(ec);
}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.IOException;

import java.net.URI;
import java.text.MessageFormat;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**

* @function 统计每天每种犯罪类型在每个区域发生的次数
*
*
*/
public class SanFranciscoCrimePrepOlap extends MapReduceJobBase implements Tool 
{

private static Logger log = Logger.getLogger(SanFranciscoCrimePrepOlap.class.getCanonicalName());
private static List<String> categories = null;
private static List<String> districts = null;
private static final java.util.Map<String, Integer> categoryLookup = new HashMap<String, Integer>();
private static final java.util.Map<String, Integer> districtLookup = new HashMap<String, Integer>();
public static abstract class Map extends Mapper<LongWritable, Text, Text, Text> 
{
protected int keyID = 0;
protected int valueID = 0;
protected int value2ID = 0;

/**
* @function 将key值转换为规范的数据格式
* @param value 包含不规范的 key值
* @return 返回规范的key值
* @throws ParseException
*/
protected abstract String formatKey(String value) throws ParseException;

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String line = value.toString();
try 
{
String[] col = DataFile.getColumns(line);//将读取的每行数据转换为数组
if (col != null) 
{
if (col.length >= (DISTRICT_COLUMN_INDEX + 1))
{
Text tk = new Text();
tk.set(formatKey(col[keyID]));//将日期作为key值
Text tv = new Text();
StringBuffer sv = new StringBuffer();
sv.append("\"");
sv.append(col[valueID]);//犯罪区域
sv.append("\"");
sv.append(",");
sv.append("\"");
sv.append(col[value2ID]);//犯罪类型
sv.append("\"");
tv.set(sv.toString());
context.write(tk, tv);
} else 
{
log.warning(MessageFormat.format("Data {0} did not parse into columns.", new Object[]{line}));
}
} else 
{
log.warning(MessageFormat.format("Data {0} did not parse into columns.", new Object[]{line}));
}
} catch (NumberFormatException nfe) 
{
log.log(Level.WARNING, MessageFormat.format("Expected {0} to be a number.\n", new Object[]{line}), nfe);
} catch (IOException e) 
{
log.log(Level.WARNING, MessageFormat.format("Cannot parse {0} into columns.\n", new Object[]{line}), e);
} catch (ParseException e)
{
log.log(Level.WARNING, MessageFormat.format("Expected {0} to be a date but it was not.\n", new Object[]{line}), e);
}
}
}

/**
* @function 将 map 输入数据的日期作为key,犯罪区域和犯罪类型作为value,然后输出
*/
public static class DateMapByCategoryAndDistrict extends Map 
{
public DateMapByCategoryAndDistrict() 
{
keyID = DATE_COLUMN_INDEX;//代表日期下标
valueID = DISTRICT_COLUMN_INDEX;//代表犯罪区域下标
value2ID = CATEGORY_COLUMN_INDEX;//代表犯罪类型下标
}

@Override
protected String formatKey(String value) throws ParseException 
{
return outputDateFormat.format(getDate(value));
}
}

public static class Reduce extends Reducer<Text, Text, Text, Text> 
{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
// 分配和初始化犯罪类型所在区域的二维数组
int[][] crimes = new int[categories.size()][districts.size()];
for (int i = 0; i < categories.size(); i++) 
{
for (int j = 0; j < districts.size(); j++) 
{
crimes[i][j] = 0;
}
}
//统计犯罪类型/区域二维数组的值(即每种犯罪类型在每个区域发生的次数)
for (Text crime:values)
{
String[] cols = DataFile.getColumns(crime.toString());
if (cols.length == 2)
{
if (categoryLookup.containsKey(cols[1]))
{
if (districtLookup.containsKey(cols[0]))
{
int cat = categoryLookup.get(cols[1]);
int dist = districtLookup.get(cols[0]);
crimes[cat][dist]++;
} else 
{
log.warning(MessageFormat.format("District {0} not found.", new Object[]{cols[0]}));
}
} else 
{
log.warning(MessageFormat.format("Category {0} not found.", new Object[]{cols[1]}));
}
} else 
{
log.warning(MessageFormat.format("Input {0} was in unexpected format", new Object[]{crime}));
}
}
//将非0二维数组的犯罪类别下标,犯罪区域下标,犯罪次数作为value输出
for (int i = 0; i < categories.size(); i++) 
{
for (int j = 0; j < districts.size(); j++)
{
if (crimes[i][j] > 0) 
{
StringBuffer sv = new StringBuffer();
sv.append(new Integer(i).toString());//犯罪类别下标
sv.append(",");
sv.append(new Integer(j).toString());//犯罪区域下标
sv.append(",");
sv.append(new Integer(crimes[i][j]));//犯罪次数
Text tv = new Text();
tv.set(sv.toString());
context.write(key, tv);
}
}
}
}
}
/**
* @function 加载已经生成的 犯罪类别数据和犯罪区域数据,并将这些数据排序后存入Map
* @param categoryReport SanFranciscoCrime job任务输出犯罪类别的文件路径
* @param districtReport SanFranciscoCrime job任务输出犯罪区域的文件路径
* @throws IOException
*/
private static void setup(String categoryReport, String districtReport,FileSystem fs) throws IOException 
{
categories = DataFile.extractKeys(categoryReport,fs);
districts = DataFile.extractKeys(districtReport,fs);
int i = 0;
for (String category : categories) 
{
categoryLookup.put(category, i++);
}
i = 0;
for (String district : districts) 
{
districtLookup.put(district, i++);
}
}
@Override
public int run(String[] arg0) throws Exception 
{
// TODO Auto-generated method stub
Configuration conf = new Configuration();

Path out = new Path(arg0[3]);

FileSystem hdfs = out.getFileSystem(conf);
if (hdfs.isDirectory(out)) 
{
hdfs.delete(out, true);
}

// 任务1
Job job = new Job(conf, "SanFranciscoCrimePrepOlap");
job.setJarByClass(SanFranciscoCrimePrepOlap.class);

job.setMapperClass(DateMapByCategoryAndDistrict.class);//Mapper
job.setReducerClass(Reduce.class);//Reducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, new Path(arg0[0]));
FileOutputFormat.setOutputPath(job, new Path(arg0[3]));
job.waitForCompletion(true);//提交任务
return 0;
}

public static void main(String[] args) throws Exception
{
String[] args0 = {
"hdfs://HadoopMaster:9000/middle/crime/crime.csv",
"hdfs://HadoopMaster:9000/middle/test/out1/part-r-00000",
"hdfs://HadoopMaster:9000/middle/test/out2/part-r-00000",
"hdfs://HadoopMaster:9000/middle/test/out3/"};
if (args0.length == 4) 
{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://HadoopMaster:9000"), conf);
//调用setup
setup(args0[1], args0[2],fs);
//执行MapReduce任务
int ec = ToolRunner.run(conf, new SanFranciscoCrimePrepOlap(), args0);
System.exit(ec);
} else 
{
System.err.println("\nusage: bin/hadoop jar sfcrime.hadoop.mapreduce.jobs-0.0.1-SNAPSHOT.jar SanFranciscoCrimePrepOlap path/to/category/report path/to/district/report path/to/input/data path/to/output/data");
}
}
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.BufferedReader;

import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.DateFormat;
import java.text.MessageFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

 

 

/***
* @function 从 MapReduce 任务中,提取数据,插入到mysql数据库

*/
public class LoadStarDB
{

private Connection db = null;//mysql数据库连接

private Map<String, Integer> lastPrimaryKey = new HashMap<String, Integer>();

private List<String> categories = null;//犯罪类别list

private List<String> districts = null;//犯罪区域list

//映射date主键的关系
private final java.util.Map<Date, Integer> timeperiodLookup = new HashMap<Date, Integer>();

private final DateFormat df = new SimpleDateFormat("MM/dd/yyyy");//插入数据库的日期格式

private final DateFormat kdf = new SimpleDateFormat("yyyy/MM/dd");//从map/reduce任务输出文件中,解析出此日期

/***
* @function 向数据库表中插入一条记录
* @param table 表名称
* @param row 包含插入字段的数据
* @return 返回此记录的主键id
* @throws SQLException
*/
private int insert(String table, DataRecord row) throws SQLException 
{
int retVal = 0;
Statement s = db.createStatement();
StringBuffer sql = new StringBuffer();
sql.append("insert into ");
sql.append(table);
sql.append(" ");

sql.append(row.toString());
s.execute(sql.toString());
if (lastPrimaryKey.containsKey(table)) 
{
retVal = lastPrimaryKey.get(table) + 1;
lastPrimaryKey.put(table, retVal);
} else 
{
lastPrimaryKey.put(table, 1);
retVal = 1;
}
return retVal;
}

/***
* @function 向数据库中插入一条犯罪类别记录
* @param category name字段对应的值
* @return 返回此记录的主键id
* @throws SQLException
*/
private int insertCategory(String category) throws SQLException
{
DataRecord dr = new DataRecord();
dr.put("name", category);
return insert("category", dr);
}

/***
* @function 向数据库中插入一条犯罪区域记录
* @param district name字段对应的值 
* @return 返回此记录的主键id
* @throws SQLException
*/
private int insertDistrict(String district) throws SQLException 
{
DataRecord dr = new DataRecord();
dr.put("name", district);
return insert("district", dr);
}

/***
* @function 将日期date拆分为字段 year, month, week, 和 day
* @param dr 包含date被拆分的字段
* @param d 需要拆分的date日期
*/
private void setTimePeriod(DataRecord dr, Date d) 
{
Calendar cal = Calendar.getInstance();
cal.setTime(d);
dr.put("year", cal.get(Calendar.YEAR));
dr.put("month", cal.get(Calendar.MONTH));
dr.put("week", cal.get(Calendar.WEEK_OF_MONTH));
dr.put("day", cal.get(Calendar.DAY_OF_MONTH));
}

/***
* @function 如果日期date已经存在表中,返回主键id,如果不存在,则插入数据库并返回主键id
* @param d 日期date
* @return 返回此日期对应的主键id
* @throws SQLException
*/
private int insertTimePeriod(Date d) throws SQLException
{
int retVal = 0;
if (timeperiodLookup.containsKey(d)) 
{
retVal = timeperiodLookup.get(d);
} else 
{
DataRecord dr = new DataRecord();
setTimePeriod(dr, d);
retVal = insert("timeperiod", dr);
timeperiodLookup.put(d, retVal);
}
return retVal;
}

/***
* @function 将数据记录插入fact表中
* @param districtId 犯罪区域外键id
* @param categoryId 犯罪类别外键id
* @param timeId 日期外键id
* @param crimes 在某一日期 某一区域 发生某一犯罪类别的总犯罪次数
* committed in this district of this category at his time* 
* @throws SQLException
*/
private void insertFact(int districtId, int categoryId, int timeId, int crimes) throws SQLException 
{
DataRecord dr = new DataRecord();
dr.put("district_id", districtId);
dr.put("category_id", categoryId);
dr.put("time_id", timeId);
dr.put("crimes", crimes);
insert("fact", dr);
}

/***
* @function 从SanFrancisco Crime map/reduce job输出结果中,读取数据
* @param categoryReport 犯罪类别文件路径
* @param districtReport 犯罪区域文件路径
* @throws IOException* 
* @throws SQLException
*/
private void setup(String categoryReport, String districtReport,FileSystem fs) throws IOException, SQLException 
{
categories = DataFile.extractKeys(categoryReport,fs);
districts = DataFile.extractKeys(districtReport,fs);
for (String category : categories)
{
insertCategory(category);
}
for (String district : districts)
{
insertDistrict(district);
}
}

/***
* @function 清空name表中的所有记录
* @param name 表名称
* @throws SQLException
*/
private void truncate(String name) throws SQLException
{
Statement s = db.createStatement();
s.execute("truncate table ".concat(name));
s.close();
}

/***
* @function 调用truncate()方法,清空表记录
* @throws SQLException
*/
private void reset() throws SQLException 
{
truncate("fact");
truncate("category");
truncate("district");
truncate("timeperiod");
}

/***
* @function 解析加载的数据
* @param categoryReport 犯罪类别文件路径
* @param districtReport 犯罪区域文件路径
* @param dbhost 数据库地址
* @param dbname 数据库名称
* @param dbuser 用户名
* @param dbpassword 密码
* @throws ClassNotFoundException* 
* @throws SQLException* 
* @throws IOException
*/
private LoadStarDB(String categoryReport, String districtReport,
String dbhost, String dbname, String dbuser, String dbpassword,FileSystem fs)
throws ClassNotFoundException, SQLException, IOException
{
Class.forName("com.mysql.jdbc.Driver");
String cs = MessageFormat
.format("jdbc:mysql://192.168.80.128:3306/test?user=root&password=root&autoReconnect=true",
new Object[] { dbhost, dbname, dbuser, dbpassword });
db = DriverManager.getConnection(cs);
reset();
setup(categoryReport, districtReport,fs);
}

/***

* @function 处理 SanFranciscoCrimPrepOlap map/reduce job任务输出结果,填充 timeperiod表和fact表 
* @param dataFile 文件路径
* @throws IOException* 
* @throws ParseException
*/
private void processData(String dataFile,FileSystem fs) throws IOException,ParseException 
{
FSDataInputStream in = fs.open(new Path(dataFile));//打开数据流
BufferedReader br = new BufferedReader(new InputStreamReader(in));//读取数据
String line = br.readLine();
while (line != null) 
{
String[] lp = line.split("\t");
if (lp.length > 0)
{
Date d = kdf.parse(lp[0]);//日期
String[] data = DataFile.getColumns(lp[1]);
if (data.length == 3) 
{
try
{
int categoryId = Integer.parseInt(data[0]) + 1;//犯罪类别id
int districtId = Integer.parseInt(data[1]) + 1;//犯罪区域id
int crimes = Integer.parseInt(data[2]);//犯罪次数
int timeId = insertTimePeriod(d);//时间id
insertFact(districtId, categoryId, timeId, crimes);//插入fact表
} catch (NumberFormatException nfe) 
{
System.err.println("invalid data: " + line);
} catch (SQLException e)
{
e.printStackTrace();
}
} else 
{
System.err.println("invalid data: " + line);
}
}
line = br.readLine();
}
br.close();
}

/*** 
* @function 运行job任务
* @param args 
* @throws IOException 
* */
public static void main(String[] args) throws IOException
{
String[] args0 = 
{
"hdfs://HadoopMaster:9000/middle/crime/out1/part-r-00000",
"hdfs://HadoopMaster:9000/middle/crime/out2/part-r-00000",
"hdfs://HadoopMaster:9000/middle/crime/out3/part-r-00000",
"192.168.80.128:3306",
"test",
"root",
"root"};
if (args0.length == 7) 
{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://HadoopMaster:9000"), conf);
try
{
LoadStarDB m = new LoadStarDB(args0[0], args0[1], args0[3],args0[4], args0[5], args0[6],fs);
m.processData(args0[2],fs);
} catch (ClassNotFoundException e)
{
e.printStackTrace();
} catch (SQLException e)
{
e.printStackTrace();
} catch (IOException e) 
{
e.printStackTrace();
} catch (ParseException e)
{
e.printStackTrace();
}
} else {
System.err
.println("\nusage: java -jar sfcrime.hadoop.mapreduce.jobs-0.0.1-SNAPSHOT.jar com.dynamicalsoftware.olap.etl.LoadStarDB path/to/category/report path/to/district/report path/to/star/data dbhost dbname dbuser dbpassword\n");
}
}

/*** 
* 生成一条数据记录
*/
class DataRecord extends HashMap<String, Object> 
{
@Override
public String toString()
{
StringBuffer retVal = new StringBuffer();
// 生成表的数据字段
retVal.append("(");
boolean first = true;
for (String key : keySet()) 
{
if (first) 
{
first = false;
} else 
{
retVal.append(",");
}
retVal.append(key);
}
//生成表字段对应的值
retVal.append(") values (");
first = true;
for (String key : keySet()) 
{
Object o = get(key);
if (first)
{
first = false;
} else 
{
retVal.append(",");
}
if (o instanceof Long) 
{
retVal.append(((Long) o).toString());
} else if (o instanceof Integer)
{
retVal.append(((Integer) o).toString());
} else if (o instanceof Date)
{
Date d = (Date) o;
retVal.append("'");
retVal.append(df.format(d));
retVal.append("'");
} else if (o instanceof String) 
{
retVal.append("'");
retVal.append(o.toString());
retVal.append("'");
}
}
retVal.append(")");
//返回一条sql格式的数据记录
return retVal.toString();
}
}

}

 


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6166276.html,如需转载请自行联系原作者

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3月前
|
安全 Java API
告别繁琐编码,拥抱Java 8新特性:Stream API与Optional类助你高效编程,成就卓越开发者!
【8月更文挑战第29天】Java 8为开发者引入了多项新特性,其中Stream API和Optional类尤其值得关注。Stream API对集合操作进行了高级抽象,支持声明式的数据处理,避免了显式循环代码的编写;而Optional类则作为非空值的容器,有效减少了空指针异常的风险。通过几个实战示例,我们展示了如何利用Stream API进行过滤与转换操作,以及如何借助Optional类安全地处理可能为null的数据,从而使代码更加简洁和健壮。
106 0
|
1月前
|
IDE API 定位技术
Python--API编程:IP地址翻译成实际的物理地址
Python--API编程:IP地址翻译成实际的物理地址
|
2月前
|
网络协议 API Windows
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
MASM32编程调用 API函数RtlIpv6AddressToString,windows 10 容易,Windows 7 折腾
|
3月前
|
分布式计算 大数据 Hadoop
揭秘MapReduce背后的魔法:从基础类型到高级格式,带你深入理解这一大数据处理利器的奥秘与实战技巧,让你从此不再是编程门外汉!
【8月更文挑战第17天】MapReduce作为分布式计算模型,是大数据处理的基石。它通过Map和Reduce函数处理大规模数据集,简化编程模型,使开发者聚焦业务逻辑。MapReduce分单阶段和多阶段,支持多种输入输出格式如`TextInputFormat`和`SequenceFileInputFormat`。例如,简单的单词计数程序利用`TextInputFormat`读取文本行并计数;而`SequenceFileInputFormat`适用于高效处理二进制序列文件。合理选择类型和格式可有效解决大数据问题。
51 1
|
3月前
|
JavaScript API 开发者
RESTful API 设计的传奇征程:突破常规,拥抱最佳实践,铸就编程巅峰!
【8月更文挑战第7天】希望通过以上的探讨,能让您对 RESTful API 设计有更深入的理解和认识。
52 5
|
3月前
|
JSON API 数据库
神秘编程力量来袭!Rails 究竟隐藏着怎样的魔力,能构建出强大的 RESTful API?快来一探究竟!
【8月更文挑战第31天】《构建 RESTful API:使用 Rails 进行服务端开发》介绍了如何利用 Ruby on Rails 框架高效构建可扩展的 RESTful API。Rails 采用“约定优于配置”,简化开发流程,通过示例展示了路由定义、控制器设计及模型层交互等内容,帮助开发者快速搭建稳定可靠的服务端。无论小型项目还是大型应用,Rails 均能提供强大支持,提升开发效率。
28 0
|
5月前
|
分布式计算 Hadoop Java
MapReduce编程模型——在idea里面邂逅CDH MapReduce
MapReduce编程模型——在idea里面邂逅CDH MapReduce
82 15
|
5月前
|
分布式计算 Hadoop Java
Hadoop MapReduce编程
该教程指导编写Hadoop MapReduce程序处理天气数据。任务包括计算每个城市ID的最高、最低气温、气温出现次数和平均气温。在读取数据时需忽略表头,且数据应为整数。教程中提供了环境变量设置、Java编译、jar包创建及MapReduce执行的步骤说明,但假设读者已具备基础操作技能。此外,还提到一个扩展练习,通过分区功能将具有相同尾数的数字分组到不同文件。
62 1
|
5月前
|
存储 分布式计算 Hadoop
MapReduce编程模型——自定义序列化类实现多指标统计
MapReduce编程模型——自定义序列化类实现多指标统计
41 0
|
5月前
|
机器学习/深度学习 分布式计算 并行计算
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
MapReduce是一种用于并行计算的编程模型和处理大规模数据集的实现
69 0

热门文章

最新文章

下一篇
无影云桌面