暂时未有相关云产品技术能力~
1 线程的生命周期当线程被创建并启动以后,它既不是一启动就进入执行状态,也不是一直处于执行状态,在线程的生命周期中,它要经过新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)和死亡(Dead)5种状态。尤其是当线程启动以后,它不可能一直霸占着CPU独自运行,所以CPU需要在多条线程之间切换,于是线程状态也会多次在运行、就绪之间切换。1.1 新建和就绪状态新建状态:当程序使用new关键字创建了一个线程之后,该线程就处于新建状态,此时它和其他的Java对象一样,仅仅由JVM为其分配内存,并初始化其成员变量的值。就绪状态:当线程对象调用了start()方法之后,该线程处于就绪状态,JVM会为其创建方法调用栈和程序计数器,处于这个状态中的线程并没有开始运行,只是表示该线程可以运行了。至于何时运行,取决于JVM里线程调度器的调度。注意:(1)启动线程使用start()方法,而不是run()方法!永远不要调用线程对象的run()方法!调用start()方法来启动线程,系统会把对应的run()方法当做线程执行体来处理,但如果直接调用线程对象的run()方法,则run()方法立即就会执行,而且在run()方法返回之前其他线程无法并发执行。简而言之,就是启动线程的正确方法是调用Thread对象的start()方法,而不是直接调用run()方法,否则就不是新开线程了,而是在同步单线执行了。(2)只能对处于新建状态的线程调用start()方法,否则将引发IllegalThreadStateException异常。1.2 运行和阻塞状态运行状态:如果处于就绪状态的线程获得了CPU,开始执行run()方法的线程执行体,则该线程处于运行状态,如果计算机只有一个CPU,那么在任何时刻只有一个线程处于运行状态。当一个线程开始运行后,它很难一直处于运行状态,线程在运行过程中需要被中断,目的是使其他线程获得执行的机会,线程调度的细节取决于底层平台所采用的策略。说明对于抢占式策略,在选择下一个线程时,系统会考虑线程的优先级。所有现代的桌面和服务器操作系统都采用抢占式调度策略,但一些小型设备比如手机则可能采用协作式调度策略,在这样的系统中,只有当一个线程调用了它的sleep()或yield()方法后才会放弃所占用的资源-也就是必须由该线程主动放弃所占用的资源。当发生如下情况时,线程将会主动进入阻塞状态:(1)线程调用sleep()方法主动放弃所占用的处理器资源。(2)线程调用了一个阻塞式IO方法,在该方法返回之前,该线程被阻塞。(3)线程试图获得一个同步锁,但该同步锁被其他线程所持有。(4)线程在等待某个通知(notify)。(5)程序调用了线程的suspend()方法将线程挂起。(容易引起死锁,应尽量避免使用!)针对上面的几种情况,就当发生如下特定的情况时可以解除上面的阻塞,让该线程重新进入就绪状态:(1)调用sleep()方法的线程经过了指定的时间。(2)线程调用的阻塞式IO方法已经返回。(3)线程成功获得了同步锁。(4)线程正在等待某个通知时,其他线程发出了一个通知。(5)处于挂起状态的线程被调用了resume()恢复方法。线程状态转换图从上图中也可以看到,线程从阻塞状态只能进入就绪状态,无法直接进入运行状态。而就绪和运行状态之间的转换通常不受程序控制,而是由系统线程调度所决定,当处于就绪状态的线程获得处理器资源时,该线程进入运行状态;当处于运行状态的线程失去处理器资源时,该线程进入就绪状态。但有一个方法例外,调用yield()方法可以让运行状态的线程转入就绪状态。1.3 线程死亡以下三种方式结束,线程即进入死亡状态:(1)run()或call()方法执行完成,线程正常结束。(2)线程抛出一个未捕获的Exception或Error。(3)直接调用该线程的stop()方法来结束该线程,该方法容易导致死锁,不推荐使用。注意:不要试图对一个已经死亡的线程调用start()方法使它重新启动,死亡就是死亡,该线程将不可再次作为线程执行,否则将引发IllegalThreadStateException异常。2 控制线程2.1 join线程join()方法:让一个线程等待另一个线程完成。当在某个程序执行流中调用其他线程的join()方法时,调用线程将被阻塞,直到被join()方法加入的join线程执行完为止。2.2 后台线程后台线程也叫守护线程,JVM的垃圾回收线程就是典型的后台线程。后台线程有个特征,如果所有的前台线程都死亡,后台线程也会自动死亡。通过调用Thread对象的setDaemon(true)方法可以将指定线程设置成后台线程,同时Thread类还提供了一个isDaemon()方法用于判断执行的线程是否为后台线程。注意:将一个线程设置为后台线程,必须要在该线程启动之前设置,也就是说,setDaemon(true)方法必须在start()方法之前调用,否则将引发IllegalThreadStateException异常。2.3 线程睡眠:sleep线程调用sleep()方法进入阻塞状态,在其睡眠时间段内,该线程不会获得执行机会,即便系统中没有其他可执行的线程,处于sleep()中的线程也不会执行。2.4 线程让步:yieldyield()方法可以让当前正在执行的线程暂停,但它不会阻塞该线程,它只是将该线程转入就绪状态。yield()只是让当前线程暂停一下,让系统的线程调度器重新调度一次,完全可能的情况是,当某个线程调用了yield()方法暂停之后,线程调度器又将其调度出来重新执行。实际上,当某个线程调用yield()方法之后,只有优先级与当前线程相同或者优先级比当前线程更高的处于就绪状态的线程才会获得执行的机会。在多CPU并行的环境下,yield()方法的功能很多时候并不明显。2.5 改变线程优先级每个线程执行时都具有一定的优先级,优先级高的线程获得较多的执行机会,而优先级低的线程则获得较少的执行机会。每个线程默认的优先级都与创建它的父线程的优先级相同。注意:由于不同操作系统的支持不同,应该尽量避免直接为线程指定优先级,而应该使用静态常量来设置优先级,这样可以保证程序具有最好的可移植性。
问题:Oracle数据库中一个中文汉字占用几个字节?回答:Oracle数据库中一个中文汉字具体占用几个字节,要根据Oracle中字符集编码决定。问题:查看oracle server端字符集?回答:select userenv('language') from dual;GBK:1个汉字占用2个字节具体编码示例:SIMPLIFIED CHINESE_CHINA.ZHS16GBKUTF8:大多是1个汉字占用3个字节示例编码如下:SIMPLIFIED CHINESE_CHINA.AL32UTF8可以用以下语句查询一个汉字占用的字节长度select lengthb('你'),length('你') from dual;
由于基于模板生成pdf文件的方式,无法灵活支持同类别不固定行数的信息展示,所以只能改为不依靠模板的方式。现将我开发实现过程,做一简单分享。算是一个总结,便于自己日后查阅,也希望能够帮助到其他有需要的同学。需要参考的,看一下我下面截图的结果样式,再看一下对应的测试代码就可以使用了。1. 生成结果样式pdf表格文件示例2. 生成PDF文件实现方式具体实现:2.1 引入jar包<dependency> <groupId>com.itextpdf</groupId> <artifactId>itextpdf</artifactId> <version>5.5.13</version> </dependency> <dependency> <groupId>com.itextpdf</groupId> <artifactId>itext-asian</artifactId> <version>5.2.0</version> </dependency>2.2 具体代码如下:不啰嗦直接上代码,具体含义如果不清楚的话,可以看注释,我已经把注释尽可能的写详细了。package pdfgentest; import com.itextpdf.text.*; import com.itextpdf.text.pdf.BaseFont; import com.itextpdf.text.pdf.PdfPCell; import com.itextpdf.text.pdf.PdfPTable; import com.itextpdf.text.pdf.PdfWriter; import java.io.FileOutputStream; public class PdfTableGenTest { private static Font headFont;// 设置字体大小 private static Font sonHeadFont;// 设置字体大小 private static Font normalTextFont;// 设置字体大小 private static Font tableLineHeadFont;// 设置字体大小 private static Font tableColumHeadFont;// 设置字体大小 private static Font textFont;// 设置字体大小 private static Font minTextFont;// 设置字体大小 static { BaseFont bfChinese; try { bfChinese = BaseFont.createFont("STSong-Light", "UniGB-UCS2-H", BaseFont.NOT_EMBEDDED); headFont = new Font(bfChinese, 16, Font.BOLD);// 设置字体大小 sonHeadFont = new Font(bfChinese, 10, Font.BOLD);// 设置字体大小 normalTextFont = new Font(bfChinese, 10, Font.NORMAL);// 设置字体大小 tableLineHeadFont = new Font(bfChinese, 8, Font.BOLD);// 设置字体大小 tableColumHeadFont = new Font(bfChinese, 6, Font.BOLD);// 设置字体大小 textFont = new Font(bfChinese, 6, Font.NORMAL);// 设置字体大小 minTextFont = new Font(bfChinese, 5, Font.NORMAL);// 设置字体大小 } catch (Exception e) { e.printStackTrace(); } } public static void writeExampaperPdf() throws Exception { // 1.新建document对象 // 第一个参数是页面大小。接下来的参数分别是左、右、上和下页边距。 Document document = new Document(PageSize.A4, 50, 50, 20, 40); // 2.建立一个书写器(Writer)与document对象关联,通过书写器(Writer)可以将文档写入到磁盘中。 // 创建 PdfWriter 对象 第一个参数是对文档对象的引用,第二个参数是文件的实际名称,在该名称中还会给出其输出路径。 PdfWriter writer = PdfWriter.getInstance(document, new FileOutputStream("D://myPdfFile.pdf")); //3.2打开文档 document.open(); //创建一个包含多列的表格,以通过合并单元格的方式,控制表格的宽度大小 PdfPTable table = createTable(12); table.addCell(createCell("\n", headFont, Element.ALIGN_CENTER, 4, false)); table.addCell(createCell("客户基本信息表", headFont, Element.ALIGN_CENTER, 4, false)); sonHeadFont.setColor(new BaseColor(205, 133, 63)); //设置字体颜色 table.addCell(createCell("文件编号:FILEBH-001", sonHeadFont, Element.ALIGN_RIGHT, 4, false)); PdfPCell jbxxCell = createCell("基本信息", tableLineHeadFont, Element.ALIGN_CENTER, 12, true); jbxxCell.setBackgroundColor(new BaseColor(205, 133, 63)); table.addCell(jbxxCell); //创建单元格,指定字体、对齐方式、合并单元格的个数、是否有边框 table.addCell(createCell("姓名", tableColumHeadFont, Element.ALIGN_CENTER, 2, true)); table.addCell(createCell(null, textFont, Element.ALIGN_CENTER, 4, true)); table.addCell(createCell("职业", tableColumHeadFont, Element.ALIGN_CENTER, 2, true)); table.addCell(createCell("工程师", textFont, Element.ALIGN_CENTER, 4, true)); table.addCell(createCell("出生日期", tableColumHeadFont, Element.ALIGN_CENTER, 2, true)); table.addCell(createCell("2020-01-01", textFont, Element.ALIGN_CENTER, 4, true)); table.addCell(createCell("国籍", tableColumHeadFont, Element.ALIGN_CENTER, 2, true)); table.addCell(createCell("中国", textFont, Element.ALIGN_CENTER, 4, true)); table.addCell(createCell("性别", tableColumHeadFont, Element.ALIGN_CENTER, 2, true)); table.addCell(createCell("男", textFont, Element.ALIGN_CENTER, 4, true)); table.addCell(createCell("联系电话", tableColumHeadFont, Element.ALIGN_CENTER, 2, true)); table.addCell(createCell("18888888888", textFont, Element.ALIGN_CENTER, 4, true)); table.addCell(createCell("个人税收居民身份", tableColumHeadFont, Element.ALIGN_CENTER, 2, true)); Phrase phrase = new Phrase(); PdfPCell wtrgrssjmxxCell1 = new PdfPCell(); wtrgrssjmxxCell1.setVerticalAlignment(Element.ALIGN_MIDDLE); wtrgrssjmxxCell1.setHorizontalAlignment(Element.ALIGN_LEFT); phrase.add(new Chunk("□ 1.中国税收居民 □ 2.非中国税收居民 ■ 3.既是中国税收居民又是其他国家(地区)税收居民\n", minTextFont).setLineHeight(12f)); phrase.add(new Chunk("★ 如以上选项中填选第2项或第3项,请填写下列信息:\n", textFont).setLineHeight(12f)); phrase.add(new Chunk(" 税收居民国(地区):", textFont).setLineHeight(12f)); phrase.add(new Chunk(" HK00001", textFont).setLineHeight(12f).setUnderline(0.5f, -1f)); phrase.add(new Chunk(" 纳税人识别号(如有):", textFont).setLineHeight(12f)); phrase.add(new Chunk("289000001", textFont).setLineHeight(12f).setUnderline(0.5f, -1f)); //设置下划线,并设置下划线的粗细 wtrgrssjmxxCell1.setPhrase(phrase); wtrgrssjmxxCell1.setPaddingLeft(20f); wtrgrssjmxxCell1.setColspan(10); wtrgrssjmxxCell1.setPaddingTop(3.0f); wtrgrssjmxxCell1.setPaddingBottom(6.0f); table.addCell(wtrgrssjmxxCell1); PdfPCell cclyCell = createCell("保险清单列表", tableLineHeadFont, Element.ALIGN_CENTER, 12, true); cclyCell.setBackgroundColor(new BaseColor(205, 133, 63)); //设置单元格背景 table.addCell(cclyCell); for (int i = 0; i < 2; i++) { table.addCell(createCell("保险清单" + (i + 1), tableColumHeadFont, Element.ALIGN_CENTER, 2, true)); Phrase insurancePhrase1 = new Phrase(); PdfPCell insuranceCell1 = new PdfPCell(); insuranceCell1.setVerticalAlignment(Element.ALIGN_MIDDLE); //上下居中 insuranceCell1.setHorizontalAlignment(Element.ALIGN_LEFT); //水平左对齐 insurancePhrase1.add(new Chunk("1.保险产品名称:灵通万事家天下寿险" + i + "\n", textFont).setLineHeight(10f)); //setLineHeight 设置行高 insurancePhrase1.add(new Chunk("3.保险单号:L000000002392997" + i + "\n", textFont).setLineHeight(10f)); insurancePhrase1.add(new Chunk("5.保险险种:终身寿险" + "\n", textFont).setLineHeight(10f)); insurancePhrase1.add(new Chunk("7.基本保额:250000" + i + "\n", textFont).setLineHeight(10f)); insurancePhrase1.add(new Chunk("9.备注(其他情况说明):无", textFont).setLineHeight(10f)); insuranceCell1.setPhrase(insurancePhrase1); insuranceCell1.setPaddingLeft(10f);//设置左侧空白填充的宽度 insuranceCell1.setColspan(5); //合并单元格 insuranceCell1.setPaddingTop(1.0f); //距离上边框距离 insuranceCell1.setPaddingBottom(4.0f); //距离下边框距离 insuranceCell1.disableBorderSide(8);//隐藏右边框 1-上, 2-下, 4-左, 8-右 table.addCell(insuranceCell1); Phrase insurancePhrase2 = new Phrase(); PdfPCell insuranceCell2 = new PdfPCell(); insuranceCell2.setVerticalAlignment(Element.ALIGN_MIDDLE); //上下居中 insuranceCell2.setHorizontalAlignment(Element.ALIGN_LEFT); //水平左对齐 insurancePhrase2.add(new Chunk("2.保险公司:灵通万事" + i + "\n", textFont).setLineHeight(10f)); insurancePhrase2.add(new Chunk("4.保单生效日:2020/12/0" + i + "\n", textFont).setLineHeight(10f)); insurancePhrase2.add(new Chunk("6.应交总保费:1290500" + i + "\n", textFont).setLineHeight(10f)); insurancePhrase2.add(new Chunk("8.受益人变更情况:生存受益人变更为保险公司;身故受益人变更为保险公司\n", textFont).setLineHeight(10f)); insurancePhrase2.add(new Chunk(" \n", textFont).setLineHeight(10f)); insuranceCell2.setPhrase(insurancePhrase2); insuranceCell2.setColspan(5); insuranceCell2.setPaddingTop(1.0f); insuranceCell2.setPaddingBottom(4.0f); insuranceCell2.disableBorderSide(4);//隐藏左边框: 1-上, 2-下, 4-左, 8-右 table.addCell(insuranceCell2); } PdfPCell syrmxCell = createCell("受益人情况", tableLineHeadFont, Element.ALIGN_CENTER, 12, true); syrmxCell.setBackgroundColor(new BaseColor(205, 133, 63)); table.addCell(syrmxCell); for (int i = 0; i < 1; i++) { table.addCell(createCell("受益人" + (i + 1), tableColumHeadFont, Element.ALIGN_CENTER, 1, true)); Phrase syrPhrase1 = new Phrase(); PdfPCell syrCell1 = new PdfPCell(); syrCell1.setVerticalAlignment(Element.ALIGN_MIDDLE); syrCell1.setHorizontalAlignment(Element.ALIGN_LEFT); syrPhrase1.add(new Chunk("姓名:王小" + i + "\n", textFont).setLineHeight(10f)); syrPhrase1.add(new Chunk("与委托人关系:儿子\n", textFont).setLineHeight(10f)); syrPhrase1.add(new Chunk("婚姻状况:未婚\n", textFont).setLineHeight(10f)); syrPhrase1.add(new Chunk("联系电话:1887878000" + i + "\n", textFont).setLineHeight(10f)); syrPhrase1.add(new Chunk("电子邮箱:8888888@168.com\n", textFont).setLineHeight(10f)); syrPhrase1.add(new Chunk("邮寄地址:北京市朝阳区光明路128号5栋102\n", textFont).setLineHeight(10f)); syrPhrase1.add(new Chunk("终止分配比例:100%", textFont).setLineHeight(10f)); syrCell1.setPhrase(syrPhrase1); syrCell1.setPaddingLeft(10f); syrCell1.setColspan(5); syrCell1.setPaddingTop(1.0f); syrCell1.setPaddingBottom(4.0f); table.addCell(syrCell1); table.addCell(createCell("个人税收居民身份", tableColumHeadFont, Element.ALIGN_CENTER, 1, true)); Phrase syrgrssjmxxPhrase = new Phrase(); PdfPCell syrgrssjmxxCell1 = new PdfPCell(); syrgrssjmxxCell1.setVerticalAlignment(Element.ALIGN_MIDDLE); syrgrssjmxxCell1.setHorizontalAlignment(Element.ALIGN_LEFT); syrgrssjmxxPhrase.add(new Chunk("■ 1.中国税收居民 □ 2.非中国税收居民 □ 3.既是中国又是其他国家(地区)税收居民\n", minTextFont).setLineHeight(15f)); syrgrssjmxxPhrase.add(new Chunk("★ 如以上选项中填选第2项或第3项,请填写下列信息:\n", textFont).setLineHeight(15f)); syrgrssjmxxPhrase.add(new Chunk(" 税收居民国(地区):▁▁▁▁ 纳税人识别号(如有):▁▁▁▁", textFont).setLineHeight(15f)); syrgrssjmxxCell1.setPhrase(syrgrssjmxxPhrase); syrgrssjmxxCell1.setPaddingLeft(6f); syrgrssjmxxCell1.setColspan(5); syrgrssjmxxCell1.setPaddingTop(3.0f); syrgrssjmxxCell1.setPaddingBottom(6.0f); table.addCell(syrgrssjmxxCell1); } document.add(table); document.add(new Paragraph("\n")); String paragraph5 = "客户(签字):▁▁▁▁▁▁▁▁"; Paragraph elements5 = new Paragraph(paragraph5, normalTextFont); elements5.setAlignment(Element.ALIGN_RIGHT); document.add(elements5); // 5.关闭文档 document.close(); } public static PdfPTable createTable(int colNumber) { int maxWidth = 500; PdfPTable table = new PdfPTable(colNumber); try { table.setTotalWidth(maxWidth); table.setLockedWidth(true); table.setHorizontalAlignment(Element.ALIGN_CENTER); table.setWidths(new int[]{50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50, 50}); table.getDefaultCell().setBorder(1); } catch (Exception e) { e.printStackTrace(); } return table; } public static PdfPCell createCell(String value, Font font, int align, int colspan, boolean boderFlag) { PdfPCell cell = new PdfPCell(); cell.setVerticalAlignment(Element.ALIGN_MIDDLE); cell.setHorizontalAlignment(align); cell.setColspan(colspan); cell.setPhrase(new Phrase(value, font)); cell.setPadding(3.0f); if (!boderFlag) { cell.setBorder(0); cell.setPaddingTop(15.0f); cell.setPaddingBottom(8.0f); } return cell; } public static void main(String[] args) { try { writeExampaperPdf(); } catch (Exception e) { e.printStackTrace(); } } }3. 写在最后的小心得网上得来的代码,尤其是涉及流资源读取与关闭的,一定要仔细重点的经过反复测试验证之后,再借鉴使用。一定不要拿来主义,直接照搬。因为很多时候看似是捷径的方法,其实质很可能是拖你后腿的大坑。4. 参考文章https://www.bbsmax.com/A/gVdnK17XzW/https://www.cnblogs.com/qlqwjy/p/8213989.html
上午在生产服务器发现一个不小的问题,就是一个程序在调用存储过程中抢到了锁,但抢到锁后调用存储过程执行出现卡死的情况,导致抢到的锁迟迟没有释放,这导致第二天程序执行时,因为无法获取到锁而无法正常执行。解决方案:引入Future类,并设定调用存储过程执行的超时时间,通过get(long timeout, TimeUnit unit),当抛出超时异常时,记录异常,往下进行其他处理逻辑,并正常释放锁。当创建了Future实例,任务可能有以下三种状态:等待状态。此时调用cancel()方法不管传入true还是false都会标记为取消,任务依然保存在任务队列中,但当轮到此任务运行时会直接跳过。完成状态。此时cancel()不会起任何作用,因为任务已经完成了。运行中。此时传入true会中断正在执行的任务,传入false则不会中断。总结:Future.cancel(true)适用于:(1) 长时间处于运行的任务,并且能够处理interruptionFuture.cancel(false)适用于:(1) 未能处理interruption的任务 ;(2) 不清楚任务是否支持取消 ;(3) 需要等待已经开始的任务执行完成吊诡的事情:当超时后,我调用Future的cancel(true)方法,在本地demo测试如下代码时,均可以正常中断任务线程,但在Spring项目工程中使用时,却没有实现效果。这个坑,暂且留下,还待后面再补上。回复吊诡的事情:这里把坑补上,其实吊诡的事情并不吊诡,主要是我事先没有一个词“能够处理interruption”或是“可中断的方法”。下面解释一下:当一个方法内部调用了wait、sleep、join等方法时,会使得当前线程进入阻塞状态,若另外的一个线程调用被阻塞线程的interrupt方法,则会打断这种阻塞,因此这种方法有时会被称为可中断方法。记住,打断一个线程并不等于该线程的生命周期结束,仅仅是打断了当前线程的阻塞状态。而所谓“吊诡的事情”发生,正是因为我在demo测试的代码中调用sleep方法,使得demo测试的代码成为了可中断的方法,而Spring工程中的代码,未调用sleep等类方法,也就是未进入阻塞状态,故而无法被中断。demo测试代码:package com.xgh.demo.threaddemo; import java.util.concurrent.*; public class TestFuture { public static void main(String[] args) { try { testTimeout3(100); System.out.println("执行结束3。。。。"); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } public static void testTimeout1(final int num) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newScheduledThreadPool(5); Future result1 = executor.submit(new Callable() { @Override public Integer call() throws Exception { Thread.sleep(10000); System.out.println(num + "-22222222222222" + Thread.currentThread().getName()); return num; } }); System.out.println("下面开始判断程序是否超时或已经执行完毕。。。"); long currentTimeMillis = System.currentTimeMillis(); long timeout = 5 * 1000L; while (!result1.isDone()) { long timecha = System.currentTimeMillis() - currentTimeMillis; if (timecha >= timeout) { System.out.println("revoke timeout"); boolean cancel = result1.cancel(true); System.out.println("revoke cancel result : --->" + cancel + " ;result isCancelled: " + result1.isCancelled()); cancel = result1.cancel(true); System.out.println("revoke cancel result2 : --->" + cancel + " ;result isCancelled: " + result1.isCancelled()); break; } if (result1.isDone()) { System.out.println("result1----->" + result1.get()); System.out.println("revoke success..."); boolean cancel = result1.cancel(true); System.out.println("revoke cancel result : --->" + cancel); break; } } executor.shutdown(); //关闭线程池 } public static void testTimeout2(final int num) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newScheduledThreadPool(5); Future result1 = executor.submit(new Callable() { @Override public Boolean call() throws Exception { Thread.sleep(10000); System.out.println(num + "=========" + Thread.currentThread().getName()); return true; } }); long timeout = 5 * 1000L; try { boolean result = (boolean) result1.get(timeout, TimeUnit.MILLISECONDS); System.out.println("revoke success,result ----->" + result); } catch (TimeoutException e) { System.out.println("revoke timeout:" + e); boolean cancel = result1.cancel(true); System.out.println("revoke cancel result : --->" + cancel + " ;result isCancelled: " + result1.isCancelled()); } executor.shutdown(); } /** * 验证不可中断的方法 * @param num * @throws InterruptedException * @throws ExecutionException */ public static void testTimeout3(final int num) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newScheduledThreadPool(5); Future result1 = executor.submit(new Callable() { @Override public Integer call() throws Exception { int i= 1; while (true){ System.out.println(num + "-22222222222222" + Thread.currentThread().getName()+"=="+Thread.currentThread().isInterrupted()); i ++; if (i == 99999999){ break; } } return num; } }); System.out.println("下面开始判断程序是否超时或已经执行完毕。。。"); long currentTimeMillis = System.currentTimeMillis(); long timeout = 2000L; while (!result1.isDone()) { long timecha = System.currentTimeMillis() - currentTimeMillis; if (timecha >= timeout) { System.out.println("revoke timeout"); boolean cancel = result1.cancel(true); System.out.println("revoke cancel result : --->" + cancel + " ;result isCancelled: " + result1.isCancelled()); cancel = result1.cancel(true); System.out.println("revoke cancel result2 : --->" + cancel + " ;result isCancelled: " + result1.isCancelled()); break; } if (result1.isDone()) { System.out.println("result1----->" + result1.get()); System.out.println("revoke success..."); boolean cancel = result1.cancel(true); System.out.println("revoke cancel result : --->" + cancel); break; } } executor.shutdown(); //关闭线程池 } }参考文章:https://felord.blog.csdn.net/article/details/104788189https://blog.csdn.net/u014252478/article/details/82109694https://blog.csdn.net/qq_24630433/article/details/88537407
1 效果演示效果演示截图2 实现过程2.1 导入jar包<!-- https://mvnrepository.com/artifact/org.apache.poi/poi --> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi</artifactId> <version>3.17</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.poi/poi-scratchpad --> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-scratchpad</artifactId> <version>3.17</version> </dependency>2.2 实现代码不啰嗦,直接上代码。package WordTest; import org.apache.poi.hwpf.HWPFDocument; import org.apache.poi.hwpf.usermodel.Range; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.InputStream; import java.io.OutputStream; public class WordDemoUtils { public static void main(String[] args) { replaceWordKeyWord(); } public static void replaceWordKeyWord() { try { String serverPath = "E:\\TEST\\tmpl2.doc"; InputStream is = new FileInputStream(serverPath); HWPFDocument doc = new HWPFDocument(is); Range range = doc.getRange(); range.replaceText("${year}", "2020"); range.replaceText("${morning}", "早上"); OutputStream os = new FileOutputStream("E:\\TEST\\target.doc"); //把doc输出到输出流中 doc.write(os); is.close(); os.close(); } catch (Exception e) { e.printStackTrace(); } } }自测过程中发现的小问题及解决方法:原本最初我使用的模板后缀为.docx,可自测过程中报了如下错误:java.lang.IllegalArgumentException: The document is really a OOXML file。解决方法:我将docx的文件另存为低版本的doc文件,即解决上述了问题。所以这里做个小总结,模板和输出文件都要是doc文件。
最近业务又新提了一个小需求,由于原来基于模板生成pdf文件的方式,无法灵活支持pdf文件中表格大小的展示,只能改为不依靠模板的方式。经过两天开发测试,算是搞定了业务的需求。现将自己的开发实现过程,做一简单分享。算是一个总结,便于自己日后查阅,也希望能够帮助到其他有需要的同学。1. 生成结果样式生成pdf样例2. 生成PDF文件实现方式具体实现:2.1 引入jar包<dependency> <groupId>com.itextpdf</groupId> <artifactId>itextpdf</artifactId> <version>5.5.13</version> </dependency> <dependency> <groupId>com.itextpdf</groupId> <artifactId>itext-asian</artifactId> <version>5.2.0</version> </dependency>2.2 具体代码如下:不啰嗦直接上代码。package pdfgentest; import com.itextpdf.text.*; import com.itextpdf.text.pdf.BaseFont; import com.itextpdf.text.pdf.PdfPCell; import com.itextpdf.text.pdf.PdfPTable; import com.itextpdf.text.pdf.PdfWriter; import java.io.FileOutputStream; public class EffectNoticePdfTest { private static Font headfont;// 设置字体大小 private static Font keyNormalFont;// 设置字体大小 private static Font keyBoldFont;// 设置字体大小 private static Font theadfont;// 设置字体大小 private static Font textfont;// 设置字体大小 static { BaseFont bfChinese; try { bfChinese = BaseFont.createFont("STSong-Light", "UniGB-UCS2-H", BaseFont.NOT_EMBEDDED); headfont = new Font(bfChinese, 16, Font.BOLD);// 设置字体大小 keyNormalFont = new Font(bfChinese, 10, Font.NORMAL);// 设置字体大小 keyBoldFont = new Font(bfChinese, 10, Font.BOLD);// 设置字体大小 theadfont = new Font(bfChinese, 9, Font.BOLD);// 设置字体大小 textfont = new Font(bfChinese, 8, Font.NORMAL);// 设置字体大小 } catch (Exception e) { e.printStackTrace(); } } public static void writeExampaperPdf() throws Exception { // 1.新建document对象 // 第一个参数是页面大小。接下来的参数分别是左、右、上和下页边距。 Document document = new Document(PageSize.A4, 50, 50, 120, 80); // 2.建立一个书写器(Writer)与document对象关联,通过书写器(Writer)可以将文档写入到磁盘中。 // 创建 PdfWriter 对象 第一个参数是对文档对象的引用,第二个参数是文件的实际名称,在该名称中还会给出其输出路径。 PdfWriter writer = PdfWriter.getInstance(document, new FileOutputStream("D://effect_notice_file2020.pdf")); //3.打开文档 document.open(); //4.设置背景图片 Image jpeg = Image.getInstance("backgroud.jpeg"); jpeg.setAlignment(Image.MIDDLE | Image.UNDERLYING); jpeg.setAbsolutePosition(0, 0); jpeg.scaleAbsolute(595, 842); document.add(jpeg); //5.将标题写进去 Paragraph pt = new Paragraph("合同生效通知书", headfont); pt.setAlignment(1);// 设置文字居中 0靠左 1,居中 2,靠右 document.add(pt); document.add(new Paragraph("\n"));// 添加段落分隔符 换行 Paragraph paragraph1 = new Paragraph(); paragraph1.add(new Chunk("致:尊敬的合伙人\n", keyBoldFont)); paragraph1.add(new Chunk(" 《", keyNormalFont)); paragraph1.add(new Chunk("益生华飞•第【64】号投资合作协议合同", keyNormalFont).setUnderline(0.7f, -1f)); paragraph1.add(new Chunk("》(以下简称“本合同”)已达到协议合同约定的生效条件,于", keyNormalFont)); paragraph1.add(new Chunk("2020", keyNormalFont).setUnderline(0.7f, -1f)); paragraph1.add(new Chunk("年", keyNormalFont)); paragraph1.add(new Chunk("10", keyNormalFont).setUnderline(0.7f, -1f)); paragraph1.add(new Chunk("月", keyNormalFont)); paragraph1.add(new Chunk("29", keyNormalFont).setUnderline(0.7f, -1f)); paragraph1.add(new Chunk("日生效。执行人已按照协议合同的约定管理数据和资产。\n", keyNormalFont)); paragraph1.add(new Chunk("一、成立信息\n", keyBoldFont)); paragraph1.add(new Chunk(" 1、管理期限:", keyNormalFont)); paragraph1.add(new Chunk("50", keyNormalFont).setUnderline(0.7f, -1f)); paragraph1.add(new Chunk("年\n", keyNormalFont)); paragraph1.add(new Chunk(" 2、本合同的交付的保险金请求权如下:", keyNormalFont)); paragraph1.setLeading(20f); document.add(paragraph1); document.add(new Paragraph("\n"));// 添加段落分隔符 换行 // PdfPTable table = createTable(8); table.addCell(createCell("序号", theadfont, Element.ALIGN_CENTER)); table.addCell(createCell("保险产品名称", theadfont, Element.ALIGN_CENTER)); table.addCell(createCell("保险公司", theadfont, Element.ALIGN_CENTER)); table.addCell(createCell("保险单号", theadfont, Element.ALIGN_CENTER)); table.addCell(createCell("保单生效日", theadfont, Element.ALIGN_CENTER)); table.addCell(createCell("保险险种", theadfont, Element.ALIGN_CENTER)); table.addCell(createCell("保险规模(元)", theadfont, Element.ALIGN_CENTER)); table.addCell(createCell("受益人变更情况", theadfont, Element.ALIGN_CENTER)); for (int i = 0; i < 3; i++) { table.addCell(createCell(i + 1 + "", textfont)); table.addCell(createCell("亚盛财富版终身寿险" + i, textfont)); table.addCell(createCell("亚盛财富"+ i, textfont)); table.addCell(createCell("Y000000002392997" + i, textfont)); table.addCell(createCell("2020/8/1" + i, textfont)); PdfPCell pCell = createCell("□年金险\n" + "■终身寿险\n", textfont, Element.ALIGN_LEFT); pCell.setMinimumHeight(50); //设置最小行高 pCell.setPadding(8f); table.addCell(pCell); Phrase phrase = new Phrase(); PdfPCell pCell1 = new PdfPCell(); pCell1.setVerticalAlignment(Element.ALIGN_MIDDLE); pCell1.setHorizontalAlignment(Element.ALIGN_CENTER); phrase.add(new Chunk("1.应交总保费:\n", textfont)); phrase.add(new Chunk("1290500\n", textfont).setUnderline(0.1f, -1f).setLineHeight(9f)); phrase.add(new Chunk("2.基本保额: \n", textfont).setLineHeight(9f)); phrase.add(new Chunk("2500006\n", textfont).setUnderline(0.1f, -1f).setLineHeight(9f)); pCell1.setPhrase(phrase); pCell1.setPaddingLeft(7f); table.addCell(pCell1); PdfPCell pCell2 = createCell("□生存受益人变更为管理公司\n" + "■身故受益人变更为管理公司\n", textfont, Element.ALIGN_LEFT); pCell2.setPadding(6f); table.addCell(pCell2); } document.add(table); document.add(new Paragraph("\n")); Paragraph paragraph3 = new Paragraph(); paragraph3.add(new Chunk(" 特此回函。\n", keyNormalFont).setWordSpacing(30f)); paragraph3.add(new Chunk("重要提示:执行人仅以合同约定为限向受益人支付利益,且执行人及相关服务机构不承诺对本协议的业绩表现或者" + "任何回报之支付做出任何保证。\n", keyBoldFont).setWordSpacing(30f)); paragraph3.add(new Chunk(" 执行人咨询电话:400-888-8888。\n", keyNormalFont)); paragraph3.add(new Chunk(" 感谢您对我司的一贯支持与信任。\n", keyNormalFont)); paragraph3.setLeading(20f); document.add(paragraph3); String paragraph5 = "益生华飞投资管理有限公司\n2020年10月20日"; Paragraph elements5 = new Paragraph(paragraph5, keyNormalFont); elements5.setAlignment(Element.ALIGN_RIGHT); document.add(elements5); // 5.关闭文档 document.close(); } public static PdfPTable createTable(int colNumber) { int maxWidth = 500; PdfPTable table = new PdfPTable(colNumber); try { table.setTotalWidth(maxWidth); table.setLockedWidth(true); table.setHorizontalAlignment(Element.ALIGN_CENTER); table.setWidths(new int[]{15, 50, 50, 50, 28, 42, 45, 45}); table.getDefaultCell().setBorder(1); } catch (Exception e) { e.printStackTrace(); } return table; } public static PdfPCell createCell(String value, Font font, int align) { PdfPCell cell = new PdfPCell(); cell.setVerticalAlignment(Element.ALIGN_MIDDLE); cell.setHorizontalAlignment(align); Phrase phrase = new Phrase(); phrase.add(new Chunk(value, font).setLineHeight(9f)); cell.setPhrase(phrase); return cell; } public static PdfPCell createCell(String value, Font font) { PdfPCell cell = new PdfPCell(); cell.setVerticalAlignment(Element.ALIGN_MIDDLE); cell.setHorizontalAlignment(Element.ALIGN_CENTER); Phrase phrase = new Phrase(); phrase.add(new Chunk(value, font).setLineHeight(9f)); cell.setPhrase(phrase); return cell; } public static void main(String[] args) { try { writeExampaperPdf(); } catch (Exception e) { e.printStackTrace(); } } }写在最后的小心得:网上得来的代码,尤其是涉及流资源读取与关闭的,一定要仔细重点的经过反复测试验证之后,再借鉴使用。一定不要拿来主义,直接照搬。因为很多时候看似是捷径的方法,其实质很可能是拖你后腿的大坑。
1. 制作PDF模板目前制作PDF模板工具别无他物,唯有使用伟大的Adobe公司提供的Adobe Acrobat Pro DC软件进行制作。下面是一个测试PDF的表单域:2. 通过模板生成PDF文件具体实现:2.1 引入jar包<dependency> <groupId>com.itextpdf</groupId> <artifactId>itextpdf</artifactId> <version>5.5.13</version> </dependency> <dependency> <groupId>com.itextpdf</groupId> <artifactId>itext-asian</artifactId> <version>5.2.0</version> </dependency>2.2 具体代码如下:package pdfdemo; import java.io.ByteArrayOutputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import com.itextpdf.text.DocumentException; import com.itextpdf.text.pdf.AcroFields; import com.itextpdf.text.pdf.AcroFields.Item; import com.itextpdf.text.pdf.BaseFont; import com.itextpdf.text.pdf.PdfReader; import com.itextpdf.text.pdf.PdfStamper; public class PDFUtils { /** * @param fields * @param data * @throws IOException * @throws DocumentException */ private static void fillData(AcroFields fields, Map<String, String> data) throws IOException, DocumentException { List<String> keys = new ArrayList<String>(); Map<String, Item> formFields = fields.getFields(); for (String key : data.keySet()) { if (formFields.containsKey(key)) { String value = data.get(key); fields.setField(key, value); // 为字段赋值,注意字段名称是区分大小写的 keys.add(key); } } Iterator<String> itemsKey = formFields.keySet().iterator(); while (itemsKey.hasNext()) { String itemKey = itemsKey.next(); if (!keys.contains(itemKey)) { fields.setField(itemKey, " "); } } } /** * @param templatePdfPath 模板pdf路径 * @param generatePdfPath 生成pdf路径 * @param data 数据 */ public static String generatePDF(String templatePdfPath, String generatePdfPath, Map<String, String> data) { OutputStream fos = null; ByteArrayOutputStream bos = null; try { PdfReader reader = new PdfReader(templatePdfPath); bos = new ByteArrayOutputStream(); /* 将要生成的目标PDF文件名称 */ PdfStamper ps = new PdfStamper(reader, bos); /* 使用中文字体 */ BaseFont bf = BaseFont.createFont("STSong-Light", "UniGB-UCS2-H", BaseFont.NOT_EMBEDDED); ArrayList<BaseFont> fontList = new ArrayList<BaseFont>(); fontList.add(bf); /* 取出报表模板中的所有字段 */ AcroFields fields = ps.getAcroFields(); fields.setSubstitutionFonts(fontList); fillData(fields, data); /* 必须要调用这个,否则文档不会生成的 如果为false那么生成的PDF文件还能编辑,一定要设为true*/ ps.setFormFlattening(true); ps.close(); fos = new FileOutputStream(generatePdfPath); fos.write(bos.toByteArray()); fos.flush(); return generatePdfPath; } catch (Exception e) { e.printStackTrace(); } finally { if (fos != null) { try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } if (bos != null) { try { bos.close(); } catch (IOException e) { e.printStackTrace(); } } } return null; } public static void main(String[] args) { Map<String, String> data = new HashMap<String, String>(); //key为pdf模板的form表单的名字,value为需要填充的值 data.put("title", "河海市人民医院诊疗病历单"); data.put("case", "123456789"); data.put("date", "2020.11.02"); data.put("name", "熊瞎子"); data.put("sex", "男"); data.put("age", "29"); data.put("phone", "137888880000"); data.put("office", "内科"); data.put("cert", "身痒找打"); data.put("drug", "1、奥美拉唑肠溶胶囊 0.25g10粒×2板 "); data.put("dose", "×2盒"); data.put("cons", "用法用量:口服 一日两次 一次2粒"); data.put("tips", "温馨提示"); data.put("desc", "尽量呆在通风较好的地方,保持空气流通,有利于病情康复。尽量呆在通风较好的地方"); generatePDF("D:\\tpl2.pdf", "D:\\filled.pdf", data); } }生成效果如下:参考文章:https://www.cnblogs.com/zzlcome/p/11053226.htmlhttps://blog.csdn.net/javasun608/article/details/79307845
今天配合同事一起和外部系统进行联调测试,其实昨天我们已经成功走通了一遍。今天同事得到对方反馈系统可能有一个潜在的问题,所以就又严格地联调了一遍。这一遍,我也是一遍又一遍地盯日志,关注告警邮件。正是在这一遍联调系统中,我发现了一个小问题,程序里面一封相同内容的通知邮件,几乎是同一个时间发送了两次。通过分析日志外加比对代码,终于找到问题,是遇到线程安全问题引起的。下面和大家分享一下,伪代码:public class Email{ private String emailContent; @Async public void sendEmail(){ "send email" + emailContent; } } //Email类的实例被Spring容器管理,只有一个实例emailInstance。 public class TestDemo{ @Resource Email emailInstance; @Async public void testMethod(){ lock(); //处理业务逻辑,同时操作emailInstance.emailContent; operationBs(); emailInstance. sendEmail(); unlock(); } }问题原因复述:testMethod方法为防止多线程同时操作,在此处使用了锁,而为了确保发送邮件不影响主程序的执行时间,所以在调用sendEmail方法时,另行开辟了异步线程处理。正是这个方法允许了异步线程处理,所以当第一个线程释放锁后,而异步线程尚未完成邮件发送时,第二个获得锁并执行sendEmail方法时,开启了新的异步线程处理邮件发送,这样就出现了多个线程共同使用emailInstance对象,并同时操作emailInstance的成员变量emailContent。因为如果一个变量是成员变量,那么多个线程对同一个对象的成员变量进行操作时,他们对该成员变量的操作是彼此影响的(也就是说一个线程对成员变量的改变会影响到另一个线程)。这样就会出现上述发送相同内容邮件的情况。理论讲解:JAVA 多线程同时调用单例模式的对象时,该对象中的对成员变量与局部变量是否会受到多个线程的影响?当多个线程对同一个单例对象的同一个成员变量进行操作时,它们对该成员变量的操作是彼此互相影响的(也就是说一个线程对该成员变量的改变会影响到另一个线程) 。对于成员变量的操作,可以使用ThreadLocal来保证线程安全。而多线程调用同一个对象的同一个方法时,每个线程会对方法内部的局部变量都是在线程自己独立的内存区域进行的,也就是说在每个线程的独立内存中都一个局部变量的拷贝,这样一个线程对同一个单例对象的同一方法内的局部变量的改变就不会影响到其他线程中的局部变量,所以是线程安全的。总结,局部变量不会受多线程影响,成员变量会受到多线程影响。多个线程调用同一个对象的同一个方法时,如果方法里无成员变量,那么不受任何影响;如果方法里有成员变量,只有读操作,不受影响,存在写操作,考虑多线程影响值。解决方案:简化方案,去掉Email中sendEmail方法上@Async注解,也就是说将处理业务逻辑和发送邮件合并为同步操作,这样就保证了同一时间只会有一个线程操作成员变量,这样也就避免了线程安全问题。但注意这是以牺牲核心业务逻辑的处理时间来换得安全。将操作成员变量,调整为操作局部变量。参考文档:Java 多线程(四) 多线程访问成员变量与局部变量
整体流程:整体流程get_sheet_names:获取sheet名称和用于区分sheet数据内容的类型数据; b.gen_multi_sheet_excle:循环第一步获取到的数据,根据类型数据查询数据,组织生成包含多个sheet页的excle文件。1.第一步:get_sheet_namesget_sheet_names(1)表输入:获取sheet名称和区分各sheet页对应的类型数据。表输入(2)数据预览:预览2.第二步:gen_multi_sheet_excle循环-转换作业中的第二步需要循环执行第一步获取到的列表数据,在此转换入口的设置如下:(注意:红色线条框住的地方一定不要忘记勾上或配置哦)入口设置gen_multi_sheet_excle转换内容如下:转换过程gen_multi_sheet_excle对应的转换属性配置参数如下:(注意这一步一定不能少哦,之前我是经常忘记配置这个)sheetname:sheet页的名称;datatype:区分sheet页数据的类型数据。转换属性表输入(根据输入变量datatype查询sheet页数据):表输入excle导出(关键核心配置:此处红色框住的区域是生成多sheet页的核心配置,务必要配置准确):excle1excle2导出结果:导出示例截图
new在 Java中意思是”新的“,可以说是 Java 开发者最常用的关键字。在 Java 中 new 的操作往往意味着在内存中开辟新的空间,这个内存空间分配在内存的堆区。堆是用来存放由 new 创建的对象和数组,即动态申请的内存都存放在堆区。栈是用来存放在方法中定义的一些基本类型的变量和对象的引用变量。Java 中一般使用 new 来创建对象,它可以动态地为一个对象分配地址。它的通用格式如下:classname obj = new classname( );其中,obj 是创建的对象,classname 是类的名字,类名后边的( )指明了类的构造方法。构造方法定义了当创建一个对象时要进行的操作。下面我们通过 String 这个类举例说明。public class Test { public static void main(String[] args) { String a = "Java语言中文网"; String b = new String("Java语言中文网"); String c = "Java语言中文网"; String d = new String("Java语言中文网"); System.out.println(a == b); System.out.println(a == c); System.out.println(d == b); System.out.println(a); a = "Java"; System.out.println(a); } }输出结果为:false true false Java语言中文网 Java不同方式定义字符串时堆和栈的变化:String a = "Java语言中文网";在栈中创建一个 String 类的对象引用变量 a,然后查找栈中有没有存放“Java语言中文网”,如果有则直接指向“Java语言中文网",如果没有,则将”Java语言中文网“存放进栈,再指向。String a = new String("Java语言中文网");不仅在栈中创建一个 String 类的对象引用变量 a,同时也在堆中开辟一块空间存放新建的 String 对象“Java语言中文网”,变量 a 指向堆中的新建的 String 对象”Java语言中文网“。==用来比较两个对象在堆区存放的地址是否相同。根据上面的输出结果,我们可以看出:使用 new 运算符创建的 String 对象进行==操作时,两个地址是不同的。这就说明,每次对象进行 new 操作后,系统都为我们开辟堆区空间,虽然值是一样,但是地址却是不一样的。当我们没有使用 new 运算符的时候,系统会默认将这个变量保存在内存的栈区。如果变量的值存放在栈中,使用==比较时,比较的是具体的值。如果变量的值存放在堆中,使用==比较时,比较的是值所在的地址。因此在变量 a 与变量 c 进行==操作的时候,返回 true,因为变量 a 和变量 c 比较的是具体的值,即“C语言中文网”。在改变变量 a 的值后(如 a = "Java"),再次输出时,我们发现输出的结果是”Java“。事实上原来的那个“Java语言中文网”在内存中并没有清除掉,而是在栈区的地址发生了改变,这次指向的是”Java“所在的地址。注意:如果需要比较两个使用 new 创建的对象具体的值,则需要通过“equal()”方法去实现,这样才是比较引用类型变量具体值的正确方式。这时,你可能想知道为什么对整数或字符这样的简单变量不使用 new 运算符。答案是 Java 的简单类型不是作为对象实现的。出于效率的考虑,它们是作为“常规”变量实现的。对象有许多属性和方法,这使得 Java 对对象的处理不同于简单类型。Java 在处理对象和处理简单类型时开销不同,Java 能更高效地实现简单类型。当然,如果你希望完全使用对象类型,那么 Java 也提供了简单类型的对象版本,也就是包装类。大家一定要明白,new 运算符是在运行期间为对象分配内存的,这使得内存的分配更加灵活和高效,你的程序在运行期间可以根据实际情况来合理地分配内存。但是,内存是有限的,因此 new 有可能由于内存不足而无法给一个对象分配内存。如果出现这种情况,就会发生运行时异常。
权限差异表权限范围:同一类 < 同一个包 < 不同包的子类 < 不同包的非子类。public: Java语言中访问限制最宽的修饰符,一般称之为“公共的”。被其修饰的类、属性以及方法不仅可以跨类访问,而且允许跨包(package)访问。private: Java语言中对访问权限限制的最窄的修饰符,一般称之为“私有的”。被其修饰的类、属性以及方法只能被该类的对象访问,其子类不能访问,更不能允许跨包访问。protect: 介于public 和 private 之间的一种访问修饰符,一般称之为“保护形”。被其修饰的类、属性以及方法只能被类本身的方法及子类访问,即使子类在不同的包中也可以访问。-default:即不加任何访问修饰符,通常称为“默认访问模式“。该模式下,只允许在同一个包中进行访问。
1. static关键字的用途static方法就是没有this的方法。在static方法内部不能调用非静态方法,反过来是可以的。而且可以在没有创建任何对象的前提下,仅仅通过类本身来调用static方法。这实际上正是static方法的主要用途。简而言之,一句话来描述就是:方便在没有创建对象的情况下来进行调用(方法/变量)。static可以用来修饰类的成员方法、类的成员变量。static关键字一个比较关键的作用就是用来形成静态代码块以优化程序性能。static块可以置于类中的任何地方,类中可以有多个static块。在类初次被加载的时候,会按照static块的顺序来执行每个static块,并且只会执行一次。2. static关键字性能优化实例为什么说static块可以用来优化程序性能,是因为它的特性:只会在类加载的时候执行一次。下面看两段代码,就可以看出来了:class Person{ private Date birthDate; public Person(Date birthDate) { this.birthDate = birthDate; } boolean isBornBoomer() { Date startDate = Date.valueOf("1946"); Date endDate = Date.valueOf("1964"); return birthDate.compareTo(startDate)>=0 && birthDate.compareTo(endDate) < 0; } }class Person{ private Date birthDate; private static Date startDate,endDate; static{ startDate = Date.valueOf("1946"); endDate = Date.valueOf("1964"); } public Person(Date birthDate) { this.birthDate = birthDate; } boolean isBornBoomer() { return birthDate.compareTo(startDate)>=0 && birthDate.compareTo(endDate) < 0; } }3. 参考资料https://www.cnblogs.com/dolphin0520/p/3799052.html
这段时间通过kettle完成了一个产品到期提醒的功能,也就是通过发送会议邀约的方式实现定时提醒。由于kettle自带有发送邮件的组件,但并没有发送会议邀约的组件,所以只能通过Java代码组件来进行自行开发实现。而在实现这个的过程中,我踩了一些小坑,在此做个小总结,便于后期回顾,也期待能够借此帮助到他人。首先介绍下分享流程:1. 了解使用java代码发送会议邀约的流程;2. 介绍通过kettle的java代码组件发送会议邀约的方式。说明:本次分享主要是针对outlook邮箱和foxmail邮箱,其他邮箱我并没有测试,想必效果应该也都是差不多的。1.java代码发送会议邀约代码很详细,注释也很清楚,下面直接上代码。pom.xml依赖:<dependency> <groupId>javax.mail</groupId> <artifactId>mail</artifactId> <version>1.4.7</version> </dependency>code代码:import javax.activation.DataHandler; import javax.activation.FileDataSource; import javax.mail.*; import javax.mail.internet.*; import javax.mail.util.ByteArrayDataSource; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.*; public class Email { public void send(String fromEmail, String toEmail, String emailPort, String emailHost, String valarmDt, String subject, String location, String emailContent) { try { Properties props = new Properties(); props.put("mail.smtp.port", emailPort); props.put("mail.smtp.host", emailHost); props.put("mail.transport.protocol", "smtp"); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.starttls.enable", "true"); props.put("mail.smtp.ssl", "true"); Authenticator authenticator = new Authenticator() { @Override protected PasswordAuthentication getPasswordAuthentication() { String userId = "faJianRen@YouXiang.DiZhi"; String password = "PassWord"; return new PasswordAuthentication(userId, password); } }; Session session = Session.getInstance(props, authenticator); MimeMessage message = new MimeMessage(session); message.setFrom(new InternetAddress(fromEmail)); /* 设置多收件人,Message.RecipientType.CC:密送 Message.RecipientType.BCC:抄送 Message.RecipientType.TO:收件人 */ List<String> emails = new ArrayList<>(); String[] emailArrays = toEmail.split(" "); int a = emailArrays.length; for (String email : emailArrays) { emails.add(email); } message.addRecipients(Message.RecipientType.TO, setRecipientT0(emails)); message.setSubject(subject); StringBuffer buffer = new StringBuffer(); buffer.append("BEGIN:VCALENDAR\n" + "PRODID:-//Microsoft Corporation//Outlook 9.0 MIMEDIR//EN\n" + "VERSION:2.0\n" + "METHOD:REQUEST\n" //METHOD:CANCEL 取消会议 METHOD:REQUEST 创建和更新会议 + "BEGIN:VEVENT\n" + "ATTENDEE;ROLE=REQ-PARTICIPANT;RSVP=TRUE:MAILTO:" + toEmail + "\n" + "ORGANIZER:MAILTO:" + toEmail + "\n" + "DTSTART:" + getUtc(valarmDt + " 08:00") + "\n" + "DTEND:" + getUtc(valarmDt + " 19:00") + "\n" + "LOCATION:" + location + "\n" + "UID:" + UUID.randomUUID().toString() + "\n"//如果id相同的话,outlook会认为是同一个会议请求,所以使用uuid。 + "CATEGORIES:待办提醒\n" + "DESCRIPTION:" + emailContent + "\n\n" // 会议内容换行为\\n + "SUMMARY:汇总 \n" + "PRIORITY:5\n" + "CLASS:PUBLIC\n" + "BEGIN:VALARM\n" + "TRIGGER:-PT15M\n" + "ACTION:DISPLAY\n" + "DESCRIPTION:Reminder\n" + "END:VALARM\n" + "END:VEVENT\n" + "END:VCALENDAR"); BodyPart messageBodyPart = new MimeBodyPart(); // 测试下来如果不这么转换的话,会以纯文本的形式发送过去, //如果没有method=REQUEST;charset=\"UTF-8\",outlook会议附件的形式存在,而不是直接打开就是一个会议请求 messageBodyPart.setDataHandler(new DataHandler(new ByteArrayDataSource(buffer.toString(), "text/calendar;method=REQUEST;charset=\"UTF-8\""))); Multipart multipart = new MimeMultipart(); multipart.addBodyPart(messageBodyPart); // 添加附件 String[] paths = { "D:\\log\\2020-6-30.log", "D:\\log\\2020-7-1.log" }; for (String filePath : paths) { MimeBodyPart part = new MimeBodyPart(); FileDataSource fds = new FileDataSource(filePath); part.setFileName(MimeUtility.encodeWord(fds.getName()));// MimeUtility.encodeWord文件名解决中文乱码 part.setDataHandler(new DataHandler(fds)); multipart.addBodyPart(part); } message.setContent(multipart); Transport.send(message); } catch (Exception ex) { ex.printStackTrace(); } } /*** 设置收件人/抄送人/密送人地址信息*/ private InternetAddress[] setRecipientT0(List<String> recipientT0List) throws Exception { if (recipientT0List.size() > 0) { InternetAddress[] sendTo = new InternetAddress[recipientT0List.size()]; for (int i = 0; i < recipientT0List.size(); i++) { System.out.println("发送到:" + recipientT0List.get(i)); sendTo[i] = new InternetAddress(recipientT0List.get(i), "", "UTF-8"); } return sendTo; } return null; } /** * 转utc时间 */ private static String getUtc(String str) throws Exception { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm"); long millionSeconds = 0; millionSeconds = sdf.parse(str).getTime(); //utc时间差8小时 long currentTime = millionSeconds - 8 * 60 * 60 * 1000; Date date = new Date(currentTime); DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String nowTime = df.format(date); String utcTime = nowTime.replace("-", "").replace(" ", "T").replace(":", ""); return utcTime; } public static void main(String[] args) { try { Email email = new Email(); String fromEmail = "faJianRen@YouXiang.DiZhi"; String toEmail = "XIAGUANGHUI@houzhui.com"; String emailPort = "25"; String emailHost = "10.8.88.88"; String valarmDt = "20200910"; String subject = "subject产品到期提醒20200909"; String location = "1号会议室(此处完全可以写成自定义的内容,不必拘泥于会议邀约)"; String emailContent = "2020-0001|产品1|2020-08-30|200000.00|固定分配|李飞\\n" + "2020-0002|产品2|2020-08-30|200000.00|固定分配2|李飞2"; email.send(fromEmail, toEmail, emailPort, emailHost, valarmDt, subject, location, emailContent); System.out.println("success"); System.out.println(System.currentTimeMillis()); } catch (Exception e) { e.printStackTrace(); } } }效果展示(outlook邮箱):outlook会议邀约效果展示2.使用kettle java代码组件发送会议邀约说明:我的kettle版本8.2,在开发前,需copy到依赖的jar包到kettle\data-integration\lib目录下,依赖jar包分别为javaws.jar、javax.mail.jar、rt.jar。kettlejava代码:// 导入依赖jar import javax.activation.DataHandler; import javax.activation.FileDataSource; import javax.mail.*; import javax.mail.internet.*; import javax.mail.util.ByteArrayDataSource; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws Exception { Object[] r = getRow(); if (r == null) { setOutputDone(); return false; } String haha; if (first) { first = false; } Object[] outputRow = createOutputRow(r, data.outputRowMeta.size()); String fromEmail = "faJianRen@YouXiang.DiZhi"; String toEmail = getParameter("toEmail"); String valarmDt = getParameter("valarmDt"); String emailContent = null; String dqCount = getParameter("DQCOUNT"); int count = Integer.parseInt(dqCount); if(count > 20){ emailContent = valarmDt+"存在" + dqCount + "个产品项目到期,具体详情请查看附件!"; }else{ emailContent = "产品到期列表如下:\\n" + get(Fields.In, "content_dqtx").getString(r); } String result = send(fromEmail, toEmail, valarmDt, emailContent); //get(Fields.Out, "RESULTMSG").setValue(outputRow, result); //输出参数 putRow(data.outputRowMeta, outputRow); return true; } private class EmailAuthenticator extends Authenticator { protected PasswordAuthentication getPasswordAuthentication() { String userId = "faJianRen@YouXiang.DiZhi"; String password = "PASSWORD"; return new PasswordAuthentication(userId, password); } } public String send(String fromEmail, String toEmail, String valarmDt, String emailContent) { try { Properties props = new Properties(); try { props.put("mail.smtp.port", "25"); props.put("mail.smtp.host", "10.8.88.88"); props.put("mail.transport.protocol", "smtp"); props.put("mail.smtp.auth", "true"); props.put("mail.smtp.starttls.enable", "true"); props.put("mail.smtp.ssl", "true"); } catch (Exception e) { e.printStackTrace(); return "-1"; } Session session; Authenticator authenticator = new EmailAuthenticator(); session = Session.getInstance(props, authenticator); MimeMessage message = new MimeMessage(session); message.setFrom(new InternetAddress(fromEmail)); //message.addRecipient(Message.RecipientType.TO, new InternetAddress(toEmail)); List emails = new ArrayList(); String[] emailArrays = toEmail.split(" "); for (int i = 0; i < emailArrays.length; i++) { emails.add(emailArrays[i]); } message.addRecipients(Message.RecipientType.TO,setRecipientT0(emails)); message.setSubject("产品到期提醒"); StringBuffer buffer = new StringBuffer(); buffer.append("BEGIN:VCALENDAR\n" + "PRODID:-//Microsoft Corporation//Outlook 9.0 MIMEDIR//EN\n" + "VERSION:2.0\n" + "METHOD:REQUEST\n" + "BEGIN:VEVENT\n" + "ATTENDEE;ROLE=REQ-PARTICIPANT;RSVP=TRUE:MAILTO:" + toEmail + "\n" + "ORGANIZER:MAILTO:" + toEmail + "\n" + "DTSTART:" + getUtc(valarmDt + " 09:00") + "\n" + "DTEND:" + getUtc(valarmDt + " 09:30") + "\n" + "LOCATION:到期提醒\n" + "UID:" + UUID.randomUUID().toString() + "\n"//如果id相同的话,outlook会认为是同一个会议请求,所以使用uuid。 + "CATEGORIES:待办提醒\n" + "DESCRIPTION:" + emailContent + "\n\n" + "SUMMARY:产品到期提醒\n" + "PRIORITY:5\n" + "CLASS:PUBLIC\n" + "BEGIN:VALARM\n" + "TRIGGER:-PT15M\n" + "ACTION:DISPLAY\n" + "DESCRIPTION:Reminder\n" + "END:VALARM\n" + "END:VEVENT\n" + "END:VCALENDAR"); BodyPart messageBodyPart = new MimeBodyPart(); // 测试下来如果不这么转换的话,会以纯文本的形式发送过去, //如果没有method=REQUEST;charset=\"UTF-8\",outlook会议附件的形式存在,而不是直接打开就是一个会议请求 messageBodyPart.setDataHandler(new DataHandler(new ByteArrayDataSource(buffer.toString(), "text/calendar;method=REQUEST;charset=\"UTF-8\""))); Multipart multipart = new MimeMultipart(); multipart.addBodyPart(messageBodyPart); MimeBodyPart part = new MimeBodyPart(); FileDataSource fds = new FileDataSource("D:\\upload\\kettle_job\\\product_dqfp_rili_tixing\\产品到期项目列表.xls"); part.setFileName(MimeUtility.encodeWord(fds.getName()));// MimeUtility.encodeWord文件名解决中文乱码 part.setDataHandler(new DataHandler(fds)); multipart.addBodyPart(part); message.setContent(multipart); Transport.send(message); } catch (MessagingException me) { me.printStackTrace(); return me.getMessage(); } catch (Exception ex) { ex.printStackTrace(); return "-3"; } return "Success"; } private InternetAddress[] setRecipientT0(List recipientT0List) throws Exception { if (recipientT0List.size() > 0) { InternetAddress[] sendTo = new InternetAddress[recipientT0List.size()]; for (int i = 0; i < recipientT0List.size(); i++) { System.out.println("发送到:" + recipientT0List.get(i)); sendTo[i] = new InternetAddress(recipientT0List.get(i).toString(), "", "UTF-8"); } return sendTo; } return null; } /** * 转utc时间 */ private static String getUtc(String str) { SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm"); long millionSeconds = 0; try { millionSeconds = sdf.parse(str).getTime(); } catch (ParseException e1) { e1.printStackTrace(); } //utc时间差8小时 long currentTime = millionSeconds - 8 * 60 * 60 * 1000; Date date = new Date(currentTime); DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String nowTime = df.format(date); String utcTime = nowTime.replace("-", "").replace(" ", "T").replace(":", ""); return utcTime; }在使用kettle的java代码组件时,发现如下小坑,在此总结说明一下,以防后面开发重复踩坑:(1) 不支持@override注解,故以上代码EmailAuthenticator,没有使用内部类的方式进行初始化;(2) 无法识别泛型的尖括号<>,可以观察到,原本应该使用泛型的地方,都删除掉了泛型。3.参考资料https://blog.csdn.net/han949417140/article/details/90206475https://www.cnblogs.com/lyxy/p/4568820.htmlhttps://blog.csdn.net/zccbbg/article/details/84162162
最近业务提了一个导出excle报表的功能需求,而之前我记忆中的导出功能,都是直接使用代码画表格的方式,往往是一大坨代码,看着就让人头大。于是我就想着有没有基于excle模板直接替换变量的方式呢,于是就找到jxls,的确很简单,数行代码就解决了我的导出需求,甚是欢喜。特此根据网上查询的资料和自己的实践,整理一下笔记,以便后期学习回顾,也期望能帮助到其他人。直接上代码,代码有注释,注释很详细,注意看注释添加pom依赖或Jar依赖<!-- https://mvnrepository.com/artifact/net.sf.jxls/jxls-core --> <dependency> <groupId>net.sf.jxls</groupId> <artifactId>jxls-core</artifactId> <version>1.0.3</version> </dependency>导出功能在和我原来旧的项目融合时,是只添加了jxls-core即可。但在我搭建的单独测试工程中,还另外依赖了如下的依赖包。工程依赖jar包列表1. 使用jxls基于模板导出单个sheet的excle1.1 单sheet模板样式单个sheet模板样式一般数据直接绑定跟 jstl比较像,直接${name},循环就是<jx:forEach items="${data}" var="item" ></jx:forEach>,上图中红色框框住的区间就是列表遍历的位置。1.2 单sheet代码演示代码具体含义,详见注释:import net.sf.jxls.transformer.XLSTransformer; import org.apache.poi.ss.usermodel.Workbook; import org.junit.Test; import java.io.BufferedOutputStream; import java.io.FileOutputStream; import java.io.InputStream; import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class JxlsDemo { @Test public void method1() throws Exception { // 循环数据 List<Object> list = new ArrayList<>(); for (int i = 0; i < 5; i++) { Map<String,Object> map = new HashMap<>(); map.put("A1", (int)(Math.random()*100)); map.put("A2", (int)(Math.random()*100)+""); map.put("A3", (int)(Math.random()*100)+""); map.put("A4", (int)(Math.random()*100)+""); map.put("A5", (int)(Math.random()*100)+""); map.put("A6", (int)(Math.random()*100)+""); list.add(map); } // 表格使用的数据 Map map = new HashMap(); map.put("data",list); map.put("title","java基于excle模板导出excle"); map.put("val","演示合并单元格"); // 获取模板文件 InputStream is = this.getClass().getClassLoader().getResourceAsStream("temp02.xls"); // 实例化 XLSTransformer 对象 XLSTransformer xlsTransformer = new XLSTransformer(); // 获取 Workbook ,传入 模板 和 数据 Workbook workbook = xlsTransformer.transformXLS(is,map); // 写出文件 OutputStream os = new BufferedOutputStream(new FileOutputStream("D://file2020.xls")); // 输出 workbook.write(os); // 关闭和刷新管道,不然可能会出现表格数据不齐,打不开之类的问题 is.close(); os.flush(); os.close(); } }1.3 单sheet导出效果导出效果2. 使用jxls基于模板导出带有多个sheet的excle2.1 多sheet模板样式代码具体含义,详见注释:多sheet模板2.2 多sheet模板导出代码@Test public void method2() throws Exception { // 循环数据 List<Object> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { Map<String,Object> map = new HashMap<>(); map.put("A1", (int)(Math.random()*100)); map.put("A2", (int)(Math.random()*100)+""); map.put("A3", (int)(Math.random()*100)+""); map.put("A4", (int)(Math.random()*100)+""); map.put("A5", (int)(Math.random()*100)+""); map.put("A6", (int)(Math.random()*100)+""); list.add(map); } // 表格使用的数据 Map map = new HashMap(); map.put("data",list); map.put("title","java基于excle模板导出多sheet的Excle"); map.put("val","演示合并单元格"); List<Object> list1 = new ArrayList<>(); for (int i = 0; i < 10; i++) { Map<String,Object> map3 = new HashMap<>(); map3.put("A1", (int)(Math.random()*100)+""); map3.put("A2", (int)(Math.random()*100)+""); map3.put("A3", (int)(Math.random()*100)+""); map3.put("A4", (int)(Math.random()*100)+""); map3.put("A5", (int)(Math.random()*100)+""); map3.put("A6", (int)(Math.random()*100)+""); list1.add(map3); } // 表格使用的数据 Map map2 = new HashMap(); map2.put("data",list1); map2.put("title","标题title"); map2.put("val","合并测试2"); // 获取模板文件 InputStream is = this.getClass().getClassLoader().getResourceAsStream("tempmultisheet01.xls"); // 实例化 XLSTransformer 对象 XLSTransformer xlsTransformer = new XLSTransformer(); //模板文件对应的sheet名称列表 List templateSheetNameList = new ArrayList<String>(); templateSheetNameList.add("Template1"); templateSheetNameList.add("Template2"); //生成文件对应的sheet名称列表 List sheetNameList = new ArrayList<String>(); sheetNameList.add("表单1"); sheetNameList.add("表单2"); //模板文件不同sheet对应的不同数据,注意保持顺序一致 List beanParamsList = new ArrayList<Map<String,Object>>(); beanParamsList.add(map); beanParamsList.add(map2); // 获取 Workbook ,传入 模板 和 数据 Workbook workbook =xlsTransformer.transformXLS(is,templateSheetNameList,sheetNameList,beanParamsList); // 写出文件 OutputStream os = new BufferedOutputStream(new FileOutputStream("D://multisheetExcleFile2020.xls")); // 输出 workbook.write(os); // 关闭和刷新管道,不然可能会出现表格数据不齐,打不开之类的问题 is.close(); os.flush(); os.close(); }2.3 多sheet导出效果多sheet导出效果3 使用jxls基于模板导出复杂sheet的excle3.1 模板和导出结果样例3.2 复杂sheet模板导出代码@Test public void genComplexExcle() throws Exception { // 循环数据 List<Object> list = new ArrayList<>(); for (int i = 0; i < 5; i++) { Company company = new Company(); company.setName("名称" + Math.random() * 100); company.setAddress("地址" + Math.random() * 100); company.setAge((int) (Math.random() * 100)); list.add(company); } List<Object> list2 = new ArrayList<>(); for (int i = 0; i < 3; i++) { Company company = new Company(); company.setName("名称2" + Math.random() * 100); company.setAddress("地址2" + Math.random() * 100); company.setAge((int) (Math.random() * 100)); list2.add(company); } // 表格使用的数据 Map map = new HashMap(); map.put("data", list); map.put("data2", list2); map.put("title", "这个是标题"); map.put("val", "这是演示合并单元格的填值情况"); map.put("adminName", "戴晓年"); map.put("adminClass", "2021-100班"); // 获取模板文件 InputStream is = this.getClass().getClassLoader().getResourceAsStream("temp03.xls"); // 实例化 XLSTransformer 对象 XLSTransformer xlsTransformer = new XLSTransformer(); // 获取 Workbook ,传入 模板 和 数据 Workbook workbook = xlsTransformer.transformXLS(is, map); // 写出文件 OutputStream os = new BufferedOutputStream(new FileOutputStream("D://companys2021complex.xls")); // 输出 workbook.write(os); // 关闭和刷新管道,不然可能会出现表格数据不齐,打不开之类的问题 is.close(); os.flush(); os.close(); }4. web环境下运行web环境下跑和上面的代码基本上一样,不一样的就是 OutputStream 的地方换成了 response.getOutputStream(),然后 response 设置一下文件名,response.setHeader("Content-Disposition", "attachment;fileName=" + URLEncoder.encode("报表.xls" ,"UTF-8"));代码如下:@RestController @RequestMapping(value = "/report") public class ReportController { private final static Logger LOGGER = LoggerFactory.getLogger(ReportController.class); @RequestMapping(value = "/downLoadDataFile/{createdMonth}", method = RequestMethod.GET) public String downLoadDataFile(HttpServletResponse response,@PathVariable String createdMonth) throws Exception { InputStream is = null; OutputStream os = null; LOGGER.info("downLoadDataFile createdMonth : -> [{}]", createdMonth); try { // 循环数据 List<Object> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { Student student = new Student(); student.setAge1((int) (Math.random() * 100)+"字符串"); student.setAge2((int) (Math.random() * 100)); student.setAge3((int) (Math.random() * 100)); student.setAge4((int) (Math.random() * 100)); student.setAge5((int) (Math.random() * 100)); student.setAge6((int) (Math.random() * 100)); list.add(student); } // 表格使用的数据 Map map = new HashMap(); map.put("data", list); map.put("title", "java基于模板导出"); map.put("val", "演示合并单元格"); // 获取模板文件 is = this.getClass().getClassLoader().getResourceAsStream("temp02.xls"); // 实例化 XLSTransformer 对象 XLSTransformer xlsTransformer = new XLSTransformer(); // 获取Workbook,传入 模板 和 数据 Workbook workbook = xlsTransformer.transformXLS(is, map); // 写出文件 // 设置文件名 response.setHeader("Content-Disposition", "attachment;fileName=" + URLEncoder.encode("报表224.xls" ,"UTF-8")); // 写出文件 os = new BufferedOutputStream( response.getOutputStream() ); // 输出 workbook.write(os); return "Downloading..."; } catch (Exception e) { LOGGER.error("DOWNLOAD data FILE THROW EXCEPTION:" + e.getMessage(), e); return "Error"; } finally { if (os != null) { os.flush(); os.close(); } if (is != null) { is.close(); } } } }5.参考文档https://blog.csdn.net/yali_aini/article/details/85804466https://www.cnblogs.com/haoxiu1004/p/7799028.htmlhttps://www.jianshu.com/p/1f821b519374
使用Java + Freemarker 导出word文档关于使用java导出word文档,网上有很多资料,但基本上来说使用freemarker模板导出的教程居多。所以这次基于网上查到的资料和自己的实践,记录下自己的实践过程,以便日后查阅,也希望能帮到一些人。下面是基本的例子,以实现简单的word导出:1. 组织word对应ftl模板要导出的word模板的内容,启动拼音部分为要在代码种替换的部分。模板编辑好word后将文件另存为.xml文件,然后再将.xml文件后缀改为.ftl。打开ftl文件,依次将变量替换为用${}包裹。注意:替换的内容需要包裹在<w:t> </w:t>之中。另外,最好使用全中文作为占位符。因为使用英文的话,转为xml时,word可能会将一个单词拆分成两个,比如我使用Title作为占位符,转化为xml后,搜索的时候一直找不到。然后你会发现,其实word将其拆分成T和itle。这种事也不是绝对的(同一个单词如果有不同的样式就会保存在不同的<w:r>中),所以只是建议,即便同一个单词被拆分了,也不用急等到后面就有解决方案。word文档的结构对于List类型的内容来说需要进行遍历。对于上面的数据结构来说,我们需要对list进行遍历。在这之前,我们首先了解一下word xml的大概结构<w:wordDocument> <w:body> <w:p> <w:pPr> </w:pPr> <w:r> <w:rPr>属性:加粗,倾斜,字体颜色等</w:rPr> <w:t> 文本内容</w:t> </w:r> </w:p> </w:body> </<w:wordDocument><w:p> 会包裹一段数据,(段落)<w:pPr> 段落的属性,可选元素。 段落属性的一些示例包括对齐方式、边框、断字覆盖、缩进、行距、底纹、文本方向和孤行控制<w:r> 它是具有一组共同属性(如格式设置)的文本区域。它可以包含多个<w:t>元素。如果示例文本中只有一个字是粗体,粗体将会分离到一个<w:r>中<w:rPr>用于指定<w:r>属性。 连续文本属性的一些示例包括粗体、边框、字符样式、颜色、字体、字号、斜体、字距调整、禁用拼写/语法检查、底纹、小号大写字母、删除线、文字方向和下划线<w:t> 实际的文本内容下面我们用一个例子来说明,写了一些内容,并配置了颜色示例另存为xml文件后的部分代码<w:p wsp:rsidR="0084377C" wsp:rsidRPr="002827FA" wsp:rsidRDefault="009C2113"> <w:pPr> <w:rPr> <w:color w:val="000000"/> </w:rPr> </w:pPr> <w:r> <w:rPr><w:rFonts w:hint="fareast"/></w:rPr> <w:t>哈哈</w:t> </w:r> <w:r wsp:rsidRPr="009C2113"> <w:rPr> <w:rFonts w:hint="fareast"/> <w:color w:val="FF0000"/> </w:rPr> <w:t>嗝</w:t> </w:r> <w:r wsp:rsidRPr="002827FA"> <w:rPr> <w:rFonts w:hint="fareast"/> <w:color w:val="000000"/> </w:rPr> <w:t>哈哈</w:t> </w:r> </w:p>从上面可以清楚的看到,上面的内容在一个段落里包裹。同时在一个段落里可以设置多个不同的文字样式,这部分数据就会存放在 <w:r> 中,样式数据就存放在<w:rPr> 里面。所以说如果我们需要遍历,首先要找到需要遍历的位置在哪里?找好以后就完成了一半的工作。例如上面的小案例,我们需要遍历学号和内容。 所以首先定位到 “xuehao” 所在的<w:p> 然后查找 “选项”所在的</w:p>。 然后将这么内容使用<#list> </#list>包裹就可以了。<#list list as stu> <w:tr wsp:rsidR="00B362B3" wsp:rsidRPr="00B55103" wsp:rsidTr="00B55103"> <w:trPr><w:trHeight w:val="563"/></w:trPr> <w:tc> <w:tcPr><w:tcW w:w="4148" w:type="dxa"/><w:shd w:val="clear" w:color="auto" w:fill="auto"/></w:tcPr> <w:p wsp:rsidR="00B362B3" wsp:rsidRPr="00B55103" wsp:rsidRDefault="00B362B3"> <w:proofErr w:type="spellStart"/> <w:r wsp:rsidRPr="00B55103"><w:t>${stu.xuehao}</w:t></w:r> <w:proofErr w:type="spellEnd"/> </w:p> </w:tc> <w:tc> <w:tcPr><w:tcW w:w="4148" w:type="dxa"/><w:shd w:val="clear" w:color="auto" w:fill="auto"/></w:tcPr> <w:p wsp:rsidR="00B362B3" wsp:rsidRPr="00B55103" wsp:rsidRDefault="00B362B3"> <w:proofErr w:type="spellStart"/> <w:r wsp:rsidRPr="00B55103"><w:rPr><w:rFonts w:hint="fareast"/></w:rPr></w:r> <w:r wsp:rsidRPr="00B55103"><w:t>${stu.neirong}</w:t></w:r> <w:proofErr w:type="spellEnd"/> </w:p> </w:tc> </w:tr> </#list>2. 添加freemarker依赖<dependency> <groupId>org.freemarker</groupId> <artifactId>freemarker</artifactId> <version>2.3.30</version> </dependency>3. 测试代码package demo; import freemarker.template.Configuration; import freemarker.template.Template; import java.io.*; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class WordTest { private Configuration configuration = null; public WordTest() { configuration = new Configuration(); configuration.setDefaultEncoding("UTF-8"); } public static void main(String[] args) { WordTest test = new WordTest(); test.createWord(); } public void createWord() { Map<String, Object> dataMap = new HashMap<String, Object>(); getData(dataMap); configuration.setClassForTemplateLoading(this.getClass(), "/");//模板文件所在路径,此处我是存放在resource目录下 try { Template t = configuration.getTemplate("wordtemplate.ftl"); //获取模板文件 File outFile = new File("D:/outFile" + Math.random() * 10000 + ".doc"); //导出文件 Writer out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outFile))); t.process(dataMap, out); //将填充数据填入模板文件并输出到目标文件 } catch (Exception e) { e.printStackTrace(); } } private void getData(Map<String, Object> dataMap) { dataMap.put("title", "标题"); dataMap.put("nian", "2020"); dataMap.put("yue", "09"); dataMap.put("ri", "08"); dataMap.put("shenheren", "李小龙"); List<Map<String, Object>> list = new ArrayList<Map<String, Object>>(); for (int i = 0; i < 10; i++) { Map<String, Object> map = new HashMap<String, Object>(); map.put("xuehao", i); map.put("neirong", "内容" + i); list.add(map); } dataMap.put("list", list); } }4. 文件结构文件结构5. 导出文件效果效果图6. 参考文档https://blog.csdn.net/yamadeee/article/details/82771035https://www.cnblogs.com/lcngu/p/5247179.html
一 所需软件:Redis、Ruby语言运行环境、Redis的Ruby驱动redis-xxxx.gem、创建Redis集群的工具redis-trib.rb。安装Redis集群,至少需要运行3个Redis实例节点,因为低于3个实例节点集群无法完成创建,此次就规划三个节点;使用redis-trib.rb工具来创建Redis集群,由于该文件是用ruby语言写的,所以需要安装Ruby开发环境,以及驱动redis-xxxx.gem。二 下载并安装Redis1 下载Redisredis下载地址 https://github.com/MSOpenTech/redis/releases ; 下载Redis-x64-3.2.100.zip。把 redis 解压后,再复制出 2 份,配置 一主两从集群。 由于 redis 默认端口号为 6379,那么其它2份的端口可以为6380,6381。 目录如下:redis集群目录2 更改配置分别打开每个节点目录下的redis.windows.conf,修改里面的端口号和其他集群支持配置,配置如下。port 6379 # 相应其他2个redis修改端口号为6380,6381 cluster-enabled yes #开启集群 cluster-config-file nodes-6379.conf #为该节点的配置信息,这里使用 nodes-端口.conf命名方法。服务启动后会在目录生成该文件 cluster-node-timeout 15000 #超时时间设置 appendonly yes注意:为了避免不必要的错误,配置文件尽量保存为utf8格式,并且不要包含注释。3 编写启动脚本编写一个bat文件来启动redis,在每个节点目录下建立start.bat,内容如下:title redis-6379 redis-server.exe redis.windows.conf三 下载并安装Ruby1 安装Ruby下载路径如下:http://dl.bintray.com/oneclick/rubyinstaller/rubyinstaller-2.2.4-x64.exe。下载后,双击安装即可。为了操作方便,也是建议安装在盘符根目录下,如: C:\Ruby22-x64 ,安装时这里选中后两个选项,意思是将ruby添加到系统的环境变量中,在cmd命令中能直接使用ruby的命令。安装Ruby2 安装Ruby下的Redis驱动下载Ruby环境下Redis的驱动,考虑到兼容性,这里下载的是3.2.2版本:https://rubygems.org/gems/redis/versions/3.2.2,注意下载功能在页面右下角相关连接一项中。将下载的驱动文件复制到Ruby安装目录下,并执行如下命令进行安装驱动:gem install --local path_to_gem/filename.gem安装Redis驱动3 下载创建Redis集群的ruby脚本工具redis-trib.rb下载Redis官方提供的创建Redis集群的ruby脚本文件redis-trib.rb,路径如下:https://raw.githubusercontent.com/MSOpenTech/redis/3.0/src/redis-trib.rb。打开该链接如果没有下载,而是打开一个页面,那么将该页面保存为redis-trib.rb,建议保存到Redis的目录下。另外,因为redis-trib.rb是ruby代码,必须用ruby来打开。四 创建Redis集群1 启动每个redis节点点击每个节点目录下的start.bat文件即可运行。2 使用redis-trib.rb来创建Redis集群cmd下切换到redis_6379目录,使用redis-trib.rb来创建Redis集群,执行命令行如下:ruby redis-trib.rb create --replicas 0 127.0.0.1:6379 127.0.0.1:6380 127.0.0.1:6381在出现 Can I set the above configuration? (type 'yes' to accept): 请确定并输入 yes 。创建Redis集群3 检验集群是否创建成功检验是否真的创建成功,输入以下命令:ruby redis-trib.rb check 127.0.0.1:6379,出现以下信息,说明创建的Redis集群是没问题。检验参考文章:1 .Windows系统下搭建Redis集群 2.在windows上搭建redis集群(redis-cluster)
4 怎么回收垃圾下面讨论几种常见的垃圾收集算法的核心思想。4.1 标记-清除算法(Mark-Sweep)标记清除算法(Mark-Sweep)是最基础的一种垃圾回收算法,它分为2部分,从根集合(GC Roots)进行扫描,对存活的对象进行标记,标记完毕后,再扫描整个空间中未被标记的对象,如上图所示,标记-清除算法不需要进行对象的移动,只需对不存活的对象进行处理,在存活对象较多的情况下极为高效,但由于标记-清除算法直接回收不存活的对象,因此会造成内存碎片。我们知道开辟内存空间时,需要的是连续的内存区域,如果内存碎片都是1M大小的话,这时候我们若需要一个 2M的内存区域,其中有2个 1M 是没法用的。这样就导致,其实我们本身还有这么多的内存的,但却用不了。4.2 复制算法(Copying)复制算法(Copying)是在标记清除算法上演化而来,解决标记清除算法的内存碎片问题。它开始时把堆分成一个对象面和多个空闲面, 程序从对象面为对象分配空间,当对象面满了,基于copying算法的垃圾收集就从根集合(GC Roots)中扫描活动对象,并将每个活动对象复制到空闲面(使得活动对象所占的内存之间没有空闲洞),这样空闲面变成了对象面,原来的对象面变成了空闲面,程序会在新的对象面中分配内存。这样就保证了内存空间的连续可用,内存分配时也就不用考虑内存碎片等复杂情况,逻辑清晰,运行高效。然而很明显暴露了另一个问题,空间浪费,代价实在太高。4.3 标记-整理算法(Mark-Compact)标记整理算法(Mark-Compact)标记过程仍然与标记清除算法一样,但后续步骤不是直接对可回收对象进行清理,而是让所有存活的对象都向一端移动,并更新对应的空闲指针,然后再清理掉端指针边界以外的内存区域。标记整理算法一方面在标记-清除算法上做了升级,解决了内存碎片的问题,也规避了复制算法只能利用一半内存区域的弊端。看起来很美好,但从上图可以看到,它对内存变动更频繁,需要整理所有存活对象的引用地址,在效率上比复制算法要差很多。4.4 分代收集算法分代收集算法(Generational Collection)严格来说并不是一种思想或理论,而是融合上述3种基础的算法思想,而产生的针对不同情况所采用不同算法的一套组合拳。对象存活周期的不同将内存划分为几块,一般是把 Java 堆分为新生代和老年代,这样就可以根据各个年代的特点采用最适当的收集算法。在新生代中,每次垃圾收集时都发现有大批对象死去,只有少量存活,那就选用复制算法,只需要付出少量存活对象的复制成本就可以完成收集。而老年代中因为对象存活率高、没有额外空间对它进行分配担保,就必须使用标记-清理或者标记 - 整理算法来进行回收。so,另一个问题来了,那内存区域到底被分为哪几块,每一块又有什么特别适合什么算法呢?4.5 内存模型与回收策略Java 堆(Java Heap)是JVM所管理的内存中最大的一块,堆又是垃圾收集器管理的主要区域,这里我们主要分析一下 Java 堆的结构。Java堆主要分为2个区域:新生代与老年代,其中新生代内存按照8:1:1的比例又分为 Eden 区和 两个Survivor区( From 和 To 2个区)。可能这时候大家会有疑问,为什么要分为新生代与老年代呢?而新生代为什么又需要Survivor 区,为什么Survivor区还要再分2个区呢。别急,下面咱就絮叨絮叨。4.5.1 新生代的回收算法(回收以Copying复制算法为主)所有新生成的对象首先都是放在新生代的,新生代的目标就是尽可能的快速收集那些生命周期短的对象。大多数情况下,对象会在新生代 Eden 区中进行分配,当Eden区没有足够空间进行分配时,虚拟机会发起一次Minor GC,Minor GC 相比 Major GC 更频繁,回收速度也更快。进行Minor GC时,会将Eden区无需回收的对象复制到Survivor的From区(若From区不够,则直接进入Old区),然后清空Eden区。当From区也存放满了时,会将Eden区和From存活的对象放到Survivor的To区,然后清空Eden区和Survivor的From区。此时Survivor的From区是空的,然后将Survivor的From区和To区交换,即保持Survivor的To区为空,如此往复。当Survivor的To区空间不够,不足以存放Eden 区和 From 存活的对象事,就会将存活对象直接存放到 老年代(Old 区)。新生代发生的GC也叫做Minor GC,MinorGC发生频率比较高(不一定等Eden区满了才触发)。(1)为啥需要Survivor区?不就是新生代到老年代么,直接 Eden 到 Old 不好了吗,为啥要这么复杂。想想如果没有 Survivor 区,Eden 区每进行一次 Minor GC,存活的对象就会被送到老年代,老年代很快就会被填满。而有很多对象虽然一次 Minor GC 没有消灭,但其实也并不会蹦跶多久,或许第二次,第三次就需要被清除。这时候移入老年区,很明显不是一个明智的决定。所以,Survivor 区相当于是 Eden 区和 Old 区的一个缓冲,类似于我们交通灯中的黄灯。它存在意义就是减少被送到老年代的对象,进而减少 Major GC 的发生。Survivor 的预筛选保证,只有经历16次 Minor GC 还能在新生代中存活的对象,才会被送到老年代。(2)为啥Survivor需要两个区?设置两个Survivor区最大的好处就是解决内存碎片化。我们先假设一下,Survivor如果只有一个区域会怎样。Minor GC执行后,Eden区被清空了,存活的对象放到了Survivor区,而之前Survivor区中的对象,可能也有一些是需要被清除的。问题来了,这时候我们怎么清除它们?在这种场景下,我们只能标记清除,而我们知道标记清除最大的问题就是内存碎片,在新生代这种经常会消亡的区域,采用标记清除必然会让内存产生严重的碎片化。因为Survivor有2个区域,所以每次Minor GC,会将之前Eden区和From区中的存活对象复制到To区域。第二次Minor GC时,From与To职责兑换,这时候会将 Eden区和To区中的存活对象再复制到From区域,以此反复。(职责会互换)这种机制最大的好处就是,整个过程中,永远有一个Survivor space是空的,另一个非空的Survivor space是无碎片的。那么,Survivor为什么不分更多块呢?比方说分成三个、四个、五个?显然,如果Survivor区再细分下去,每一块的空间就会比较小,容易导致Survivor区满,两块Survivor区可能是经过权衡之后的最佳方案。4.5.2 老年代的回收算法(回收以标记-整理算法为主)老年代占据着2/3的堆内存空间,只有在 Major GC 的时候才会进行清理,每次 GC 都会触发“Stop-The-World”。内存越大,STW 的时间也越长,所以内存也不仅仅是越大就越好。由于复制算法在对象存活率较高的老年代会进行很多次的复制操作,效率很低,所以老年代这里采用的是标记-整理算法。除了上述所说,在内存担保机制下,无法安置的对象会直接进到老年代,以下几种情况也会进入老年代。(1)大对象:大对象指需要大量连续内存空间的对象,这部分对象不管是不是“朝生夕死”,都会直接进到老年代。这样做主要是为了避免在 Eden 区及2个 Survivor 区之间发生大量的内存复制。当你的系统有非常多“朝生夕死”的大对象时,得注意了。(2)长期存活对象: 虚拟机给每个对象定义了一个对象年龄(Age)计数器。正常情况下对象会不断的在 Survivor 的 From 区与 To 区之间移动,对象在 Survivor 区中没经历一次 Minor GC,年龄就增加1岁。当年龄增加到15岁时,这时候就会被转移到老年代。当然,这里的15,JVM 也支持进行特殊设置。(3)动态对象年龄: 虚拟机并不重视要求对象年龄必须到15岁,才会放入老年区,如果 Survivor 空间中相同年龄所有对象大小的总合大于 Survivor 空间的一半,年龄大于等于该年龄的对象就可以直接进去老年区,无需等你“成年”。这其实有点类似于负载均衡,轮询是负载均衡的一种,保证每台机器都分得同样的请求。看似很均衡,但每台机的硬件不通,健康状况不同,我们还可以基于每台机接受的请求数,或每台机的响应时间等,来调整我们的负载均衡算法。5 GC是什么时候触发的GC分为两种:Major GC(或称为Full GC)和minor GC,老年代采用标记-整理算法的Major GC,新生代采用复制算法的minor GC。新生代是GC收集垃圾的频繁区域。在最近几个版本的JDK里默认包括了对永生带即方法区的回收(JDK8中无永生带了),出现Full GC的时候经常伴随至少一次的Minor GC,但非绝对的。Major GC的速度一般会比Minor GC慢10倍以上。下边看看有那种情况触发JVM进行Full GC及应对策略。Minor GC触发条件:一般情况下,当新对象生成,并且在Eden区申请空间失败时,就会触发触发Minor GC。Full GC触发条件:(1)System.gc()方法的调用此方法的调用是建议JVM进行Full GC,虽然只是建议而非一定,但很多情况下它会触发 Full GC,从而增加Full GC的频率,也即增加了间歇性停顿的次数。强烈影响系建议能不使用此方法就别使用,让虚拟机自己去管理它的内存,可通过通过-XX:+ DisableExplicitGC来禁止RMI(Java远程方法调用)调用System.gc。(2)老年代空间不足老年代空间只有在新生代对象转入及创建为大对象、大数组时才会出现不足的现象,当执行Full GC后空间仍然不足,则抛出如下错误: java.lang.OutOfMemoryError: Java heap space为避免以上两种状况引起的Full GC,调优时应尽量做到让对象在Minor GC阶段被回收、让对象在新生代多存活一段时间及不要创建过大的对象及数组。(3)方法区空间不足JVM规范中运行时数据区域中的方法区,在HotSpot虚拟机中又被习惯称为永生代或者永生区,Permanet Generation中存放的为一些class的信息、常量、静态变量等数据,当系统中要加载的类、反射的类和调用的方法较多时,Permanet Generation可能会被占满,在未配置为采用CMS GC的情况下也会执行Full GC。如果经过Full GC仍然回收不了,那么JVM会抛出如下错误信息:java.lang.OutOfMemoryError: PermGen space。为避免Perm Gen占满造成Full GC现象,可采用的方法为增大Perm Gen空间或转为使用CMS GC。(4)通过Minor GC后进入老年代的平均大小大于老年代的可用内存如果发现统计数据说之前Minor GC的平均晋升大小比目前old gen剩余的空间大,则不会触发Minor GC而是转为触发full GC(5)由Eden区、From Space区向To Space区复制时,对象大小大于To Space可用内存,则把该对象转存到老年代,且老年代的可用内存小于该对象大小。
配置加载方式canal配置方式有两种:ManagerCanalInstanceGenerator: 基于manager管理的配置方式,目前alibaba内部配置使用这种方式。大家可以实现CanalConfigClient,连接各自的管理系统,即可完成接入。SpringCanalInstanceGenerator:基于本地spring xml的配置方式,目前开源版本已经自带该功能所有代码,建议使用。Spring配置spring配置的原理是将整个配置抽象为两部分:xxx-instance.xml: canal组件的配置定义,可以在多个instance配置中共享xxx-properties: 每个instance通道都有各自的一份定义,因为每个mysql的ip,账号,密码等等信息不会相同通过spring的PropertyPlaceholderConfigurer通过机制将两部分融合,生成一个instance实例对象,每个instance对应的组件都是相互独立的,互不影响。properties配置properties配置文件分为两部分:canal.properties: 系统根配置文件,对应一个canal serverinstance.properties: instance级别的配置文件,每个instance一份instance.properties配置介绍a.在canal.properties定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件比如:canal.destinations = example1,example2,这时需要创建example1和example2两个目录,每个目录里各自有一份instance.properties.ps. canal自带了一份instance.properties demo,可直接复制conf/example目录进行配置修改。cp -R example example1/ cp -R example example2/b. 如果canal.properties未定义instance列表,但开启了canal.auto.scan时server第一次启动时,会自动扫描conf目录下,将文件名做为instance name启动对应的instance server运行过程中,会根据canal.auto.scan.interval定义的频率,进行扫描1.发现目录有新增,启动新的instance2.发现目录有删除,关闭老的instance3.发现对应目录的instance.properties有变化,重启instance.xxx-instance.xml配置介绍目前默认支持的xxx-instance.xml有四种:memory-instance.xmlfile-instance.xmldefault-instance.xmlgroup-instance.xml.spring/memory-instance.xml介绍:所有的组件(parser , sink , store)都选择了内存版模式,记录位点的都选择了memory模式,重启后又会回到初始位点进行解析。特点:速度最快,依赖最少(不需要zookeeper)场景:一般应用在quickstart,或者是出现问题后,进行数据分析的场景,不应该将其应用于生产环境spring/file-instance.xml介绍:所有的组件(parser , sink , store)都选择了基于file持久化模式;特点:不支持HA机制.支持单机持久化.场景:生产环境,无HA需求,简单可用.spring/default-instance.xml介绍:所有的组件(parser , sink , store)都选择了持久化模式,目前持久化的方式主要是写入zookeeper,保证数据集群共享.特点:支持HA场景:生产环境,集群化部署spring/group-instance.xml介绍:主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。场景:分库业务。比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可.参考文章:https://github.com/alibaba/canal/wiki/AdminGuide
一 架构设计简单架构设计说明:两个mysql库中均创建有canal/canal的账户;这里A、B两个mysql库是用来模拟t_dept进行分库分表;另外,在A、B两种表中都创建有表t_canal.canal原理: 可查看文章【了解canal,看这个就够了】安装与搭建流程:可参考文章【canal应用-1个server+2个instance+2个client+2个mysql】处理分表分库的场景,主要是要使用配置group-instance.xml。group-instance主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。比如产品数据拆分了4个库,每个库会有一个instance,如果不用group,业务上要消费数据时,需要启动4个客户端,分别链接4个instance实例。使用group后,可以在canal server上合并为一个逻辑instance,只需要启动1个客户端,链接这个逻辑instance即可。二 关键实现流程2.1 canal.properties配置文件canal.properties是对应一个canal server的全局配置,保存位置:/usr/local/hadoop/app/canal_group/conf/canal.properties。配置修改内容如下:canal.id = 1 #唯一标识 canal.ip =192.168.175.20 # client访问canal server的ip地址 canal.port = 11111 # client访问canal server的端口 #canal.instance.global.spring.xml = classpath:spring/file-instance.xml #原来是这个 canal.instance.global.spring.xml = classpath:spring/group-instance.xml #启动这个 #其他配置保持默认即可.2.2 instance.properties配置文件使用如下命令复制出两个代表canal instance的文件夹:cp -R example t_dept; cp -R example t_canal; rm -rf example;调整配置文件/usr/local/hadoop/app/canal/conf/t_dept/instance.properties如下:#canal.instance.master.address=192.168.175.21:3306 #原来的 canal.instance.master1.address=192.168.175.21:3306 #新增,与group-instance.xml的对应 canal.instance.master2.address=192.168.175.22:3306 #新增,与group-instance.xml的对应 # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal # mq config canal.mq.topic=t_dept调整配置文件/usr/local/hadoop/app/canal/conf/t_canal/instance.properties如下:#canal.instance.master.address=192.168.175.21:3306 #原来的 canal.instance.master1.address=192.168.175.21:3306 #新增,与group-instance.xml的对应 # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal # mq config canal.mq.topic=t_canal2.3 group-instance.xml配置文件配置文件/usr/local/hadoop/app/canal/conf/spring/group-instance.xml不需要做调整。2.4 启动canal server进入文件夹/usr/local/hadoop/app/canal/bin执行如下启动命令:./startup.sh查看日志/usr/local/hadoop/app/canal/logs/canal/canal.log,出现如下内容,即表示启动成功:2019-06-07 21:15:03.372 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations 2019-06-07 21:15:03.427 [main] INFO c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations 2019-06-07 21:15:03.529 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server. 2019-06-07 21:15:06.251 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.20:11111] 2019-06-07 21:15:22.245 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......2.5 使用canal client连接canal server注意运行canal客户端代码时,一定要先启动canal server!!!(1) 添加pom依赖<!--canal--> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>(2) canal client代码:package com.xgh.canal; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; public class CanalClientGroupTest1 { public static void main(String args[]) { //String zkHost="192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181"; // 创建链接 //CanalConnector connector = CanalConnectors.newClusterConnector(zkHost,"example","",""); CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.175.21", 11111), "t_dept", "", ""); int batchSize = 1000; int emptyCount = 0; long batchId = 0; //外层死循环:在canal节点宕机后,抛出异常,等待zk对canal处理切换,切换完后,继续创建连接处理数据 while(true) { try { connector.connect(); connector.subscribe(".*\\..*");//订阅所有库下面的所有表 //connector.subscribe("canal.t_canal");//订阅库canal库下的表t_canal connector.rollback(); //内层死循环:按频率实时监听数据变化,一旦收到变化数据,立即做消费处理,并ack,考虑消费速度,可以做异步处理并ack. while (true) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 batchId = message.getId(); int size = message.getEntries().size(); //// 偏移量不等于-1 或者 获取的数据条数不为0 时,认为拿到消息,并处理 if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount);//此時代表當前數據庫無遍更數據 Thread.sleep(1000); //1000ms拉一次变动数据 } else { emptyCount = 0; System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // } }catch(Exception e){ e.printStackTrace(); connector.rollback(batchId); // 处理失败, 回滚数据 } finally { connector.disconnect(); } } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } System.out.println("rowChare ======>"+rowChage.toString()); EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名稱 entry.getHeader().getLogfileOffset(), //偏移量 entry.getHeader().getSchemaName(),//庫名 entry.getHeader().getTableName(), //表名 eventType));//事件名 for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }2.6 其他将canal client代码CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.175.21", 11111), "t_dept", "", "");中的队列名t_dept换成t_canal再执行,就可以监听t_canal对应数据变化了.三 运行测试及总结1. 监听t_dept的canal client可以接收到数据库A和B的数据变化2. 监听t_canal的canal client只能接收到数据库B的数据变化3. 数据过滤的设置问题当在instance.properties和canal client中对设置filter时,canal client的设置会覆盖instance.properties中的配置。所以不如干脆保持instance.properties为默认状态,也即是不过滤,然后过滤全部设置在canal client中,如下:connector.connect(); connector.subscribe(".*\\..*");//订阅所有库下面的所有表 //connector.subscribe("canal.t_canal");//订阅库canal库下的表t_canal四 高可用架构设计高可用架构设计参考文章:https://www.cnblogs.com/yulu080808/p/8819260.htmlhttps://github.com/alibaba/canalhttps://blog.csdn.net/my201110lc/article/details/78836270
一 高可用架构设计架构设计图配置说明:zookeeper x 3 + canal x 2 + mysql x 2组件说明:linux内核版本(CentOS Linux 7):(命令:uname -a)Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linuxmysql版本:(SQL命令:select version(); 或 status)Server version: 5.6.43-log MySQL Community Server (GPL)canal版本:canal-1.1.3zookeeper版本:zookeeper-3.4.5-cdh5.7.0JDK版本: 1.8canal工作原理:模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议;mysql master收到dump请求,开始推送binary log给slave(也就是canal);解析binary log对象(原始为byte流)了解更多详细更新可以查看文章:【了解canal,看这个就够了】二 配置与部署流程2.1 安装mysql数据库1. 下载安装在192.168.175.21和192.168.175.22两台服务器上分别安装mysql,具体安装流程可参考文章:Linux-安装MySQL.2. 创建canal账户在创建root账号并设置远程访问之后,接着创建canal账号并设置远程访问和权限:mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; mysql> GRANT ALL ON canal.* TO 'canal'@'%'; mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'canal'@'%'; mysql>FLUSH PRIVILEGES;3. 验证登录#远程登录 mysql -h 192.168.175.22 -P 3306 -u canal -pcanal #本地登录 mysql -ucanal -pcanal4. 修改my.cnf配置(这一步非常关键!!!)分别在175.21和175.22两台服务器修改my.conf配置,查找my.cnf配置位置命令:whereis my.示例,在192.168.175.21的my.cnf配置新增如下内容:log_bin=mysql-bin #指定bin-log的名称,尽量可以标识业务含义 binlog_format=row #选择row模式,必须!!! server_id=1 #mysql服务器id2.2 搭建zookeeper集群搭建zookeeper集群地址为192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181,具体搭建流程,可查看文章【Zookeepr3.4.5集群搭建】。2.3 搭建canal server集群前提: mysql已打开binlog功能,且配置binlog模式为row.1. 下载最新canal安装包下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz2.上传并解压进入192.168.175.20服务器,使用rz命令上传,使用如下命令进行解压至/usr/local/hadoop/app/canal:tar xzvf canal.deployer-1.1.3.tar.gz -C canal3. 修改配置instance.properties新解压的文件夹/usr/local/hadoop/app/canal/conf/有一个example文件夹,一个example就代表一个instance实例.而一个instance实例就是一个消息队列,所以这里可以将文件名改为example1,同时再复制出来一个叫example2.(命名可以使用监听的数据库名)修改/usr/local/hadoop/app/canal/conf/example1/instance.properties配置文件:canal.instance.master.address=192.168.175.21:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 canal.mq.topic=example1修改/usr/local/hadoop/app/canal/conf/example2/instance.properties配置文件:canal.instance.master.address=192.168.175.22:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 canal.mq.topic=example2配置文件参数说明,可查看:https://github.com/alibaba/canal/wiki/AdminGuide4. 修改配置canal.properties配置/usr/local/hadoop/app/canal/conf/canal.properties是一个对应canal server的全局配置(instance.properties是对应canal instance的配置)。canal.id = 2 #保证每个canal server的id不同 canal.port = 11111 canal.zkServers =192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181 canal.instance.global.spring.xml = classpath:spring/default-instance.xml #其他配置默认即可.注意: 两台机器上的instance目录的名字需要保证完全一致,HA模式是依赖于instance name进行管理,同时必须都选择default-instance.xml配置。配置完成,将文件从192.168.175.20远程复制一份到192.168.175.22上:#需要确保已开通免密 scp -rp /usr/local/hadoop/app/canal slave2:/usr/local/hadoop/app/5. 启动canal server进入文件夹/usr/local/hadoop/app/canal/bin执行如下启动命令:./startup.sh查看日志/usr/local/hadoop/app/canal/logs/canal/canal.log,出现如下内容,即表示启动成功:2019-06-07 21:15:03.372 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations 2019-06-07 21:15:03.427 [main] INFO c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations 2019-06-07 21:15:03.529 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server. 2019-06-07 21:15:06.251 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.22:11111] 2019-06-07 21:15:22.245 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......在zk集群中查看canal节点注册情况:[zk: localhost:2181(CONNECTED) 27] ls2 /otter/canal/destinations [example2, example1] [zk: localhost:2181(CONNECTED) 26] ls2 /otter/canal/cluster [192.168.175.22:11111, 192.168.175.20:11111]可以看到canal server节点已经在zk集群上注册成功.当停掉一个canal server时,可以看到zk上对应的临时节点也会删除.2.4 使用canal client通过zookeeper连接canal server集群注意运行canal客户端代码时,一定要先启动canal server!!!(1) 添加pom依赖<!--canal--> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>(2) canal client代码:package com.xgh.canal; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; public class TestCanalByZk { public static void main(String args[]) { String zkHost="192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181"; // 创建链接 CanalConnector connector = CanalConnectors.newClusterConnector(zkHost,"example1","",""); /*CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.175.22", 11111), "example", "", "");*/ int batchSize = 1000; int emptyCount = 0; long batchId = 0; //外层死循环:在canal节点宕机后,抛出异常,等待zk对canal处理切换,切换完后,继续创建连接处理数据 while(true) { try { connector.connect(); connector.subscribe(".*\\..*");//订阅所有库下面的所有表 //connector.subscribe("canal.t_canal");//订阅库canal库下的表t_canal connector.rollback(); //内层死循环:按频率实时监听数据变化,一旦收到变化数据,立即做消费处理,并ack,考虑消费速度,可以做异步处理并ack. while (true) { Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 batchId = message.getId(); int size = message.getEntries().size(); //// 偏移量不等于-1 或者 获取的数据条数不为0 时,认为拿到消息,并处理 if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount);//此時代表當前數據庫無遍更數據 Thread.sleep(200); //200ms拉一次变动数据 } else { emptyCount = 0; System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 } }catch(Exception e){ e.printStackTrace(); connector.rollback(batchId); // 处理失败, 回滚数据 } finally { connector.disconnect(); } } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } System.out.println("rowChare ======>"+rowChage.toString()); EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名稱 entry.getHeader().getLogfileOffset(), //偏移量 entry.getHeader().getSchemaName(),//庫名 entry.getHeader().getTableName(), //表名 eventType));//事件名 for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }canal client运行实例:empty count : 1 empty count : 2 empty count : 3 empty count : 46. 触发数据库变更创建库:create database canal;创建表:create table t_canal (id int,name varchar(20),status int);插入数据:insert into t_canal values(11,'xxiao',1);canal client输出日志:================> binlog[mysql-bin.000001:6973] , name[canal,t_canal] , eventType : INSERT id : 11 update=true name : xxiao update=true status : 1 update=true7. 其他将canal client代码CanalConnectors.newClusterConnector(zkHost,"example1","","");中的队列名example1换成example2再执行,就可以监听example2对应数据变化了.8. 问题:为何设置了数据表的过滤条件,但貌似没有生效?答:首先看文档AdminGuide,了解canal.instance.filter.regex的书写格式。mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\) 。常见例子:所有表:.* or .\..canal schema下所有表: canal\..*canal下的以canal打头的表:canal\.canal.*canal schema下的一张表:canal.test1多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)检查binlog格式,过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)。检查下CanalConnector是否调用subscribe(filter)方法;有的话,filter需要和instance.properties的canal.instance.filter.regex一致,否则subscribe的filter会覆盖instance的配置,如果subscribe的filter是...,那么相当于你消费了所有的更新数据 【特别注意】三 运行测试及总结1. 启动两个监听example1的canal client,启动两个监听example2的canal client:在example1或example2对应的数据发生变化时,两个canal client只有一个消费消息。当两个监听同一个队列的canal client有一个宕掉时,再有数据变化时,剩下的一个canal client就会开始消费数据。这就验证了canal client的HA机制:为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序.2. 启动两个canal server并在zk上注册当停掉其中一个canal server时,当产生数据变化时,整个canal server集群仍可以正常对外提供服务。这就验证了canal server的HA机制:为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.3. 在canal server切换过程中,canal client存在重复消费数据的问题这点需要在消费端自行进行处理。参考文章:https://www.2cto.com/database/201609/547661.htmlhttps://www.cnblogs.com/yulu080808/p/8819260.htmlhttps://github.com/alibaba/canalhttps://blog.csdn.net/my201110lc/article/details/78836270
一 canal应用架构设计组件说明:linux内核版本(CentOS Linux 7):(命令:uname -a)Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linuxmysql版本:(SQL命令:select version(); 或 status)Server version: 5.6.43-log MySQL Community Server (GPL)canal版本:canal-1.1.3JDK版本: 1.8canal工作原理:模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议;mysql master收到dump请求,开始推送binary log给slave(也就是canal);解析binary log对象(原始为byte流)了解更多详细更新可以查看文章:【了解canal,看这个就够了】二 架构落地实现流程2.1 mysql配置与安装1. 下载安装在192.168.175.21和192.168.175.22上分别安装mysql,具体安装流程可参考文章:Linux-安装MySQL.2. 创建canal账户在创建root账号并设置远程访问之后,接着创建canal账号并设置远程访问和权限:mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; mysql> GRANT ALL ON canal.* TO 'canal'@'%'; mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'canal'@'%'; mysql>FLUSH PRIVILEGES;3. 验证登录#远程登录 mysql -h 192.168.175.22 -P 3306 -u canal -pcanal #本地登录 mysql -ucanal -pcanal4. 修改my.cnf配置分别在175.21和175.22两台服务器修改my.conf配置,查找my.cnf配置位置命令:whereis my.192.168.175.21中的my.cnf配置新增如下内容:log_bin=mysql-bin #指定bin-log的名称,尽量可以标识业务含义 binlog_format=row #选择row模式,必须!!! server_id=1 #mysql服务器id2.2 canal server配置与启动1. 下载canal下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz2.上传并解压进入192.168.175.20服务器,使用rz命令上传,使用如下命令进行解压至/usr/local/hadoop/app/canal:tar xzvf canal.deployer-1.1.3.tar.gz -C canal3. 修改配置新解压的文件夹/usr/local/hadoop/app/canal/conf/有一个example文件夹,一个example就代表一个instance实例.而一个instance实例就是一个消息队列,所以这里可以将文件名改为example1,同时再复制出来一个叫example2.(命名可以使用监听的数据库名)修改/usr/local/hadoop/app/canal/conf/example1/instance.properties配置文件:canal.instance.master.address=192.168.175.21:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 canal.mq.topic=example1修改/usr/local/hadoop/app/canal/conf/example2/instance.properties配置文件:canal.instance.master.address=192.168.175.22:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 canal.mq.topic=example2配置文件参数说明,可查看:https://github.com/alibaba/canal/wiki/AdminGuide4. 启动canal server进入文件夹/usr/local/hadoop/app/canal/bin执行如下命令:./startup.sh查看日志/usr/local/hadoop/app/canal/logs/canal/canal.log,出现如下内容,即表示启动成功:2019-06-07 21:15:03.372 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations 2019-06-07 21:15:03.427 [main] INFO c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations 2019-06-07 21:15:03.529 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server. 2019-06-07 21:15:06.251 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.22:11111] 2019-06-07 21:15:22.245 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......5. 启动canal client注意运行canal客户端代码时,一定要先启动canal server!!!(1) 添加pom依赖<!--canal--> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>(2) canal client代码:package com.xgh.canal; import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import com.alibaba.otter.canal.protocol.Message; public class CanalClientTest { public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.175.20", 11111), "example1", "", "");//或者example2 int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*");//订阅所有库下面的所有表 //connector.subscribe("canal.t_canal");//订阅库canal库下的表t_canal connector.rollback(); int totalEmtryCount = 1200; while (emptyCount < totalEmtryCount) {//实际生产中需要设置为true,死循环 Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount);//此時代表當前數據庫無遍更數據 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { emptyCount = 0; System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交确认 // connector.rollback(batchId); // 处理失败, 回滚数据 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } System.out.println("rowChare ======>"+rowChage.toString()); EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名稱 entry.getHeader().getLogfileOffset(), //偏移量 entry.getHeader().getSchemaName(),//庫名 entry.getHeader().getTableName(), //表名 eventType));//事件名 for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }canal client运行实例:empty count : 1 empty count : 2 empty count : 3 empty count : 46. 触发数据库变更创建库:create database canal;创建表:create table t_canal (id int,name varchar(20),status int);插入数据:insert into t_canal values(10,'hello',1);canal client输出日志:================> binlog[mysql-bin.000001:6764] , name[canal,t_canal] , eventType : INSERT id : 10 update=true name : hello update=true status : 1 update=true三. 自问自答-为何设置了数据表的过滤条件,但貌似没有生效?答:首先看文档AdminGuide,了解canal.instance.filter.regex的书写格式。mysql 数据解析关注的表,Perl正则表达式.多个正则之间以逗号(,)分隔,转义符需要双斜杠(\)常见例子:所有表:.* or .\..canal schema下所有表: canal\..*canal下的以canal打头的表:canal\.canal.*canal schema下的一张表:canal.test1多个规则组合使用:canal\..*,mysql.test1,mysql.test2 (逗号分隔)检查binlog格式,过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)。检查下CanalConnector是否调用subscribe(filter)方法;有的话,filter需要和instance.properties的canal.instance.filter.regex一致,否则subscribe的filter会覆盖instance的配置,如果subscribe的filter是...,那么相当于你消费了所有的更新数据 【特别注意】参考文章:https://www.cnblogs.com/jayinnn/p/9606466.htmlhttps://github.com/alibaba/canal
一. canal概述canal是Alibaba旗下的一款开源项目,纯Java开发.它是基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持mysql。应用场景:1.数据同步,比如:做在线、离线数据库之间的数据同步操作;2.数据消费,比如:需要根据关注的数据库表的变化,做搜索增量;3.数据脱敏,比如:需要将线上动态数据导入到其他地方,做数据脱敏。二. canal工作原理1. mysql主备复制实现(1) master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);(2) slave将master的binary log events拷贝到它的中继日志(relay log);(3) slave重做中继日志中的事件,将改变反映它自己的数据。2. canal的工作原理(1) canal模拟mysql salve的交互协议,伪装自己为mysql slave,向mysql master发送dump协议;(2) mysql master收到dump请求,开始推送binary log给slave(也就是canal);(3) canal解析binary log对象(原始byte流).三. canal架构设计说明:server代表一个canal运行实例,对应与一个jvm;instance对应于一个数据队列(1个server对应1..n个instance).instance下的子模块:eventParser: 数据源接入,模拟slave协议和master进行交互,协议解析;eventSink: parser和store链接器,进行数据的过滤,加工和分发工作;eventStore: 数据存储;metaManager: 增量订阅&消费信息管理器.1. EventParser设计整个parser过程大致可分为几部:1.Connection获取上一次解析成功的位置(如果第一次启动,则获取初始制定的位置或者是当前数据库的binlog位点);2.Connection建立连接,发生BINLOG_DUMP命令3.Mysql开始推送Binary Log;4.接收到的Binary Log通过Binlog parser进行协议解析,补充一些特定信息;5.传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功存储成功后,定时记录Binary Log位置.2. EventSink设计数据过滤:支持通配符的过滤模式,表名,字段内容等;数据路由/分发:解决1:n (1个parser对应多个store的模式);数据归并:解决n:1 (多个parser对应1个store);数据加工:在进入store之前进行额外的处理,比如join.3. EventStore设计目前canal实现了memory内存、本地file存储以及持久化到zookeeper以保障数据集群共享。memory内存的RingBuffer设计如下图:定义了3个cursor:Put : Sink模块进行数据存储的最后一次写入位置Get : 数据订阅获取的最后一次提取位置Ack : 数据消费成功的最后一次消费位置借鉴Disruptor的RingBuffer的实现,将RingBuffer拉直来看:4. 增量订阅、消费设计get/ack/rollback协议介绍:Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id(唯一标识)和entries(具体的数据对象),具体格式后面会进行介绍。void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作;void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.流式api设计的好处:get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能);get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化.数据格式:Entry Header logfileName [binlog文件名] logfileOffset [binlog position] executeTime [发生的变更] schemaName tableName eventType [insert/update/delete类型] entryType [事务头BEGIN/事务尾END/数据ROWDATA] storeValue [byte数据,可展开,对应的类型为RowChange] RowChange isDdl [是否是ddl变更操作,比如create table/drop table] sql [具体的ddl sql] rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理] beforeColumns [Column类型的数组] afterColumns [Column类型的数组] Column index sqlType [jdbc type] name [column name] isKey [是否为主键] updated [是否发生过变更] isNull [值是否为null] value [具体的内容,注意为文本]四. canal的HA机制设计canal的HA机制主要是依赖zookeeper的特性,共分为canal server和canal client两部分:canal server:为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.canal client:为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序.大致步骤:canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.Canal Client的方式和canal server方式类似,也是利用zokeeper的抢占EPHEMERAL节点的方式进行控制.HA配置架构图:HA配置架构设计图canal其他链接方式:1. 单连2. 两个client+两个instance+1个mysql(其实就是两个单连)3. 一个server+两个instance+两个mysql+两个client4. instance的standby配置五. canal总结canal原理:模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议;mysql master收到dump请求,开始推送binary log给slave(也就是canal);解析binary log对象(原始为byte流)存在重复消费问题:需要在消费端解决。canal需要维护EventStore,可以存取在Memory, File, zk.canal需要维护客户端的状态,同一时刻一个instance只能有一个消费端消费.支持binlog format 类型:statement, row, mixed. 多次附加功能只能在row下使用,比如otter.有ACK机制.
Elastic-Job的分片任务在调度执行中,由于某种原因未执行完毕,下一次调度任务触发后,如果在同一个Job实例中出现两个线程处理同一个分片上的数据,这样就会造成两个线程处理到相同的数据。为了避免上述问题,Elastic-Job引入任务错过机制(misfire)与幂等机制(monitorExecution),来确保同一条数据不会被多个Job同时处理,避免同一条数据被同一个Job实例的多个线程处理。重申一次Elastci-Job的分布式是数据的分布式,一个任务在多个Job实例上运行,每个Job实例处理该Job的部分数据(数据分片)。1、Elastic-Job如何确保同一个Job实例的多个线程不会处理相同的数据。场景:任务调度周期为每5s执行一次,正常每次调度任务处理需要耗时2s,如果在某一段时间由于数据库压力变大,导致原本只需要2s就能处理完成的任务,现在需要16s才能完成。在这个数据处理的过程中,每5s又会触发一次调度(任务处理),如果不加以控制的话,在同一个实例上根据分片条件去查询数据库,查询到的数据有可能相同(部分相同),这样同一条任务数据将被多次运行。如果这个任务是处理转账业务,如果在业务方法不实现幂等,则会引发非常严重的问题,那ElasticJob是否可以避免这个问题呢?答案是肯定。elastic-Job提供了一个配置参数:monitorExecution=true,开启幂等性。幂等机制开启后的工作流程:(1)Elastic-Job在开启monitorExecution(true)【幂等机制】机制的情况下,在分片任务开始时会在注册中心zookeeper上创建${namespace}/jobname/sharding/{item}/running临时节点,在任务结束后会删除该目录。(2)在判断是否有分片正在运行时,只需判断是否存在上述节点即可。如果存在,调用setMisfire方法,将分片状态设置为mirefire,表示错失了一次任务执行。如果该分片被设置为mirefire并开启了事件跟踪,将事件跟踪保存在数据库中。(3)设置misfire的方法会为分配给该实例下的所有分片创建持久节点${namespace}/jobname/shading/{item}/misfire节点。(4)注意,只要分配给该实例的任何一分片未执行完毕,则在该实例下的所有分片都增加misfire节点,然后忽略本次任务触发执行,等待任务结束后再执行其他未忽略的任务。(5)在任务执行完成后检查是否存在${namespace}/jobname/sharding/{item}/misfire节点,如果存在,则首先清除misfie相关的文件,然后执行任务。Elastic-Job的misfire实现方案总结:在下一个调度周期到达之后,只要发现这个分片的任何一个分片正在执行,则为该实例分片的所有分片都设置为misfire,等任务执行完毕后,再统一执行下一次任务调度。技术原理其实很简单,就是通过zookeeper来实现分布式锁来完成幂等性。2、Elastic-Job如何确保数据不会被多个Job实例处理?Elastic-Job基于数据分片,不同分片根据分片参数(人为配置),从数据库中查询各自数据(任务数据分片),如果当节点宕机,数据会重新分片,如果任务未执行完成,然后执行分片,数据是否会被不同的任务同时处理呢?答案是不会,因为当节点宕机(作业执行节点)后,是否需要重新分片事件监听器会监听到Job实例代表的节点删除,设置重新分片。在任务被调度执行具体处理逻辑之前,需要重新分片,重新分片的前提就是要所有的分片任务全部执行完毕,这也依赖是否开启幂等控制(monitorExecution)。如果开启幂等机制,Elastic-Job能感知正在执行处理逻辑的分片,重新分片需要等待当前所有分片任务全部运行完毕后才会触发,故不会存在不同节点处理相同数据的问题。场景:一个任务JOB的调度频率为每10s一次,在某个时间,该job执行耗时用了33s(平时只需执行5s),按照正常调度,应该后续会触发3次调度,那该job后执行完,会连续执行3次调度吗?答案:在33s这次任务执行完成后,如果后面的任务执行在10s内执行完毕的话,只会触发一次,不会补偿3次,因为ElasticJob记录任务错失执行,只是创建了misfire节点,并不会记录错失的次数,因为也没这个必要。参考文章:https://blog.csdn.net/prestigeding/article/details/80140777
1. Elastic-Job源码使用lombok实现极简代码。这点在我当前负责的多个工程项目代码中都有使用,以后可以进一步借鉴使用。话说“极简代码”,我还是第一次见到这种叫法。2.注册中心的最终配置以哪个客户端的配置为准?Elastic-Job-Lite采用无中心化设计,若每个客户端的配置不一致,不做控制的话,最后一个启动的客户端配置将会成为注册中心的最终配置。Elastic-Job-Lite提出了overwrite概念,可通过JobConfiguration或Spring命名空间配置。overwrite=true即允许客户端配置覆盖注册中心,反之则不允许。如果注册中心无相关作业的配置,则无论overwrite是否配置,客户端配置都将写入注册中心。现身说法:我实际工作中的情况是这样的,相同的工程代码,即使部署多地,用的是同一处的配置,都是使用的分布式配置管理平台disconf来进行管理的,这种方式从源头上就解决了客户端配置不一致的问题。3. Elastic-Job有何使用限制?(1)作业启动成功后修改作业名称视为新作业,原作业废弃。(2)同一台作业服务器可以运行多个相同的作业实例,但每个作业实例必须使用不同的JobInstanceId,因为作业运行时是按照IP和JobInstanceId注册和管理的。JobInstanceId可在作业配置中设置。其实在我看来,把作业实例的id改掉了还能算是相同的作业实例吗?(3)一旦有服务器波动,或者修改分片项,将会触发重新分片;触发重新分片将会导致运行中的流式处理的作业在执行完本次作业后不再继续执行,等待分片结束后再恢复正常。4.如何保证不会在多个作业服务器运行同一个分片开启monitorExecution(监控作业运行时状态)才能实现分布式作业幂等性(即不会在多个作业服务器运行同一个分片)的功能,但monitorExecution对短时间内执行的作业(如每5秒一触发)性能影响较大,建议关闭并自行实现幂等性。每次作业执行时间和间隔时间均非常短的情况,建议不监控作业运行时状态以提升效率。因为是瞬时状态,所以无必要监控。请用户自行增加数据堆积监控。并且不能保证数据重复选取,应在作业中实现幂等性。每次作业执行时间和间隔时间均较长的情况,建议监控作业运行时状态,可保证数据不会重复选取。5. 注册中心与作业部署机无从属关系elastic-job-lite为jar包,由开发或运维人员负责启动。启动时自动向注册中心注册作业信息并进行分布式协调,因此并不需要手工在注册中心填写作业信息。 但注册中心与作业部署机无从属关系,注册中心并不能控制将单点的作业分发至其他作业机,也无法将远程服务器未启动的作业启动6. 作业暂停和作业失效的区别作业暂停和失效都会停止当前节点作业的运行。但作业暂停和恢复不会触发重分片,而作业失效和生效将触发重分片。7. 作业与注册中心无法通信会如何?为了保证作业的在分布式场景下的一致性,一旦作业与注册中心无法通信,运行中的作业会立刻停止执行,但作业的进程不会退出,这样做的目的是为了防止作业重分片时,将与注册中心失去联系的节点执行的分片分配给另外节点,导致同一分片在两个节点中同时执行。 当作业节点恢复与注册中心联系时,将重新参与分片并恢复执行新的分配到的分片。8. elastic-job的分片策略一般情况Elastic-Job是通过平均分配算法的分片策略数据的,但也可以选择哈希及轮转等策略,或者自己定义作业分片策略。
一 maxwell组件介绍Maxwell是一个守护进程,它能监听并读取MySQL的binlog,然后解析输出为json,支持将数据输出到Kafka、Kinesis或其他流媒体平台,支持表和库过滤。注意:对于增删改都有输出,但对于truncate操作,没有输出。源码地址:https://github.com/zendesk/maxwell下载地址: https://github.com/zendesk/maxwell/releases/download/v1.21.1/maxwell-1.21.1.tar.gz示意如下:mysql> insert into `test`.`maxwell` set id = 1, daemon = 'Stanislaw Lem'; maxwell: { "database": "test", "table": "maxwell", "type": "insert", "ts": 1449786310, "xid": 940752, "commit": true, "data": { "id":1, "daemon": "Stanislaw Lem" } } mysql> update test.maxwell set daemon = 'firebus! firebus!' where id = 1; maxwell: { "database": "test", "table": "maxwell", "type": "update", "ts": 1449786341, "xid": 940786, "commit": true, "data": {"id":1, "daemon": "Firebus! Firebus!"}, "old": {"daemon": "Stanislaw Lem"} }二 设备与组件版本梳理:1. linux内核版本(CentOS Linux 7):(命令:uname -a)Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux2. mysql版本:(SQL命令:select version(); 或 status)Server version: 5.6.43-log MySQL Community Server (GPL)3. maxwell版本:maxwell-1.21.14. kafka版本:kafka_2.11-2.2.05. zookeeper版本:zookeeper-3.4.5-cdh5.7.0三 设备介绍二台虚拟机,分别为salve1(192.168.175.21)和slave2(192.168.175.22),slave1上安装mysql和kafka,slave2上启动maxwell守护进程和zookeeper。四 简要过程梳理主流程示意图如上图:这次主要介绍从binlog > maxwell > kafka的过程,而kafka后面的过程,就可以有很多种了,比如:(1)binlog > maxwell > kafka > spark streaming > hdfs、kudu;(2)binlog > maxwell > kafka > flume > hdfs;(3)binlog > maxwell > kafka > es > kibana;第一种spark streaming+hdfs、kudu,是目前我所在公司中使用的场景。简要流程梳理如下:1. 在slave1上安装mysql;2. 在slave2上启动maxwell,测试是否可以正常读取binlog;3. maxwell初步测试ok;4. 在slave2上安装并启动zk;5. 在slave1上安装并启动kafka server;6. 通过kafka producer和consumer测试启动是否成功;7. 启动maxwell将解析后的json数据发送到kafka;8. 启动kafka consumer测试数据是否成功发送。五 详细过程1 在slave1(192.168.175.21)上安装mysql详细过程,可参考笔记: https://www.jianshu.com/p/09936d9c7bf2(1)在创建root账号并设置远程访问之后,接着创建maxwell账号并设置远程访问和权限:mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX'; mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%'; mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';权限:Maxwell需要权限将状态存储在schema_database选项(默认Maxwell)指定的数据库中。(2)针对maxwell配置mysql: 确保配置了server_id,并打开了基于行的复制。可参考maxwell快速入门文档: https://github.com/zendesk/maxwell/blob/master/docs/docs/quickstart.md$ vi my.cnf [mysqld] server_id=1 log-bin=master binlog_format=row注意: binlog_format是一个基于会话的属性需要关闭所有活动连接才能完全转换为基于行的复制。mysql配置文件my.cnf查找方式:$ whereis my my:/etc/my.cnf配置完成后,要重启mysql服务,方可生效2 在slave2上启动maxwell,测试是否可以正常读取binlog(1)在slave2上测试是否可以进行远程访问数据mysql -h 192.168.175.21 -P 3306 - u root -proot #root登录成功后,窗口无需关闭,后面还要接着测试用 mysql -h 192.168.175.21 -P 3306 -u maxwell -p111111(2)下载并解压maxwell#解压到指定的文件夹 tar xzvf maxwell-1.21.1.tar.gz -C /usr/loca/hadoop/app(3)命令行启动maxwell,将解析后的日志输出到控制台进行测试[root@slave2 maxwell-1.21.1]# pwd /usr/local/hadoop/app/maxwell-1.21.1 [root@slave2 maxwell-1.21.1]# bin/maxwell --user='maxwell' --password='111111' --host='192.168.175.21' --producer=stdout启动成功后,会展示如下日志内容:[root@slave2 maxwell-1.21.1]# bin/maxwell --user='maxwell' --password='111111' --host='192.168.175.21' --producer=stdout Using kafka version: 1.0.0 15:49:04,967 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured. 15:49:08,334 INFO Maxwell - Maxwell v1.21.1 is booting (StdoutProducer), starting at Position[BinlogPosition[master.000002:984013], lastHeartbeat=1555141484690] 15:49:10,071 INFO MysqlSavedSchema - Restoring schema id 2 (last modified at Position[BinlogPosition[master.000001:40754], lastHeartbeat=1555112822550]) 15:49:12,759 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[master.000001:3816], lastHeartbeat=0]) 15:49:13,122 INFO MysqlSavedSchema - beginning to play deltas... 15:49:13,138 INFO MysqlSavedSchema - played 1 deltas in 12ms 15:49:13,426 INFO BinlogConnectorReplicator - Setting initial binlog pos to: master.000002:984013 15:49:13,997 INFO BinaryLogClient - Connected to 192.168.175.21:3306 at master.000002/984013 (sid:6379, cid:59) 15:49:14,003 INFO BinlogConnectorLifecycleListener - Binlog connected.在slave2上root连接mysql的窗口中,执行insert,delete,update操作:MySQL [mysql]> insert into tb_dept (id,name,description) values(16,'xiaoman','manger'); Query OK, 1 row affected (0.02 sec) MySQL [mysql]> delete from tb_dept where id = 16; Query OK, 1 row affected (0.01 sec) MySQL [mysql]> update tb_dept set name='xiaofei' where id=14; Query OK, 1 row affected (0.02 sec) Rows matched: 1 Changed: 1 Warnings: 0在maxwell的stdout窗口中会产生如下日志:{"database":"mysql","table":"tb_dept","type":"insert","ts":1555142065,"xid":6349,"commit":true,"data":{"Id":16,"Name":"xiaoman","description":"manger"}} {"database":"mysql","table":"tb_dept","type":"delete","ts":1555142096,"xid":6361,"commit":true,"data":{"Id":16,"Name":"xiaoman","description":"manger"}} {"database":"mysql","table":"tb_dept","type":"update","ts":1555142157,"xid":6383,"commit":true,"data":{"Id":14,"Name":"xiaofei","description":"sales"},"old":{"Name":"xiaoming1"}}3 在slave2上安装并启动zk具体安装方式,可参考笔记:https://www.jianshu.com/p/10d5a20ab9b7注意启动完成后,要检查zk是否安装成功#查看状态 zkServer.sh status4 在slave1上安装并启动kafka server:(1)具体启动安装方式,可参考笔记:https://www.jianshu.com/p/3d017bdbfb3c修改kafka配置文件 $KAFKA_HOME/config/server.properties:broker.id=1 listeners=PLAINTEXT://slave1:9092 log.dirs=/usr/local/app/tmp/kafka-logs zookeeper.connect=slave2:2181(2)通过kafka producer和consumer测试启动是否成功5 在slave1的kafka上创建名为maxwell的topic(1)创建topickafka-topics.sh --create --zookeeper slave2:2181 --replication-factor 1 --partitions 1 --topic maxwell(2)检查topic是否创建成功#查看topic列表: kafka-topics.sh --list --zookeeper slave2:2181 #查看topic具体描述: kafka-topics.sh --describe --zookeeper slave2:2181 --topic maxwell6 在slave1上启动消费topic名称为maxwell的kafka consumer:kafka-console-consumer.sh --zookeeper slave2:2181 --topic maxwell --from-beginning7 在slave2上启动maxwell进程,将解析后的json数据输出到kafka:bin/maxwell --user='maxwell' --password='111111' --host='192.168.175.21' --producer=kafka --kafka.bootstrap.servers=192.168.175.21:9092 --kafka_topic=maxwell注意:启动之前,需要将输出到stdout上的maxwell进程停掉,否则会报错。同样,启动成功后,会输出Binlog连接成功的日志信息。如下:16:22:19,127 INFO AppInfoParser - Kafka version : 1.0.0 16:22:19,129 INFO AppInfoParser - Kafka commitId : aaa7af6d4a11b29d 16:22:19,391 INFO Maxwell - Maxwell v1.21.1 is booting (MaxwellKafkaProducer), starting at Position[BinlogPosition[master.000002:1084309], lastHeartbeat=1555143612778] 16:22:20,916 INFO MysqlSavedSchema - Restoring schema id 2 (last modified at Position[BinlogPosition[master.000001:40754], lastHeartbeat=1555112822550]) 16:22:22,861 INFO MysqlSavedSchema - Restoring schema id 1 (last modified at Position[BinlogPosition[master.000001:3816], lastHeartbeat=0]) 16:22:23,178 INFO MysqlSavedSchema - beginning to play deltas... 16:22:23,192 INFO MysqlSavedSchema - played 1 deltas in 7ms 16:22:23,482 INFO BinlogConnectorReplicator - Setting initial binlog pos to: master.000002:1084309 16:22:23,887 INFO BinaryLogClient - Connected to 192.168.175.21:3306 at master.000002/1084309 (sid:6379, cid:64) 16:22:23,894 INFO BinlogConnectorLifecycleListener - Binlog connected.8 验证(1)在slave1中的root账号登录的mysql窗口中,执行一条更新操作:MySQL [mysql]> update tb_dept set name='xiaofei123' where id=14; Query OK, 1 row affected (0.02 sec) Rows matched: 1 Changed: 1 Warnings: 0(2)随机在消费topic=maxwell的kafka consumer中输出日志如下:{"database":"mysql","table":"tb_dept","type":"update","ts":1555143780,"xid":6888,"commit":true,"data":{"Id":14,"Name":"xiaofei123","description":"sales"},"old":{"Name":"xiaofei"}}到此,流程梳理完毕!六 遗留问题:1 mysql数据库设置远程访问之后,在本地一直访问不了,尝试修改密码同样访问不了。猜测是由于设置远程访问时的%百分号影响的。故而这里访问数据库用的都是远程访问的,其实正常生产环境,我们也都是远程进行访问的。ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: YES)2 kafka:在将maxwell进程和kafka server运行在同一台虚拟机上时,启动kafka consumer时,经常报ConsumerRebalanceFailedException的异常,如下:没有找到具体原因,最终通过kafka server和maxwell不运行在一起将问题解决。其实生产环境,一般kafka都是有专用的集群,也都不会和maxwell运行在一起。[2019-04-13 13:12:03,599] ERROR Unknown error when running consumer: (kafka.tools.ConsoleConsumer$) kafka.common.ConsumerRebalanceFailedException: console-consumer-72779_slave2-1555132312263-9d33f2d8 can't rebalance after 4 retries at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:660) at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:967) at kafka.consumer.ZookeeperConsumerConnector$WildcardStreamsHandler.<init>(ZookeeperConsumerConnector.scala:1001) at kafka.consumer.ZookeeperConsumerConnector.createMessageStreamsByFilter(ZookeeperConsumerConnector.scala:163) at kafka.consumer.OldConsumer.<init>(BaseConsumer.scala:75) at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:63) at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:47) at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,外部仅依赖Zookeeper;Elastic-Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。笔者项目中采用的Lite版本,所以以下是梳理Elastic-Job-Lite的内容:一. 整体架构图整体架构图二. 功能列表分布式调度协调Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。弹性扩容缩容运行中的作业服务器崩溃,或新增加n台作业服务器,作业框架将在下次作业执行前重新分片,不影响当前作业执行。失效转移Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。错过执行作业重触发自动记录错过执行的作业,并在上次作业完成后自动触发。作业分片一致性,保证同一分片在分布式环境中仅一个执行实例自诊断并修复分布式不稳定造成的问题支持并行调度支持作业生命周期操作丰富的作业类型Elastic-Job提供Simple、Dataflow和Script 3种作业类型。Spring整合以及命名空间提供支持spring容器,自定义命名空间,支持占位符。运维平台提供运维界面,可以管理作业和注册中心。三. 作业类型:Elastic-Job提供Simple、Dataflow和Script 3种作业类型。 方法参数shardingContext包含作业配置、片和运行时信息。可通过getShardingTotalCount(), getShardingItem()等方法分别获取分片总数,运行在本作业服务器的分片序列号等。a. Simple类型作业意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。b. Dataflow类型作业Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。c. Script类型作业Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。四. 原理:第一台服务器上线触发主服务器选举。主服务器一旦下线,则重新触发选举,选举过程中阻塞,只有主服务器选举完成,才会执行其他任务。某作业服务器上线时会自动将服务器信息注册到注册中心,下线时会自动更新服务器状态。主节点选举,服务器上下线,分片总数变更均更新重新分片标记。定时任务触发时,如需重新分片,则通过主服务器分片,分片过程中阻塞,分片结束后才可执行任务。如分片过程中主服务器下线,则先选举主服务器,再分片。通过上一项说明可知,为了维持作业运行时的稳定性,运行过程中只会标记分片状态,不会重新分片。分片仅可能发生在下次任务触发前。每次分片都会按服务器IP排序,保证分片结果不会产生较大波动。实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。服务启动流程:启动流程图作业执行流程:作业执行流程图五. Elastic-Job有何使用限制•作业启动成功后修改作业名称视为新作业,原作业废弃。•同一台作业服务器可以运行多个相同的作业实例,但每个作业实例必须使用不同的JobInstanceId,因为作业运行时是按照IP和JobInstanceId注册和管理的。JobInstanceId可在作业配置中设置。•一旦有服务器波动,或者修改分片项,将会触发重新分片;触发重新分片将会导致运行中的流式处理的作业在执行完本次作业后不再继续执行,等待分片结束后再恢复正常。•开启monitorExecution才能实现分布式作业幂等性(即不会在多个作业服务器运行同一个分片)的功能,但monitorExecution对短时间内执行的作业(如每5秒一触发)性能影响较大,建议关闭并自行实现幂等性。六. Simple类型作业应用示例1.引入Maven依赖:<!-- 引入elastic-job-lite核心模块 --> <dependency> <groupId>io.elasticjob</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>${latest.release.version}</version> </dependency> <!-- 使用springframework自定义命名空间时引入 --> <dependency> <groupId>io.elasticjob</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>${latest.release.version}</version> </dependency>2.作业开发:public class MyElasticJob implements SimpleJob { @Override public void execute(ShardingContext context) { switch (context.getShardingItem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... } } }3.作业配置:<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg" xmlns:job="http://www.dangdang.com/schema/ddframe/job" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.dangdang.com/schema/ddframe/reg http://www.dangdang.com/schema/ddframe/reg/reg.xsd http://www.dangdang.com/schema/ddframe/job http://www.dangdang.com/schema/ddframe/job/job.xsd "> <!--配置作业注册中心 --> <reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" /> <!-- 配置作业--> <job:simple id="oneOffElasticJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" /> </beans>4.作业启动:Spring启动方式: 将配置Spring命名空间的xml通过Spring启动,作业将自动加载。5.配置属性详细说明:可查看官方配置文档:http://elasticjob.io/docs/elastic-job-lite/02-guide/config-manual/更多详细内容可以查看elasticjob开发指导文档:http://elasticjob.io/docs/elastic-job-lite/02-guide
在上一篇关于Kettle发送邮件并在正文中以表格形式展示内容的文章中,已经提到Kettle邮箱正文中支持HTML格式,要在邮箱正文中展示表格,那就行拼出HTML,然后扔到邮箱正文中.方法就是这么简单直接,上一篇文章中拼接HTML是在数据库SQL查询时完成的,而这次咱尝试使用Kettle支持的JavaScript脚本来完成.下面就结合一个小的需求案例,来说明一下.一. 需求说明结算系统每天实时接收积分系统推送过来的交易数据,现在需要按照交易类型分组统计检查两个系统之间的数据差异.邮件正文展示效果如下:邮件正文展示效果二. 实现过程1. 作业概览作业概览2. 转换-邮箱正文准备转换-邮箱正文准备2.1 统计数据准备统计数据准备(1) 表输入-积分系统:SELECT 1001 AS transaction_type_p,10001 AS value_p FROM DUAL UNION ALL SELECT 1002,20001 FROM DUAL UNION ALL SELECT 1003,20001 FROM DUAL UNION ALL SELECT 1007,70001 FROM DUAL UNION ALL SELECT 1008,80001 FROM DUAL(2) 表输入-清算系统:SELECT 1001 AS transaction_type_s,10001 AS value_s FROM DUAL UNION ALL SELECT 1002,20001 FROM DUAL UNION ALL SELECT 1003,30001 FROM DUAL UNION ALL SELECT 1004,40001 FROM DUAL UNION ALL SELECT 1005,50001 FROM DUAL(3) 记录集连接:注意: 这里使用的全连接(FULL OUTER),另外使用"记录集连接"前要对前面的数据根据"连接列"进行排序.记录集连接2.2 数据拼接html(1) JS代码_行拼接此步骤是数据准备过程中最核心的一步,将多列转成一列.JS代码JS脚本如下://Script here var transaction_type; if(transaction_type_p == null){ transaction_type = String(transaction_type_s); }else{ transaction_type = String(transaction_type_p); } var value_points; if(value_p == null){ value_points = 0; }else{ value_points = value_p; } var value_settle; if(value_s == null){ value_settle = 0; }else{ value_settle = value_s; } var info; var value_cha = value_points - value_settle; //多行拼接成一列 if(value_cha == 0){ info = "<tr><td>"+transaction_type+"</td><td>"+value_points+"</td<td>"+value_settle+"</td><td>"+value_cha+"</td></tr>"; }else{ info = "<tr style='color:red'><td>"+transaction_type+"</td<td>"+value_points+"</td><td>"+value_settle+"</td><td>"+value_cha+"</td</tr>"; } //新增一列,用于分组聚合 var seq = '1';(2) 分组_拼接行此步骤是将一列中的多行数据使用空字符串拼接成一个大的字符串info_new.分组(3) JS代码_拼接头部在info_new基础上拼接html表格的头标签://Script here var content = "<table border='1'><tr><th>交易类型</th><th>积分系统数量</th<th>结算系统数量</th><th>差异数量</th></tr>"+info_new+"</table>";2.3 传入变量供作业中使用3. 邮件通知邮件通知获取QQ邮箱授权码的方式,简单如下图:获取QQ邮箱授权码邮件通知至此,整个通过JS方式拼接Html方式实现邮件正文展示结果集的过程梳理完毕!下面是之前关于使用Kettle发送邮件的总结:(1) Kettle发送邮箱并在正文中以表格形式展示内容[基础版]该文是通过SQL拼接HTML串来实现邮箱正文展示表格.而在文章Kettle性能调优汇总中我曾提过,能用数据库层面实现就尽量用数据库实现,因为JS脚本的方式会很影响性能.(2) Kettle通过邮箱附件的方式发送数据库报表统计该文介绍了如何通过邮箱附件的方式发送结果集,是非常常用的功能.希望以上分享能够帮助到你,如果你有更好的用法和心得,欢迎留言进行更多的互动学习.
之前曾多次使用kettle进行作业监控,当Kettle需要通过邮件发送统计结果时,之前我的做法都是通过邮箱附件的方式实现.而对于结果集很小的场景,如果依然使用附件方式,整个邮件的核心内容会显得非常不直观.而如何在邮箱正文中通过表格方式展现统计结果呢?邮件正文中支持HTML格式,那解决办法就是自行拼出HTML,然后填到邮箱正文中.自行拼接HTML串,是此任务中最麻烦的点,也是最关键的点.另外,要注意使用组件"设置变量"将统计结果写入变量中.下面展示一下,将Oracle查询得到的结果集拼成HTML的实例:(Sql拼接HTML是一种方式,也可以尝试使用其他方式.)概览1. 准备邮箱正文1.1 表输入:WITH stu as ( SELECT 1001 AS ID,'小明' AS name,12 AS age,'北京' AS address FROM dual UNION ALL SELECT 1002 ,'小东',10,'南京' FROM dual UNION ALL SELECT 1003 ,'小飞',14,'天津' FROM dual UNION ALL SELECT 1004 ,'小连',13,'深圳' FROM dual UNION ALL SELECT 1005 ,'小楠',12,'大连' FROM dual UNION ALL SELECT 1006 ,'小红',9,'合肥' FROM dual ) SELECT replace(wm_concat(v_stu.info),',','') AS v_info from ( SELECT '<table border ="1">' AS info FROM dual union all SELECT '<tr><th>学号</th><th>姓名</th><th>年龄</th><th>城市</th></tr>' AS info FROM dual UNION all SELECT '<tr><td align="center">'||stu.id||'</td><td align="center">'||stu.name||'</td><td align="center">'||stu.age||'</td><td align="center">'||stu.address||'</td></tr>' from stu UNION ALL SELECT '</table>' AS info FROM dual ) v_stu1.2 设置变量设置变量2. 邮件通知image.png3. 邮件效果邮箱正文效果至此,通过SQL将查询结果拼接HTML串的方式实现邮箱正文展示结果集的过程梳理完毕,希望能够对你有用!
1 问题测试代码public static void main(String[] args) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); List<Calendar> list = new ArrayList<>(); for (int i = 0; i < 20; i++) { Calendar startDay = new GregorianCalendar(); Calendar checkDay = new GregorianCalendar(); checkDay.setTime(startDay.getTime());//不污染入参 checkDay.add(checkDay.DATE,i); list.add(checkDay); checkDay = null; startDay = null; } list.stream().forEach(day -> System.out.println(sdf.format(day.getTime()))); System.out.println("-----------------------"); list.parallelStream().forEach(day -> System.out.println(sdf.format(day.getTime()))); System.out.println("-----------------------"); }说明:(1) 使用stream().forEach(),就是单纯的串行遍历循环,和使用for循环得到的效果一样,只是这种方式可以使代码更精简;(2) 使用parallelStream().forEach(),是并行遍历循环,相当于是使用了多线程处理.这样可以在一定程度上提高执行效率.而程序在运行过程中具体会使用多少个线程进行处理,系统会根据运行服务器的资源占用情况自动进行分配.2 运行结果image.png3 原因排查网上搜索查询搜索到相关的文章如下:<<JAVA使用并行流(ParallelStream)时要注意的一些问题>>,<<java8的ParallelStream踩坑记录>>.这些文章中描述的问题归根结底都是同一类问题,那就是在使用parallelStream().forEach()时,都操作了线程不安全的对象(ArrayList).查看ArrayList的源码如下:transient Object[] elementData; // non-private to simplify nested class access /** * Appends the specified element to the end of this list. * @param e element to be appended to this list * @return <tt>true</tt> (as specified by {@link Collection#add}) */ public boolean add(E e) { ensureCapacityInternal(size + 1); // Increments modCount!! elementData[size++] = e; return true; }通过查看源码可以看到,ArrayList本身底层是通过一个名为elementData的数组实现的,而add()方法上并没有加同步锁,可见在多线程并发情况下存在线程不安全的问题.这些文章最后的解决方案都是将操作ArrayList转化为一个同步的集合:Collections.synchronizedList(new ArrayList<>())这样并行流操作同一个ArrayList的对象中add()方法时,就都是同步串行操作的了,就不存在线程安全的问题了,也即是解决了文章中反馈的问题.那么出问题的原因就找到了,那就是在使用parallelStream().forEach()时,都操作了线程不安全的对象.4 结合自己的问题上面找到的出问题的原因,就是在parallelStream().forEach()中使用了线程不安全的对象.SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); ... list.parallelStream().forEach(**day** -> System.out.println(sdf.format(day.getTime())));如上面代码所示,从list中遍历的day和day.getTime()肯定不会有线程安全问题.那么就只剩下SimpleDateFormat实例对象了.下面咱查看SimpleDateFormat对象的format()源码深挖得到如下信息:public abstract class DateFormat extends Format { /** * The {@link Calendar} instance used for calculating the date-time fields * and the instant of time. This field is used for both formatting and * parsing. * * <p>Subclasses should initialize this field to a {@link Calendar} * appropriate for the {@link Locale} associated with this * <code>DateFormat</code>. * @serial */ protected Calendar calendar; ... // Called from Format after creating a FieldDelegate private StringBuffer format(Date date, StringBuffer toAppendTo, FieldDelegate delegate) { // Convert input date to time field list calendar.setTime(date); boolean useDateFormatSymbols = useDateFormatSymbols(); for (int i = 0; i < compiledPattern.length; ) { int tag = compiledPattern[i] >>> 8; int count = compiledPattern[i++] & 0xff; if (count == 255) { count = compiledPattern[i++] << 16; count |= compiledPattern[i++]; } switch (tag) { case TAG_QUOTE_ASCII_CHAR: toAppendTo.append((char)count); break; case TAG_QUOTE_CHARS: toAppendTo.append(compiledPattern, i, count); i += count; break; default: subFormat(tag, count, delegate, toAppendTo, useDateFormatSymbols); break; } } return toAppendTo; }format()方法中操作了一个成员变量calendar,且该方法上未加同步锁,说明该方法在多线程并发访问时,存在线程安全问题.这就是上面测试代码中出现重复数据的根本原因.进一步查询得知,Java8以前的老版本中的日期和时间类全部都是线程不安全的,而在Java8新推出的日期类LocalDate和LocalDateTime非常友好的解决了上述问题.5 针对测试代码中问题的根本解决之道弃用Java8之前旧版本中的日期和时间类,改用新版本中的时间类.新修改后的代码如下:public static void main1(String[] args) { DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd"); LocalDate date = LocalDate.now(); List<LocalDate> list = new ArrayList<>(); for (int i = 0; i < 20; i++) { LocalDate date1 = date.plusDays(i); list.add(date1); } list.stream().forEach(day -> System.out.println(day.format(fmt))); System.out.println("-----------------------"); list.parallelStream().forEach(day -> System.out.println(day.format(fmt))); } public static void main2(String[] args) { DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd"); LocalDateTime date = LocalDateTime.now(); List<LocalDateTime> list = new ArrayList<>(); for (int i = 0; i < 20; i++) { LocalDateTime date1 = date.plusDays(i); list.add(date1); } list.stream().forEach(day -> System.out.println(day.format(fmt))); System.out.println("-----------------------"); list.parallelStream().forEach(day -> System.out.println(day.format(fmt))); }看一下LocalDate和LocalDateTime的源码:通过查看源码,可以看到LocalDate和LocalDateTime类都是不可变和线程安全的.这样的下面的代码中的day每一次都是不同的对象list.parallelStream().forEach(day -> System.out.println(day.format(fmt)));再来对比最初问题代码:并行操作时,在使用同一个sdf实例.SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); ... list.parallelStream().forEach(day -> System.out.println(sdf.format(day.getTime())));LocalDate类源码:* @implSpec * This class is immutable and thread-safe. * @since 1.8 */ public **final** class LocalDate implements Temporal, TemporalAdjuster, ChronoLocalDate, Serializable { ...LocalDateTime类源码:* @implSpec * This class is immutable and thread-safe. * * @since 1.8 */ public final class LocalDateTime implements Temporal, TemporalAdjuster, ChronoLocalDateTime<LocalDate>, Serializable {至此,测试代码中出问题的根本原因找到,根本解决之道找到.OK!
1. Hive的本质Hive是基于Hadoop的一个数据仓库工具,它的本质是将HQL语句转化成MapReduce程序.在它的底层,HDFS负责存储数据,YARN负责进行资源管理,MapReduce负责数据处理.2.Hive架构image.png架构组成:(1) 用户接口(Client):ClientCLI(hive shell)、JDBC/ODBC(java访问hive),WEBUI(浏览器访问hive)(2) 元数据(Metastore):Metastore元数据包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;默认存储在自带的derby数据库中,推荐使用采用关系型数据库MySQL存储Metastore;(3) 驱动器(Driver)包含:解析器、编译器、优化器、执行器;解析器:将SQL字符串转换成抽象语法树AST,这一步一般都用第三方工具库完成,比如antlr;对AST进行语法分析,比如表是否存在、字段是否存在、SQL语义是否有误(比如select中被判定为聚合的字段在group by中是否有出现);编译器:将抽象语法树AST编译生成逻辑执行计划;优化器:对逻辑执行计划进行优化;执行器:把逻辑执行计划转换成可以运行的物理计划。对于Hive来说,就是MR/TEZ/Spark.(4) Hive使用HDFS存储数据.Hive本身不存在数据,Hive虽有表的定义但表是纯的逻辑表,数据是存在HDFS上的.HIVE中的内容不支持改写和删除,适合读多写少的场景.3.Hive执行流程如上图所示,我们通过Hive提供的一系列交互接口(Client),向Hive提交SQL指令(HSQL).如果我们提交的是创建表的DDL语句(数据定义语言),Hive会通过使用自己的执行引擎(Driver)将数据表的信息记录在Metastore元数据组件中,正如上面提到的,元数据组件通常用一个关系型数据库实现,其记录着表名,字段名,字段类型以及关联的HDFS文件路径等元信息.如果我们提交的是DQL语句(数据查询分析语句),Hive的执行引擎(Drive)会结合元数据信息对该语句进行转换,语法分析,语法优化等操作,最后生成一个MapReduce执行计划.说具体一点,就是Hive执行引擎(Drive)会将该语句提交给自己的解析器(SQL Parser),解析器接收到语句之后,会将SQL查询字符串转换成抽象语法树,并对抽象语法树进行语法分析,比如检测表是否存在,字段是否存在,SQL语义是否有误等,之后将经过语法分析后的抽象语法树提交给编译器.编译器再将抽象语法树编程成逻辑执行计划(Physical Plan),然后再由优化器(Query Optimizer)对逻辑执行计划进行优化,最后由执行器(Execution)将逻辑执行计划转换成可以运行的物理计划,也即是MapReduce执行计划,然后再根据执行计划生成一个MapReduce的作业,提交到Yarn上执行,最后,将执行返回的结果输出到用户交互接口.Hive内部预置了很多函数,Hive的执行计划就是根据用户提交的HQL语句生成这些函数的DAG(有向无环图),然后封装进MapReduce的map和reduce函数中.
在PL/SQL中是没有数组(Array)概念的,但是如果程序员想用Array的话,可以用TYPE和 Table of Record变通一下,来代替多维数组.1 单维数组--单维数组 DECLARE TYPE emp_ssn_array IS TABLE OF NUMBER INDEX BY BINARY_INTEGER; best_employees emp_ssn_array; worst_employees emp_ssn_array; BEGIN best_employees(1) := '123456'; best_employees(2) := '888888'; worst_employees(1) := '222222'; worst_employees(2) := '666666'; FOR i IN 1 .. best_employees.count LOOP DBMS_OUTPUT.PUT_LINE('i=' || i || ', best_employees= ' || best_employees(i) || ', worst_employees= ' || worst_employees(i)); END LOOP; END;2 多维数组--多维数组 DECLARE TYPE emp_type IS RECORD( emp_id employee_table.emp_id%TYPE, emp_name employee_table.emp_name%TYPE, emp_gender employee_table.emp_gender%TYPE); TYPE emp_type_array IS TABLE OF emp_type INDEX BY BINARY_INTEGER; emp_rec_array emp_type_array; emp_rec emp_type; BEGIN emp_rec.emp_id := 300000000; emp_rec.emp_name := 'Barbara'; emp_rec.emp_gender := 'Female'; emp_rec_array(1) := emp_rec; emp_rec.emp_id := 300000008; emp_rec.emp_name := 'Rick'; emp_rec.emp_gender := 'Male'; emp_rec_array(2) := emp_rec; FOR i IN 1 .. emp_rec_array.count LOOP DBMS_OUTPUT.PUT_LINE('i=' || i || ', emp_id =' || emp_rec_array(i) .emp_id || ', emp_name =' || emp_rec_array(i) .emp_name || ', emp_gender = ' || emp_rec_array(i) .emp_gender); END LOOP; END;3 实例DECLARE -- 声明数组类型 TYPE ts_varray IS VARRAY(20) OF NUMBER(10); --20是最大的下标 -- 声明数组变量 vs_varray ts_varray := ts_varray(); BEGIN FOR i IN 1 .. 20 LOOP -- 自增数组大小 vs_varray.extend; --注释该行的话,会报下标越界的错误 -- 数组赋值 vs_varray(i) := trunc(dbms_random.value(100, 200)); END LOOP; -- 循环数组 FOR i IN 1 .. vs_varray.count LOOP dbms_output.put_line('i = ' || i || ',value = ' || vs_varray(i)); END LOOP; END;
一、分析函数是什么?分析函数是Oracle专门用于解决复杂报表统计需求的功能强大的函数,它可以在数据中进行分组然后基于组计算某种统计值,并且每一组的每一行都可以返回一个统计值。说白了,分析函数就是 over([query_partition_clase] order_by_clause)。比如说,我采用sum求和,rank排序等等,根据什么来呢?over提供一个窗口,使用partition by进行分组,在组内使用order by进行排序。over不能单独使用,要和分析函数:rank(),dense_rank(),row_number()等一起使用二、Oracle分析函数与聚合函数的区别:分析函数用于计算基于组的某种聚合值,它和聚合函数的不同之处是对于每个组返回多行,而聚合函数对于每个组只返回一行。三、分析函数:用于合计的函数:sum()函数;rollup()函数;cube()函数;grouping()函数;max() over;min() over;avg() over用于排列的函数:rank() over 函数;dense_rank() over 函数;row_number() over 函数;其他:lag() over;lead() over1. sum()函数:许多分析函数同时也是聚合函数,比如sum()函数,下面这样使用就是聚合函数。--按照月份,统计每个地区的总收入 SELECT earnmonth 月份,area 地区,SUM(personincome) 总收入 FROM earnings GROUP BY earnmonth,area;而这样使用就是分析函数:SELECT DISTINCT earnmonth 月份,area 地区, sum(personincome) OVER (PARTITION BY earnmonth,area) 总收入 FROM earnings;它们得出的结果是相同的,都是:image.png请注意,这里我们用到了distinct 关键字,如果不用distinct,第2个查询将返回20行数据,即earnings表的每行记录都将返回一行总收入,因为不用distinct的含义是:针对每个打工者计算他/她所在的月份和地区的总收入。SELECT earnmonth 月份,area 地区, sum(personincome) OVER (PARTITION BY earnmonth,area) 总收入 FROM earnings;image.png在这个例子中,聚合函数是更好的选择,但在另外一些情形下,我们更应该使用分析函数。下面通过几个实例来介绍排序分析函数的用途。问题:统计每个月份,不同地区工资最高的前3名。2. rank()函数语法:rank() over([query_partition_clause]order_by_clause)利用我们传统的聚合函数max可以方便地取出工资最高的一个员工,但是取出多个就无能为力了,同样,如果不分组我们可以通过排序取出工资最高的前3名,但无法实现对多个月份和地区的分组。而采用rank()分析函数,可以方便地实现我们的要求。完整的语句如下:SELECT t.earnmonth 月份,t.area 地区,t.sname 打工者姓名,t.personincome 收入,t.run 排名 FROM ( SELECT earnmonth,area,sname,personincome, rank() OVER(PARTITION BY earnmonth,area ORDER BY personincome desc) run FROM earnings ) t WHERE t.run<=3;结果为:image.png我们在开窗函数over()中使用earnmonth(月份)和area(地区)作为分组标志,并按照personincome(收入)倒序排列。注意:RANK()函数有3组,分别是rank(), dense_rank(), row_number(),它们的区别是:RANK()如果出现两个相同的数据,那么后面的数据就会直接跳过这个排名,比如:当第2名和第3名的利润相同时,rank的结果是1,2,2,4;而dense_rank()则不会跳过这个排名,结果是1,2,2,3;而row_number()哪怕是两个数据完全相同,排名也会不一样,结果是1,2,3,4.3. dense_rank()语法: dense_rank() over([query_partition_clause] order_by_clause)完整的语句如下:SELECT t.earnmonth 月份,t.area 地区,t.sname 打工者姓名,t.personincome 收入,t.run 排名 FROM ( SELECT earnmonth,area,sname,personincome, dense_rank() OVER(PARTITION BY earnmonth,area ORDER BY personincome desc) run FROM earnings ) t WHERE t.run<=3;结果为:image.png4. row_number()语法:row_number() over([query_partition_clause]order_by_clause)完整的语句如下:SELECT t.earnmonth 月份,t.area 地区,t.sname 打工者姓名,t.personincome 收入,t.run 排名 FROM ( SELECT earnmonth,area,sname,personincome, row_number() OVER(PARTITION BY earnmonth,area ORDER BY personincome desc) run FROM earnings ) t WHERE t.run<=3;结果为:image.png5. rollup()函数:按照月份,地区统计收入--rollup函数:(分组统计之后,再按照月份做一个汇总) --按照月份,统计每个地区的总收入 SELECT earnmonth,area,SUM(personincome) FROM earnings GROUP BY ROLLUP(earnmonth,area);结果为:image.png6. cube()函数:按照月份,地区进行收入总汇总--cube函数:(分组统计之后,按照月份做一个汇总,再按照地区做一个汇总,最后再来一个收入的总汇总) SELECT earnmonth,area,sum(personincome) FROM earnings GROUP BY cube(earnmonth,area) ORDER by earnmonth,area NULLS last;结果为:image.png7. grouping()函数:在以上例子中,是用rollup()和cube()函数都会对结果集产生null,这时候可用grouping函数来确认该记录是由哪个字段得出来的.grouping函数用法,带一个参数,参数为字段名,如果当前行是由rollup或者cube汇总得来的,结果就返回1,反之返回0.完整语句如下:SELECT decode(grouping(earnmonth),1,'所有月份',earnmonth) 月份, decode(grouping(area),1,'所有地区',area) 地区,SUM(personincome) FROM earnings GROUP BY cube(earnmonth,area) ORDER by earnmonth,area NULLS last;结果为:image.png8. max(),min(),avg()和sum()函数综合运用安装月份和地区统计打工收入最高值,最低值,平均值和总额.SELECT DISTINCT earnmonth 月份,area 地区, MAX(personincome) over(PARTITION BY earnmonth,area) 最高值, min(personincome) OVER(PARTITION BY earnmonth,area) 最低值, AVG(personincome) over(PARTITION BY earnmonth,area) 平均值, sum(personincome) over(PARTITION BY earnmonth,area) 总额 FROM earnings;以上语句统计结果和如下语句使用group by的查询结果一样:SELECT earnmonth 月份,area 地区, MAX(personincome) 最高值, min(personincome) 最低值, AVG(personincome) 平均值, sum(personincome) 总额 FROM earnings GROUP BY earnmonth,area;9. lag( )和lead( )函数说明:Lag和Lead函数可以在一次查询中取出某个字段的前N行和后N行的数据(可以是其他字段的数据,比如根据字段甲查询上一行或下两行的字段乙),原来没有分析函数的时候采用子查询方法,但是比较麻烦:语法如下:lag(value_expression [,offset] [,default]) over ([query_partition_clase] order_by_clause); lead(value_expression [,offset] [,default]) over ([query_partition_clase] order_by_clause);其中:value_expression:可以是一个字段或一个内建函数。 offset是正整数,默认为1,指往前或往后几点记录.因组内第一个条记录没有之前的行,最后一行没有之后的行, default就是用于处理这样的信息,默认为空。统计每个打工者上个月和下个月有没有赚钱(personincome大于0即为赚钱):select earnmonth 本月,sname 打工者, lag(decode(nvl(personincome,0),0,'没赚','赚了'),1,0) over(partition by sname order by earnmonth) 上月, lead(decode(nvl(personincome,0),0,'没赚','赚了'),1,0) over(partition by sname order by earnmonth) 下月 from earnings;结果为:image.png实验数据:1.建表create table earnings -- 打工赚钱表 ( earnmonth varchar2(6), -- 打工月份 area varchar2(20), -- 打工地区 sno varchar2(10), -- 打工者编号 sname varchar2(20), -- 打工者姓名 times int, -- 本月打工次数 singleincome number(10,2), -- 每次赚多少钱 personincome number(10,2) -- 当月总收入 ) ;2.插入实验数据insert into earnings values('200912','北平','511601','大魁',11,30,11*30); insert into earnings values('200912','北平','511602','大凯',8,25,8*25); insert into earnings values('200912','北平','511603','小东',30,6.25,30*6.25); insert into earnings values('200912','北平','511604','大亮',16,8.25,16*8.25); insert into earnings values('200912','北平','511605','贱敬',30,11,30*11); insert into earnings values('200912','金陵','511301','小玉',15,12.25,15*12.25); insert into earnings values('200912','金陵','511302','小凡',27,16.67,27*16.67); insert into earnings values('200912','金陵','511303','小妮',7,33.33,7*33.33); insert into earnings values('200912','金陵','511304','小俐',0,18,0); insert into earnings values('200912','金陵','511305','雪儿',11,9.88,11*9.88); insert into earnings values('201001','北平','511601','大魁',0,30,0); insert into earnings values('201001','北平','511602','大凯',14,25,14*25); insert into earnings values('201001','北平','511603','小东',19,6.25,19*6.25); insert into earnings values('201001','北平','511604','大亮',7,8.25,7*8.25); insert into earnings values('201001','北平','511605','贱敬',21,11,21*11); insert into earnings values('201001','金陵','511301','小玉',6,12.25,6*12.25); insert into earnings values('201001','金陵','511302','小凡',17,16.67,17*16.67); insert into earnings values('201001','金陵','511303','小妮',27,33.33,27*33.33); insert into earnings values('201001','金陵','511304','小俐',16,18,16*18); insert into earnings values('201001','金陵','511305','雪儿',11,9.88,11*9.88); commit;3.查询展示全表SELECT * FROM earnings;
1 前言在使用表输出输出数据到数据库时,因数据库约束检查等原因造成数据无法正常入库.这时"表输出"步骤若是没有定义错误处理的话,transformation转换会报出异常并且停止进行运行.为了保证程序的正常运行,我们可以在transform安通转换中"表输出"步骤中定义错误处理.此种方式可以捕获异常数据并持久化保存到硬盘或数据库中,利于后期数据的分析,来提高数据质量.2 错误处理设置错误处理,处理步骤如下:image.png在可能出错的组件上单击鼠标右键,选择Error Handling.填写相应的信息字段,然后将错误信息写入数据库或者文件中.image.pngimage.png入库错误日志表展示如下:image.png3 错误处理里面的坑然而,在转换"表输出"上设置错误处理时,会提示如下信息:image.png翻译为:警告!由于驱动程序的限制,在使用的数据库上不完全支持与批处理结合的错误处理。请谨慎行事,风险自负。也就是说当进行批处理操作时,设置错误处理可能会出现异常信息漏捕获的情况.我案例中生成异常信息1692个,而实际只捕获到168个.而当"表输出"操作取消批处理时(如下设置),就可以全部捕获异常信息.然后为了保证正确数据被处理,同时不缺失异常信息而让每次提交的数据量设置为1,这样会大大降低效率.image.png解决方案: 创建一张和指定表结构完全相同的表,只是不设置任何数据库约束(主键).然后先将数据全部接收进这张表,再在数据库层面进行去重后插入另外一张指定的表中.这样既可以保留所有原数据,又可以实现数据去重.4 参考文章:(1) Kettle中定义错误处理: https://blog.csdn.net/feng19821209/article/details/9120561(2) KETTLE 执行转换时遇到错误,记录并继续运行: https://blog.csdn.net/shenlong_no1/article/details/78819453(3)https://wiki.pentaho.com/display/EAI/.09+Transformation+Steps#.09TransformationSteps-StepErrorHandling
Kettle性能优化是一个系统工程,不仅涉及工具本身的优化,更涉及ETL工具之外的诸多因素,比如,ETL要读取数据库,那么目标DMBS的性能,SQL语句,网络等相关因素都影响到执行效率。根据Kettle对数据ETL的过程性能调优,主要取决于三个因素:上游渠道,工具的大小与数量,下游渠道。一 Kettle调优调整JVM大小进行性能优化,修改Kettle定时任务中的Kitchen或Pan或Spoon脚本。Kettle是Java做的,尽量用大一点的内存参数启动Kettle;##修改脚本代码片段 set OPT=-Xmx512m -cp %CLASSPATH% -Djava.library.path=libswt\win32\ -DKETTLE_HOME="%KETTLE_HOME%" -DKETTLE_REPOSITORY="%KETTLE_REPOSITORY%" -DKETTLE_USER="%KETTLE_USER%" -DKETTLE_PASSWORD="%KETTLE_PASSWORD%" -DKETTLE_PLUGIN_PACKAGES="%KETTLE_PLUGIN_PACKAGES%" -DKETTLE_LOG_SIZE_LIMIT="%KETTLE_LOG_SIZE_LIMIT%" ##参数参考: -Xmx1024m:设置JVM最大可用内存为1024M。 -Xms512m:设置JVM促使内存为512m。此值可以设置与-Xmx相同,以避免每次垃圾回收完成后JVM重新分配内存。 -Xmn2g:设置年轻代大小为2G。整个JVM内存大小=年轻代大小 + 年老代大小 + 持久代大小。持久代一般固定大小为64m,所以增大年轻代后,将会减小年老代大小。此值对系统性能影响较大,Sun官方推荐配置为整个堆的3/8。 -Xss128k:设置每个线程的堆栈大小。JDK5.0以后每个线程堆栈大小为1M,以前每个线程堆栈大小为256K。更具应用的线程所需内存大小进行调整。在相同物理内存下,减小这个值能生成更多的线程。但是操作系统对一个进程内的线程数还是有限制的,不能无限生成,经验值在3000~5000左右。 ##样例: OPT=-Xmx1024m -Xms512m调整提交(Commit)记录数大小进行优化(尽量提高批处理的commit size)如修改“表输出”组件中的“提交记录数量”参数进行优化,Kettle默认Commit数量为:1000,可以根据数据量大小来设置Commitsize:1000~50000。clipboard.png调整记录集合里的记录数(RowSet)RowSet是两个步骤之间的缓存.性能调优的关键是如何找到性能瓶颈:一个重要的方法就是观察RowSet.如下图所示,当左边的in大于右边的out的位置时,很可能就是性能瓶颈的位置.(也可以通过单个执行最长的步骤来确定性能瓶颈.)clipboard.png通过点击转换空白处,可以调整rowset的大小.clipboard.png调整之后,执行效果如下:clipboard.png调整转换动作的并发处理数(改变开始复制的数量)注意:此种方式要用在适合并发操作的场景,比如查询类,要注意死锁问题.当调整rowset大小之后,性能效果仍不明显的话,可以尝试调整转换动作的并发处理数,比如以下转换操作在"数据库查询"处出现性能瓶颈.clipboard.png调整并发处理数:(一般设置成2-8个)clipboard.png执行情况如下图所示,速度明显提高了很多.clipboard.png5. 使用集群,尤其是对于查询类,运算类,排序等;6. 更换其他实现方式,如js使用java类或插件;7. 注意日志级别(Rowlevel日志的性能会严重下降,是Basic的1/10);8. 注意死锁问题:数据库死锁(读写同一张表)和转换本身死锁;9. 尽量使用数据库连接池;使用数据库连接池,可以在一定程度上提高速度.如何查看是否使用了数据库连接池?(这个在详细日志中可以看到,使用了连接池).10. 尽量使用缓存,缓存尽量大一些(主要是文本文件和数据流),比如排序;11. 合适的使用数据库索引,尤其对于数据库查询类.具体可以参考[索引的正确使用];12. 可以使用sql来做的一些操作尽量用sql;Group , merge , stream lookup,split field这些操作都是比较慢的,想办法避免他们.,能用sql就用sql;13. 插入大量数据的时候尽量把索引删掉;14. 尽量避免使用update , delete操作,尤其是update,如果可以把update变成先delete, 后insert;15. 能使用truncate table的时候,就不要使用deleteall row这种类似sql合理的分区,如果删除操作是基于某一个分区的,就不要使用delete row这种方式(不管是deletesql还是delete步骤),直接把分区drop掉,再重新创建;16. 尽量缩小输入的数据集的大小(增量更新也是为了这个目的);17. 尽量使用数据库原生的方式装载文本文件(Oracle的sqlloader, mysql的bulk loader步骤);18. 尽量不要用kettle的calculate计算步骤,能用数据库本身的sql就用sql ,不能用sql就尽量想办法用procedure,实在不行才是calculate步骤;19. 远程数据库用文件+FTP的方式来传数据,文件要压缩。(只要不是局域网都可以认为是远程连接)。20. 在确保结果输出正确的情况下,能使用并行处理的就不要使用串行处理.二 索引的正确使用在ETL过程中的索引需要遵循以下使用原则:1、当插入的数据为数据表中的记录数量10%以上时,首先需要删除该表的索引来提高数据的插入效率,当数据全部插入后再建立索引。2、避免在索引列上使用函数或计算,在where子句中,如果索引列是函数的一部分,优化器将不使用索引而使用全表扫描。3、避免在索引列上使用 NOT和 “!=”,索引只能告诉什么存在于表中,而不能告诉什么不存在于表中,当数据库遇到NOT和 “!=”时,就会停止使用索引转而执行全表扫描。4、索引列上用 >=替代 >高效:select * from temp where deptno>=4 低效:select * from temp where deptno>3两者的区别在于,前者DBMS将直接跳到第一个DEPT等于4的记录而后者将首先定位到DEPTNO=3的记录并且向前扫描到第一个DEPT大于3的记录。三 数据抽取的SQL优化1、Where子句中的连接顺序。2、删除全表是用TRUNCATE替代DELETE。3、尽量多使用COMMIT。4、用EXISTS替代IN。5、用NOT EXISTS替代NOT IN。6、优化GROUP BY。7、有条件的使用UNION-ALL替换UNION。8、分离表和索引。
写在前面的话:kettle很方便的一个点,就是从你有一个想法到落实到行动,可以很快很方便.这点也是使用kettle过程中有趣的点之一.该方式主要是使用了job中的设置变量控件.在设置变量中配置文件的路径:如果使用最简单的方式,那就是文件的路径在此处直接指定.而常用的方式是通过kettle的命名参数在执行脚本的时候通过变量的方式传递给job,然后在属性文件名处引用该文件路径变量即可.如下图详细步骤如下:1. properties配置文件2. 设置job接收参数3. 在job的设置变量控件中引用jdbc_configpath变量4. 在配置数据源时,引用配置文件中的变量配置数据源时,注意共享数据源,使其他作业可见:5. 在表输入控件中使用数据源6. 在执行该job的命令中加入参数--启动带有配置文件的作业任务 nohup ./kitchen.sh -file=/app/kettle/kettle/kettleJob/uat/tmp_test/kjb_db_to_txt.kjb -param:jdbc_configpath=/app/kettle/kettle/kettleJob/uat/tmp_test/conf/jdbc_config.properties -level=Basic -logfile="/app/kettle/kettle/kettle_log/uat/tmp_test/kjb_db_to_txt.kjb.log_"$(date +%Y%m%d) > /app/kettle/kettle/kettle_log/uat/tmp_test/kjb_db_to_txt.kjb.out_$(date +%Y%m%d)&7.参考文章https://blog.csdn.net/andyzhaojianhui/article/details/50344247
本文主要通过一个获取某个区间内质数的例子来说明如何使用java进行多线程并发处理任务。1. 需求:获取某个区间内质数,同时任务在处理完成后,要返回任务执行时间。分析:需求中指明要在任务处理完成后,返回执行时间,所以要监听所有任务执行的状态。高效处理:使用线程池,这里使用接口ExecutorService类;监听任务状态:使用FutureTask类;2. 新建一个Math类,并发处理核心类:package hrt.executor; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadPoolExecutor; public class Math { private static final int MAX_THREAD_COUNT = 10; /** * 获取区间中的质数 * @param start * @param end * @return */ public static List<Long> getPrimeNumbers(Long start, Long end) { Long[] points = getPoints(start, end); List<Long> primeNumbers = new ArrayList<Long>(); List<FutureTask<List<Long>>> futureTaskList = new ArrayList<FutureTask<List<Long>>>(); ExecutorService excutorService = Executors.newFixedThreadPool(MAX_THREAD_COUNT); for (int i = 0; i < points.length - 1; i++) { // FutureTask可以看成是一个可以加入线程池的job,用来执行具体的任务 FutureTask<List<Long>> futureTask = new FutureTask<List<Long>>( new GetPrimeTask(points[i], points[i + 1] - 1)); futureTaskList.add(futureTask); // executorService可以看成是一个线程池,调用submit方法提交Task任务 excutorService.submit(futureTask); } for (FutureTask<List<Long>> futureTask : futureTaskList) { try { // get方法获取job计算得到的结果,该方法只有在Task完成任务后才会有返回. List<Long> partPrimeNumbers = futureTask.get(); primeNumbers.addAll(partPrimeNumbers); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } ThreadPoolExecutor tp = (ThreadPoolExecutor) excutorService; System.out.println("池中最大的线程数据" + tp.getMaximumPoolSize() + ",当前池中线程数:" + tp.getPoolSize() + ",完成任务数:" + tp.getCompletedTaskCount()); return primeNumbers; } /** * 判断是否是质数:只能被1和自身整除 * * @param num * @return */ public static boolean isPrimeNumber(Long num) { for (Long i = 2L; i <= num / 2; i++) { if (num % i == 0) { return false; } } return true; } /** * 将数据拆分为多个数据区间 * @param start * @param end * @return */ private static Long[] getPoints(Long start, Long end) { Long[] points = new Long[MAX_THREAD_COUNT]; for (int i = 0; i < MAX_THREAD_COUNT - 1; i++) { points[i] = start + (end - start) / (MAX_THREAD_COUNT - 1) * i; } points[MAX_THREAD_COUNT - 1] = end + 1; return points; } }说明:这个Math类中有两个重要的类,一个是ExecutorService,它可以认为是一个线程池;另外一个是FutureTask, 这个可以认为是可以加入线程池中的一个Job,用来执行具体的任务。在上面的代码中,我们将每一个FutureTask对象保存起来,然后加入到线程池中并submit提交任务,最后遍历每一个FutureTask对象,通过该对象的get方法来获取这个job计算得到的结果,get方法只有在job完成任务后才会返回。3. 新建GetPrimeTask类:package hrt.executor; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; public class GetPrimeTask implements Callable<List<Long>> { private Long start; private Long end; public GetPrimeTask(Long start, Long end) { this.start = start; this.end = end; } public List<Long> call() throws Exception { List<Long> primeNumberList = new ArrayList<Long>(); for (Long i = start; i <= end; i++) { //判断是否是质数,是的话插入集合中 if (Math.isPrimeNumber(i)) { primeNumberList.add(i); } } return primeNumberList; } }说明:必须要实现Callable接口。4. 测试程序入口:package hrt.executor; import java.sql.Time; import java.util.Calendar; import java.util.List; /** * 主程序入口 * */ public class App { public static void main( String[] args ) { System.out.println("start"); Long startTime = getCurTime(); List<Long> primeNumberList = Math.getPrimeNumbers(200000L, 300000L); for (Long primeNumber : primeNumberList) { System.out.println("" + primeNumber); } Long endTime = getCurTime(); System.out.println("消耗时间:" + (endTime - startTime) + " ms"); } private static Long getCurTime() { Calendar c = Calendar.getInstance(); return c.getTimeInMillis(); } }5. 追加说明:在Math类中使用的ExecutorService类其实就是一个线程池中线程数固定的线程池,这里固定的线程数有常量 MAX_THREAD_COUNT=10 进行设置。(1)创建线程池时,初始线程池中线程数为0,当提交的任务数小于线程数时,线程池中的线程数会等于任务数;当提交的任务数大于固定线程数时,多出的任务会处在等待中,当线程池中线程开始有空闲时,才会处理等待中的任务。(2)对于固定线程数的线程池,当提交的任务处理完成后,线程池中空闲的线程还会一直存在不会中断。任务执行完毕后,线程还在6. 参考文章https://blog.csdn.net/m0_37825799/article/details/79088596
需求:获取两个Date时间间隔的天数,并依次输出每一天.示例:两个时间: [2018-10-01,2018-10-05) 时间间隔: 4 输出时间: Mon Oct 01 00:00:00 CST 2018 Tue Oct 02 00:00:00 CST 2018 Wed Oct 03 00:00:00 CST 2018 Thu Oct 04 00:00:00 CST 2018代码实现具体如下:package hrt.executor.test; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.GregorianCalendar; public class DateBetweenTest { public static void main(String[] args) throws ParseException { Calendar compareDay = new GregorianCalendar(); //Calendar compareDay = Calendar.getInstance(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); String startStr = "2018-10-01"; String endStr = "2018-10-05"; Date startDate = sdf.parse(startStr); Date endDate = sdf.parse(endStr); int differentDays = differentDays(startDate, endDate); System.out.println(startStr + " ~ " + endStr + " ,间隔天数为" + differentDays); for (int i = 0; i < differentDays; i++) { compareDay.setTime(startDate); compareDay.add(compareDay.DATE, i); System.out.println(compareDay.getTime()); } } /** * date2比date1多的天数:比较是基于年月日做的比较,不计算时分秒 * * @param date1 * @param date2 * @return */ public static int differentDays(Date date1, Date date2) { Calendar cal1 = Calendar.getInstance(); cal1.setTime(date1); Calendar cal2 = Calendar.getInstance(); cal2.setTime(date2); int day1 = cal1.get(Calendar.DAY_OF_YEAR); int day2 = cal2.get(Calendar.DAY_OF_YEAR); int year1 = cal1.get(Calendar.YEAR); int year2 = cal2.get(Calendar.YEAR); if (year1 != year2) // 不同年 { int timeDistance = 0; for (int i = year1; i < year2; i++) { if (i % 4 == 0 && i % 100 != 0 || i % 400 == 0) // 闰年 { timeDistance += 366; } else // 不是闰年 { timeDistance += 365; } } return timeDistance + (day2 - day1); } else // 同一年 { // System.out.println("判断day2 - day1 : " + (day2-day1)); return day2 - day1; } } }执行结果如下:代码运行结果截图参考文章:https://www.cnblogs.com/0201zcr/p/5000977.html
一. 主流程概述流程示意图实现过程梳理:1 数据输入;2 定义excle报表模板;3 使用excle报表模块生成报表文件;二. 主流程描述1. 数据输入(文本文件输入)输入文件people_age.txt内容如下:id|name|age 1|lili|16 2|xiaoming|15 3|xiaozhao|14 4|feifei|17 5|huahua|14指定输入文件指定内容指定字段2. 定义excle报表模板模板文件people_age_template.xls内容如图所示:模板文件3. 使用excle报表模块生成报表文件设置文件&工作表设置内容运行转换后,生成的报表文件,如下图所示:生成报表效果至此,整理完毕!后记,我尝试通过kettle向excel中传递变量,可是没有成功。
一. 主流程概述主流程主流程主要步骤:1 准备统计信息与报表2 发送邮件二. 流程详解1. 准备统计信息与报表准备统计信息与报表(1) 表输入-统计数量查询数据,统计报表信息条数,设置如下:表输入输入步骤名称-插入SQL代码-预览数据(注意oracle表示查询结束的分号“;”在里面报错,记得删除)由于查询结果中数字总是带着小数点,所以此处做了to_char()转换以去掉小数点.(2) 设置变量设置变量(3) 表输入-查询报表数据表输入-查询报表数据输入步骤名称 -> 获取SQL查询语句,快速插入SQL代码 -> 预览数据(4) Microsoft Excel 输出文件&工作表设置内容设置2. 发送邮件地址设置i服务器设置邮件消息设置附件设置至此,流程说明完毕.以上此流程中只是将查询数据库的结果集写入excle中,然后将excle作为邮箱附件发送出去,并没有使用excle模板.下一篇文章,我会分享如何使用excle模板定义报表格式,你学会后结合本篇文章的内容你就可以发出比较美观的附件报表了.
一. 主流程梳理主流程1 获取sftp上对应文件的存放路径;2 从sftp下载指定的文件到指定的目录;3 从下载后的指定目录,获取数据进行转换操作;二. 获取ftp路径获取ftp路径1. 生成记录生成记录此处要注意: 限制为1;2. JavaScript代码JavaScript代码可通过点击测试脚本按钮测试脚本执行是否正确.//Script here Date.prototype.Format = function (fmt) { var o = { "M+": this.getMonth() + 1, //月份 "d+": this.getDate(), //日 "h+": this.getHours(), //小时 "m+": this.getMinutes(), //分 "s+": this.getSeconds(), //秒 "q+": Math.floor((this.getMonth() + 3) / 3), //季度 "S": this.getMilliseconds() //毫秒 }; if (/(y+)/.test(fmt)) fmt = fmt.replace(RegExp.$1, (this.getFullYear() + "").substr(4 - RegExp.$1.length)); for (var k in o) if (new RegExp("(" + k + ")").test(fmt)) fmt = fmt.replace(RegExp.$1, (RegExp.$1.length == 1) ? (o[k]) : (("00" + o[k]).substr(("" + o[k]).length))); return fmt; } var url=new Date(new Date().getTime()).Format("yyyyMMdd");3.设置变量设置变量可通过点击获取字段按钮快速设置变量.三.SFTP下载通过SFTP下载文件到指定的目录,配置如下:服务器配置配置完成后,可通过点击测试连接进行测试是否配置成功.文件配置配置完成后,可通过点击测试文件夹查看文件夹是否存在.四.进行数据转换数据转换流程1.文件输入文件配置内容配置字段配置2.排序记录排序记录3.记录集连接配置记录集连接类型4.过滤记录设置过滤条件5.文本输出设置输出目录和文件设置输出内容格式指定输出字段OK!至此流程梳理完毕!
一. 数据库1. 查询数据库列表show databases ;2. 使用指定的数据库use default;3. 查看数据库的描述信息desc database extended db_hive_03 ;二. 表1. 查询表列表show tables ;2. 查询表的描述信息:desc student ; desc extended student ; desc formatted student ;3. 创建表create table student( id int, name string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'; load data local inpath '/opt/datas/student.txt'into table student ;4. 创建一张表并复制一个表的结构和数据create table if not exists default.dept_cats as select * from dept ;5. 使用另一张表的结构创建一张新表create table if not exists default.dept_like like default.dept ;6. 清空表:truncate table dept_cats ;7.删除表drop table if exists dept_like_rename ;8. 修改表名alter table dept_like rename to dept_like_rename ;9.查询表select * from student ; select id from student ;三. 功能函数:1. 显示功能函数列表show functions ;2. 查看功能函数的描述信息desc function upper ;3. 查询功能函数的扩展信息desc function extended upper ;4. 测试功能函数select id ,upper(name) uname from db_hive.student ;四. 进阶:1. 创建一个外部表,并指定导入文件的位置和字段分割符:create EXTERNAL table IF NOT EXISTS default.emp_ext2( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' location '/user/hive/warehouse/emp_ext2';2. 创建分区表:create EXTERNAL table IF NOT EXISTS default.emp_partition( empno int, ename string, job string, mgr int, hiredate string, sal double, comm double, deptno int ) partitioned by (month string,day string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ;3. 向分区表中导入数据:load data local inpath '/usr/local/app/hive_test/emp.txt' into table default.emp_partition partition (month='201805',day='31') ;4. 查看分区表列表:show partitions emp_partition ;5. 查询分区表中的数据:select * from emp_partition where month = '201509' and day = '13' ;6. 加载数据到hive:1)加载本地文件到hive表 load data local inpath '/opt/datas/emp.txt' into table default.emp ; 2)加载hdfs文件到hive中 load data inpath '/user/beifeng/hive/datas/emp.txt' overwrite into table default.emp ; 3)加载数据覆盖表中已有的数据 load data inpath '/user/beifeng/hive/datas/emp.txt' into table default.emp ; 4)创建表是通过insert加载 create table default.emp_ci like emp ; insert into table default.emp_ci select * from default.emp ; 5)创建表的时候通过location指定加载7. hive到文件:insert overwrite local directory '/opt/datas/hive_exp_emp' select * from default.emp ; insert overwrite local directory '/opt/datas/hive_exp_emp2' ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' COLLECTION ITEMS TERMINATED BY '\n' select * from default.emp ; bin/hive -e "select * from default.emp ;" > /opt/datas/exp_res.txt8. 将查询结果导出到本地文件中:insert overwrite directory '/hive_test/export_emp.txt' select * from emp; select * from emp ; select t.empno, t.ename, t.deptno from emp t ;五. 进阶查询:1. = >= <= between andselect * from emp limit 5 ; select t.empno, t.ename, t.deptno from emp t where t.sal between 800 and 1500 ;2. is null / is not null /in /not inselect t.empno, t.ename, t.deptno from emp t where comm is null ;3. max/min/count/sum/avgselect count(*) cnt from emp ; select max(sal) max_sal from emp ; select sum(sal) from emp ; select avg(sal) from emp ;4. group by /having 分组emp表 * 每个部门的平均工资 select t.deptno, avg(t.sal) avg_sal from emp t group by t.deptno ; * 每个部门中每个岗位的做高薪水 select t.deptno, t.job, max(t.sal) avg_sal from emp t group by t.deptno, job ;5. >>>having* where 是针对单条记录进行筛选 * having 是针对分组结果进行筛选求每个部门的平均薪水大于2000的部门 select deptno, avg(sal) from emp group by deptno ; select deptno, avg(sal) avg_sal from emp group by deptno having avg_sal > 2000;6. join 两个表进行连接##等值jion join ... on select e.empno, e.ename, d.deptno, d.dname from emp e join dept d on e.deptno = d.deptno ; ##左连接 left join select e.empno, e.ename, d.deptno, d.dname from emp e left join dept d on e.deptno = d.deptno ; ##右连接 right join select e.empno, e.ename, e.deptno, d.dname from emp e right join dept d on e.deptno = d.deptno ; ##全连接 full join select e.empno, e.ename, e.deptno, d.dname from emp e full join dept d on e.deptno = d.deptno ;六. 客户端配置与启停1. 关闭CLI客户端命令:exit #退出hive命令,使用exit,不要直接用ctrl+c,否则进程还在,只是窗口关闭了而已.2. 在启动hive时设置配置属性信息$ bin/hive --hiveconf <property=value>3. 查看当前所有的配置信息hive > set ; hive (db_hive)> set system:user.name ; system:user.name=beifeng4. 查看帮助[beifeng@hadoop-senior hive-0.13.1]$ bin/hive -help5. 执行sql语句* bin/hive -e <quoted-query-string> eg: bin/hive -e "select * from db_hive.student ;"6. 执行指定的文件* bin/hive -f <filename> eg: $ touch hivef.sql select * from db_hive.student ; $ bin/hive -f /opt/datas/hivef.sql #将执行结果输入到指定的文件中 $ bin/hive -f /opt/datas/hivef.sql > /opt/datas/hivef-res.txt7. 在hive cli命令窗口中如何查看hdfs文件系统hive (default)> dfs -ls / ;8. 在hive cli命令窗口中如何查看本地文件系统hive (default)> !ls /opt/datas ;
1. 下载http://archive.cloudera.com/cdh5/cdh/5/hbase-1.2.0-cdh5.7.0.tar.gz2. 解压#解压到指定的文件夹 tar xzvf hbase-1.2.0-cdh5.7.0.tar.gz -C /usr/loca/hadoop/app3. 配置文件修改进入目录: /usr/local/hadoop/app/hbase-1.2.0-cdh5.7.0/conf(1)修改hbase-env.sh#指定JAVA_HOME地址 export JAVA_HOME=/usr/local/hadoop/app/jdk1.8.0_171 #不使用hbase自带的zk export HBASE_MANAGES_ZK=false(2)修改regionservers添加如下内容:(我的是两个从节点)slave1 slave2(3)修改hbase-site.xml配置如下内容:<property> <name>hbase.rootdir</name> <value>hdfs://master:9000/hbase</value> </property> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>master,slave1,slave2</value> </property>(4) 远程复制到其他两个节点上scp -rp /usr/local/hadoop/app/hbase-1.2.0-cdh5.7.0 slave1:/usr/local/hadoop/app scp -rp /usr/local/hadoop/app/hbase-1.2.0-cdh5.7.0 slave1:/usr/local/hadoop/app并在slave1,slave2节点上分别配置环境变量(5)配置系统环境变量vi ~/.bashrcexport HBASE_HOME=/usr/local/hadoop/app/hbase-1.2.0-cdh5.7.0 export PATH=$HBASE_HOME/bin:$PATHbash一下,刷新配置,使配置生效echo查看配置是否生效echo $HBASE_HOME4. 检查是否安装成功4.1 启动或停止启动hbase之前,**要首先启动hadoop和zookeeper:(1) 启动hadoop:#格式化 hadoop namenode -format #启动hdfs ./sbin/start-dfs.sh #启动yarn ./sbin/start-yarn.shjps一下,确认启动状态(2) 启动zookeeper#启动 zkServer.sh start #查看启动状态 zkServer.sh status(3) 启动hbasestart-hbase.sh stop-hbase.sh4.2 验证hbase是否启动成功:(1) jps验证 是否有对应进程:[root@master conf]# jps 3936 ResourceManager 7601 Jps 3298 NameNode 7491 Main 3449 SecondaryNameNode 4765 QuorumPeerMain 7229 HMaster [root@slave2 conf]# jps 2210 NodeManager 4082 HRegionServer 2085 DataNode 2871 QuorumPeerMain 4297 Jps(2) 执行hbase shell命令进入shell终端:执行status命令,查看集群状态hbase(main):001:0> status 1 active master, 0 backup masters, 2 servers, 0 dead, 1.0000 average load(3) 执行version命令查看一下版本;(4) 创建一个表:create 'member','info','address'(5) 查看一下表的描述:desc 'member'(6) 查看hbase前端页面http://192.168.175.20:60010查看新建的表,及其他信息,观察是否正常.当无法访问时,可尝试在虚拟机内部访问,或执行curl master:60010
1. 下载http://archive.cloudera.com/cdh5/cdh/5/zookeeper-3.4.5-cdh5.7.0.tar.gz2. 解压# 解压到指定的文件夹 tar xzvf zookeeper-3.4.5-cdh5.7.0.tar.gz -C /usr/local/hadoop/app/3. 配置文件修改(1) 在解压目录下创建两个文件夹:/usr/local/hadoop/app/zookeeper-3.4.5-cdh5.7.0mkdir data mkdir logs(2) 在data目录创建myid文件touch myid echo 1 > myid在其他两个节点分别写入2和3(3) 修改配置文件zoo.cfg加入如下配置:dataDir=/usr/local/hadoop/app/zookeeper-3.4.5-cdh5.7.0/data dataLogDir=/usr/local/hadoop/app/zookeeper-3.4.5-cdh5.7.0/logs server.1=master:2888:3888 server.2=slave1:2888:3888 server.3=slave2:2888:38884. 配置系统环境变量vi ~/.bashrcexport ZK_HOME=/usr/local/hadoop/app/zookeeper-3.4.5-cdh5.7.0 export PATH=$ZK_HOME/bin:$PATHbash一下,让配置生效.测试一下,环境变量是否生效:echo $ZK_HOME5. 将配置之后的解压文件,分发到其他节点上,注意!!!: 修改myid文件6. 执行以下命令依次启动zkServerzkServer.sh start7. 检查是否安装成功#查看状态 zkServer.sh status在各个节点执行查看状态命令,若可以在zookeeper各节点可以看到一个leader和多个follower即表示搭建成功。leader节点查看状态如下:[root@slave2 bin]# ./zkServer.sh status JMX enabled by default Using config: /usr/local/hadoop/app/zookeeper-3.4.5-cdh5.7.0/bin/../conf/zoo.cfg Mode: leaderfollower节点查看状态如下:[root@slave1 bin]# ./zkServer.sh status JMX enabled by default Using config: /usr/local/hadoop/app/zookeeper-3.4.5-cdh5.7.0/bin/../conf/zoo.cfg Mode: follower8. 启动zk客户端,进行基本操作测试(1) 启动客户端:./zkCli.sh[root@slave2 bin]# ./zkCli.sh Connecting to localhost:2181 Welcome to ZooKeeper! JLine support is enabled WATCHER:: WatchedEvent state:SyncConnected type:None path:null [zk: localhost:2181(CONNECTING) 0]执行登录命令,出现如上日志内容,即表示登录成功.(2) zk基本操作命令zookeeper的基本操作命令和linux命令类似.ls 命令: 查看所有节点:ls /[zk: localhost:2181(CONNECTING) 0] ls / [zookeeper, otter]示例,根节点有两个,分别为[zookeeper, otter].get 命令: 查看节点数据内容和属性信息:get /otter[zk: localhost:2181(CONNECTED) 1] get /otter null cZxid = 0x100000004 ctime = Fri Jun 07 11:54:06 CST 2019 mZxid = 0x100000004 mtime = Fri Jun 07 11:54:06 CST 2019 pZxid = 0x100000005 cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1 #有一个子节点ls2 命令: 查看该节点的子节点信息和属性信息:ls2 /otter[zk: localhost:2181(CONNECTED) 3] ls2 /otter [canal] cZxid = 0x100000004 ctime = Fri Jun 07 11:54:06 CST 2019 mZxid = 0x100000004 mtime = Fri Jun 07 11:54:06 CST 2019 pZxid = 0x100000005 cversion = 1 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 0 numChildren = 1create 命令: 创建节点,并指定节点内容[zk: localhost:2181(CONNECTED) 4] create /tmp_test 123 Created /tmp_test [zk: localhost:2181(CONNECTED) 5] ls / [zookeeper, tmp_test, otter] [zk: localhost:2181(CONNECTED) 6] get /tmp_test 123create -s 命令: 创建顺序节点,并指定节点内容[zk: localhost:2181(CONNECTED) 7] create -s /tmp_test/sub_node 111 Created /tmp_test/sub_node0000000000 [zk: localhost:2181(CONNECTED) 8] ls /tmp_test [sub_node0000000000] [zk: localhost:2181(CONNECTED) 9] create -s /tmp_test/sub_node 112 Created /tmp_test/sub_node0000000001 [zk: localhost:2181(CONNECTED) 10] ls2 /tmp_test [sub_node0000000001, sub_node0000000000] [zk: localhost:2181(CONNECTED) 12] get /tmp_test/sub_node0000000001 112delete 命令: 删除指定节点[zk: localhost:2181(CONNECTED) 13] ls /tmp_test [sub_node0000000001, sub_node0000000000] [zk: localhost:2181(CONNECTED) 14] delete /tmp_test/sub_node0000000000 [zk: localhost:2181(CONNECTED) 15] ls /tmp_test [sub_node0000000001]rmr 命令: 删除多个层级的节点[zk: localhost:2181(CONNECTED) 16] ls / [zookeeper, tmp_test, otter] [zk: localhost:2181(CONNECTED) 18] ls /tmp_test [sub_node0000000001] [zk: localhost:2181(CONNECTED) 19] rmr /tmp_test [zk: localhost:2181(CONNECTED) 20] ls / [zookeeper, otter]
KafkaOffsetMonitor是一个可以用于监控Kafka的Topic及Consumer消费状况的工具,其配置和使用特别的方便。源项目Github地址为:https://github.com/quantifind/KafkaOffsetMonitor。最简单的使用方式是从Github上下载一个最新的KafkaOffsetMonitor-assembly-0.2.1.jar,上传到某服务器上,然后执行一句命令就可以运行起来。1. 创建kafka管理台文件夹创建一个kafka-monitor的文件夹,并将KafkaOffsetMonitor-assembly-0.4.1-SNAPSHOT.jar拷贝进去。创建文件夹2. 创建启动&停止脚本启动脚本kafka-monitor-start.shjava -Xms512M -Xmx512M -Xss1024K -XX:PermSize=256m -XX:MaxPermSize=512m \ -cp KafkaOffsetMonitor-assembly-0.4.1-SNAPSHOT.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --offsetStorage kafka \ --kafkaBrokers 192.168.10.128:9092 \ --kafkaSecurityProtocol PLAINTEXT \ --zk 192.168.10.129:2181 \ --port 8088 \ --refresh 10.seconds \ --retain 2.days 1>/app/kafka-monitor/logs/stdout.log 2>/app/kafka-monitor/logs/stderr.log & \停止脚本kafka-monitor-stop.shkillnum=`jps | grep OffsetGetterWeb | awk '{print $1}'` kill -9 ${killnum} echo "OK...."3. 创建日志文件夹&日志文件在kafka-monitor 下mkdir logs,创建logs文件件,touch stdout.log ,touch stderr.log,创建stdout.log和stderr.log日志文件创建文件日志文件4. 运行启动脚本运行启动脚本后,查看日志文件,出现如下信息,基本代码启动成功启动日志查看是否已经成功启动进程:ps -ef | grep 'kafka'5. 访问web控制台启动浏览器,输入部署服务器的ip:8089,看到如下画面即代表OK.展示页面关键页面说明说明:通过对比producer offset和consumer offset的值,也即是offset lag.可以监控消息是否能够及时被消费.若lag一直持续增大,超过一定的量,达到kafka清除消息的话,消息就可能会出现被清除掉而永远不能被消费的风险.所以当lag超过一定量时要提前做预警.
0. 基本环境说明jdk版本: jdk1.7.0_80hadoop版本: hadoop-2.6.1hive版本: apache-hive-1.2.2-bin1. 下载解压#解压到指定的目录 tar xzvf apache-hive-1.2.2-bin.tgz -C /usr/local/src2. 安装mysqlmysql可以和hive装在一个虚拟机上,也可以装在其他的机器上,进行远程连接.我之前已经在本地pc上装的有mysql,所以在这里就不赘述mysql的安装步骤了.添加mysql驱动:copy一个mysql的驱动jar包,到hive的lib目录下.3. 修改配置文件进入解压后的目录/usr/local/src/apache-hive-1.2.2-bin(1) 在conf目录下创建一个名为hive-site.xml文件:添加如下配置:配置数据库连接:配置显示当前数据库,以及查询表的行头信息<?xml version="1.0"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>javax.jdo.option.ConnectionURL</name> <value>jdbc:mysql://192.168.175.1:3306/metastore?createDatabaseIfNotExist=true</value> </property> <property> <name>javax.jdo.option.ConnectionDriverName</name> <value>com.mysql.jdbc.Driver</value> </property> <property> <name>javax.jdo.option.ConnectionUserName</name> <value>root</value> </property> <property> <name>javax.jdo.option.ConnectionPassword</name> <value>root</value> </property> <property> <name>hive.cli.print.header</name> <value>true</value> <description>Whether to print the names of the columns in query output.</description> </property> <property> <name>hive.cli.print.current.db</name> <value>true</value> <description>Whether to include the current database in the Hive prompt.</description> </property> </configuration>(2) 修改Hive运行日志存放位置:在/usr/local/src/apache-hive-1.2.2-bin目录下新建一个logs文件夹,用于保存日志文件:mkdir logs修改文件hive-log4j.properties:#hive.log.dir=${java.io.tmpdir}/${user.name} hive.log.dir=/usr/local/src/apache-hive-1.2.2-bin/logs4. 配置环境变量export HIVE_HOME=/usr/local/src/apache-hive-1.2.2-bin export PATH=$HIVE_HOME/bin:$PATH要记得bash下,刷新配置.查看一下环境变量配置是否生效:echo $HIVE_HOMEhive相当于是一个客户端,无需搞成集群模式.所以只需要在一个节点上配置即可.5. 测试输入hive命令,启动Hive CLIhive退出Hive CLI:exit6. 可能遇到的异常信息(1) 连接数据库时,可能会因为varchar字节问题报异常,可参考下面文章:http://blog.csdn.net/wind520/article/details/39890967(2) 启动过程中,可能会出现hive和hadoop中组件版本不一致引起的错误.可参考下面文章:操作内容: 删除share/hadoop/yarn/lib下的旧jar包,copy新的进来,同时分发到其他节点上.https://blog.csdn.net/silentwolfyh/article/details/51568228
批量操作只需要连接一次数据库,在mybatis中执行一次sql即可.相比逐条操作,频繁打开断开数据库连接,效率会提高很多.虽然在逐条循环操作时,引入连接池会优化很大的效率,但相比批量操作效率也是比较低的.批量操作的优点:减少连接数据库的频率;减少提交事务的频率;批量操作的缺点:一次失败,整个批次的操作都会回归,问题不易排查;故一次批量操作,不易将数据设置过大;当数据量很大,mybatis级别的批量操作也是扛不住的,到时需要考虑其他手段了.1. 批量插入<insert id="insertBatch" parameterType="java.util.List"> insert into MBR_POINTS_BATCH_NEW( ID, MEMBER_ID, POINTS_TYPE, TRANSACTION_ID, TRANSACTION_TYPE, TRANSACTION_CREATE_TIME, MERCHANT_CODE, STORE_CODE, POINTS_PRICING_ID, CONTRACT_NO, AGREEMENT_NO, POINTS_PRICE, ISSUE_POINTS_COST, POINTS, POINTS_BALANCE, HOLD_POINTS, VALUE_DATE, MATURITY_DATE, REFER_NO, CREATE_TIME, LAST_UPDATE_TIME ) <foreach collection="list" item="item" index="index" open="(" close=")" separator="UNION ALL"> SELECT #{item.id, jdbcType=DECIMAL}, #{item.memberId, jdbcType=DECIMAL}, #{item.pointsType, jdbcType=VARCHAR}, #{item.transactionId, jdbcType=DECIMAL}, #{item.transactionType, jdbcType=VARCHAR}, #{item.transactionCreateTime, jdbcType=TIMESTAMP}, #{item.merchantCode, jdbcType=VARCHAR}, #{item.storeCode, jdbcType=VARCHAR}, #{item.pointsPricingId, jdbcType=DECIMAL}, #{item.contractNo, jdbcType=VARCHAR}, #{item.agreementNo, jdbcType=VARCHAR}, #{item.pointsPrice, jdbcType=DECIMAL}, #{item.issuePointsCost, jdbcType=DECIMAL}, #{item.points, jdbcType=DECIMAL}, #{item.pointsBalance, jdbcType=DECIMAL}, #{item.holdPoints, jdbcType=DECIMAL}, #{item.valueDate, jdbcType=TIMESTAMP}, #{item.maturityDate, jdbcType=TIMESTAMP}, #{item.referNo, jdbcType=VARCHAR}, #{item.createTime, jdbcType=TIMESTAMP}, #{item.lastUpdateTime, jdbcType=TIMESTAMP} from dual </foreach> </insert>2. 批量更新<update id="updateBatch" parameterType="java.util.List"> update POINTS_TRANSACTION_DETAIL set CLEAN_STATUS= <foreach collection="list" item="item" index="index" separator=" " open="case ID" close="end"> when #{item.id} then #{item.cleanStatus} </foreach> where ID in <foreach collection="list" index="index" item="item" separator="," open="(" close=")"> #{item.id,jdbcType=DECIMAL} </foreach>
1. 应用场景场景描述:项目重构,要匹配交易类型处理数据.交易类型的字段类型为String,最初是在switch中直接使用String类型匹配.为了提升代码的可靠性与重用性,这里将String类型换成枚举类.每次传进来的String,首先处理成枚举类进行匹配.2. 源码如下:public enum TransactionType { P_1000("1000"), P_1001("1001"), P_1002("1002"), P_1003("1003"), P_1004("1004"); private String transactionType; private TransactionType(String transactionType){ this.transactionType = transactionType; } public String getTransactionType() { return transactionType; } public void setTransactionType(String transactionType) { this.transactionType = transactionType; } //用于switch中,将String转化为枚举类 public static TransactionType getByType(String type){ for (TransactionType transactionType:values()) { if (transactionType.getTransactionType().equals(type)) { return transactionType; } } return null; } //测试代码 public static void main(String[] args) { switch (TransactionType.getByType("1000")) { case P_1000: System.out.println("------ok啦----"); break; case P_1001: System.out.println("------哭啦----"); break; default: System.out.println("------摔啦----"); break; } } }
一. Hive入门简介1. Hive是什么Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射成一张表,并提供类SQL查询功能.构建在Hadoop 之上的数据仓库;使用HQL作为查询接口;使用HDFS存储;使用MapReduce计算;本质是:将HQL转化成MapReduce程序.基于其本质,也可以将Hive看成是一个SQL的解析引擎,但这样说并不全面,毕竟有时SQL也会不被解析的;底层: HDFS负责存储数据;YARN负责进行资源管理;MapReduce负责处理数据;2. Hive产生的目的用于解决海量结构化日志的数据统计;结构化日志: 有统一的规范(1) 每一行数据就是一条数据;(2) 很多列之间有统一的标识符进行分割;3. 特点操作接口采用类SQL语法,提供快速开发的能力(简单、容易上手);避免了去写MapReduce,减少开发人员的学习成本;统一的元数据管理,可与impala/spark等共享元数据;灵活性和扩展性比较好:支持UDF,自定义存储格式等;数据存储在hdfs上,存储容量可以平滑扩展;适合离线数据处理;hive本质是跑MapReduce程序,更适合离线数据处理;比如:日志分析,海量结构化数据离线分析…Hive的执行延迟比较高,因此hive常用于数据分析的,对实时性要求不高的场合;Hive优势在于处理大数据,对于处理小数据没有优势,因为Hive的执行延迟比较高。HQL与SQL的区别:HSQL:读时模式,也就是读数据做检查操作,比如检查表是否存在,字段是否存在,sql语法是否正确等;但在加载数据时,不对数据进行检查,仅仅是文件的复制或移动,所以加载数据会比较快.HSQL写快读慢.SQL:写时模式,就是在加载数据时,会建立索引,字段检查等操作,这些操作都是为了提升查询的性能.所以SQL查询时比较快,而写入时比较慢.SQL读快,写慢.UDF,UDAF与UDTF的区别:UDF:用户自定义普通函数,一对一,用于select语句,对查询字段的结构化处理;比如format操作等;UDAF:用户自定义聚合函数,多对一,group byUDTF:用户自定义表生成函数,一对多.比如:split,分词4. 基本概念4.1 表的概念(类似关系型数据库)create table bf_log( ip string, user string, date string, ...... )4.2 HQL(类似SQL)#HiveQL select * from bf_log limit 10 ; select substring(ip,0,4) ip_prex from bg_log ;HQL的本质是通过Hive Engine转换为MapReduce任务.二. Hive知识入门Hive架构2.1 架构组成(1) 用户接口: ClientCLI(hive shell)、JDBC/ODBC(java访问hive),WEBUI(浏览器访问hive)(2) 元数据: Metastore元数据包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;默认存储在自带的derby数据库中,推荐使用采用MySQL存储Metastore;(3) Driver包含:解析器、编译器、优化器、执行器;解析器:将SQL字符串转换成抽象语法树AST,这一步一般都用第三方工具库完成,比如antlr;对AST进行语法分析,比如表是否存在、字段是否存在、SQL语义是否有误(比如select中被判定为聚合的字段在group by中是否有出现);编译器:将抽象语法树AST编译生成逻辑执行计划;优化器:对逻辑执行计划进行优化;执行器:把逻辑执行计划转换成可以运行的物理计划。对于Hive来说,就是MR/TEZ/Spark.(4) HDFS使用HDFS存储数据.Hive本身不存在数据,Hive虽有表的定义但表是纯的逻辑表,数据是存在HDFS上的.HIVE中的内容不支持改写和删除,适合读多写少的场景.
一. kafka应用流程示意image1. 前端js埋点,就是调用后端提供的对应接口.接口请求示例如下:http://pingserver.com?itemid=111&userid=110&action=show&...为了保证轻量级,并发度高,前端js埋点向后端异步发送的请求不需要关注返回状态,只负责调用即可;2. flume监听log日志,将实时增加的log日志通过flume管道注入kafka中,接下来可以由storm或spark streaming进行实时流处理;3. 方向(1)中应用:storm,spark streaming更偏重于业务处理及数据挖掘;4. 方向(2)中应用:是将非结构化的用户行为日志数据转换成结构化的数据存入hbase中,使用hive进行行为日志的分析,比如统计pv,uv,vv,ctr,dau等.二. 搭建日志采集系统log server流程图日志采集系统上图,就是一个Log Server实现的最简单流程图.Nginx分发器:上面提到了前端js埋点请求,要求速度要快,并发度要高,所以这里使用了Nginx分发器作为web server,实现反向代理与LB(负载均衡);Spawn-cgi:上图只是示例提供一个server服务的场景,同样也可以在不同节点上,提供相同的服务,用nginx实现负载均衡,以能提供更快更高可用的服务;Spawn-cgi的功能: 就是提供了一个网关接口,它可以快速的实现对外暴露server服务的功能,并能使底层的服务变成一个常驻的守护进程;它的请求走的fcgi协议,这种协议更加适合外部请求,因为http请求很容易受到攻击;Thrift RPC:在定义接口规范之后,能够帮助我们快速的生成client和server代码,并能帮助我们实现服务之间的解耦:client只负责字段的解析等轻量级的工作;server才是真正的引擎核心,我们可以在这里实现自己的业务处理逻辑.使用Thrift RPC生成的client和server之间的通信,走的是RPC协议,这种协议有如下好处:跨语言,支持多种语言去生成client和server代码,c++,Python,java等;保证数据的安全,相比http协议更不容易受到外部攻击;速度快,性能好,比如用c++生成代码,实现效果性能更好,速度更快,更能应对高并发的处理请求;RPC协议更加适合底层内部的请求,所以设计上后端一般都是使用RPC协议.另外,RPC的两端client和server只要遵循RPC协议和定义的scheme接口通信规范,两端可以使用不同的开发语言.4. 上面的client server中server,并不只是一种简单的服务,它可以由多个server通过RPC协议构成,比如下面搭建推荐系统:推荐系统三. 模拟日志收集系统的相关技术功能梳理Thrift RPC:在定义接口通信规范后,可以用Thrift命令快速生成server和client代码,完成最基本的C/S架构;这种生成代码的方式,可以帮助我们实现服务之间的解耦,client只负责字段的解析等轻量级的工作,而server才是真正的处理引擎;在server里面,我们可以实现自己的业务处理逻辑.通过glogs可以将收集到用户行为日志快速高效的写入log文件中.Spawn-CGI: 通过cgi提供的网关接口,可以将自己用thrift rpc生成的server服务提供给外部.简单的可以理解为提供了一种代理,可以在非应用程序所在的机器上操作应用程序.Nginx分发器: 就是web server,用于分发用户的请求,实现反向代理与负载均衡;通过它可以将用户的js埋点请求分发给我们的server应用程序去处理;ab压测: 如果Thrift RPC使用c++生成client和server,可以大大的提供性能,这种场景下,可以使用ab压测工具,进行压力测试;上面的技术部分,基本就实现了模拟日志收集系统的搭建,下面再扩展做一下介绍.Flume + Hbase/Hive : 用于用户行为日志分析;Flume+Kafka+Storm/Spark Streaming :用于实时流处理的数据挖掘;
在项目中,是使用Spring集成ignite的方式进行嵌入式启动.那么Spring如何集成Ignite Log4j2日志呢,请见下面介绍:1. Spring集成Log4j2首先来看Spring是如何集成Log4j2日志的.引入对应的maven依赖:<dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.7</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-jcl</artifactId> <version>2.7</version> </dependency>再在classpath目录下添加log4j2.xml就可以了,不需要像log4j一样添加web监听器 .log4j2.xml例子如下,关于log4j2配置文件的具体配置和读取优先级可以参考log4j2官网配置参考<?xml version="1.0" encoding="UTF-8"?> <Configuration status="WARN"> <Appenders> <Console name="Console" target="SYSTEM_OUT"> <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> </Console> </Appenders> <Loggers> <Logger name="org.springframework.beans.factory" level="DEBUG"/> <Root level="error"> <AppenderRef ref="Console"/> </Root> </Loggers> </Configuration>这样配置之后,在项目启动后,log4j2自然就会启动.注意log4j2中获取log的方法与log4j不同,需要从LogManager中获取private static final Logger logger = LogManager.getLogger(Class.class);2. Spring集成Ignite Log4j2日志集成方式,同样很简单.如果项目中使用maven进行依赖管理,那么需要添加如下的依赖:<dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-log4j2</artifactId> <version>${ignite.version}</version> </dependency>将${ignite.version}替换为实际使用的Ignite版本。要使用Log4j2进行日志记录,需要配置IgniteConfiguration的gridLogger属性,如下所示:XML:<bean class="org.apache.ignite.configuration.IgniteConfiguration"> <property name="gridLogger"> <bean class="org.apache.ignite.logger.log4j2.Log4J2Logger"> <constructor-arg type="java.lang.String" value="log4j2.xml"/> </bean> </property> <!-- Other Ignite configurations --> ... </bean>在上面的配置中,log4j2.xml的路径要么是绝对路径,要么是相对路径,相对路径可以相对于META-INF,也可以相对于IGNITE_HOME。只需要进行如上简单的配置,还是原来使用log4j2的方式,即可兼容正常Ignite log4j2日志.日志示例如下:|2018-04-26 11:17:52.410|main|8081|settlement-service|com.crt.settlement.service.business.dailyclearing.points.impl.PointsDailyClearingServiceImpl||INFO||###+-###|##+-##|##+-##||解锁+++++++++++++++++++++++++++++++++ |2018-04-26 11:17:58.415|main|8081|settlement-service|com.crt.settlement.service.business.dailyclearing.points.impl.PointsDailyClearingServiceImpl||INFO||###+-###|##+-##|##+-##||=================flabo========111111111 |2018-04-26 11:18:01.607|grid-timeout-worker-#23%myBatisObjectGrid%|8081|settlement-service|org.apache.ignite.internal.IgniteKernal%myBatisObjectGrid||INFO||###+-###|##+-##|##+-##|| Metrics for local node (to disable set 'metricsLogFrequency' to 0) ^-- Node [id=56ea94bb, name=myBatisObjectGrid, uptime=01:04:00.279] ^-- H/N/C [hosts=1, nodes=1, CPUs=4] ^-- CPU [cur=0%, avg=0.55%, GC=0%] ^-- PageMemory [pages=6359] ^-- Heap [used=579MB, free=67.68%, comm=812MB] ^-- Non heap [used=120MB, free=-1%, comm=122MB] ^-- Outbound messages queue [size=0] ^-- Public thread pool [active=0, idle=0, qSize=0] ^-- System thread pool [active=0, idle=6, qSize=0] |2018-04-26 11:18:01.607|grid-timeout-worker-#23%myBatisObjectGrid%|8081|settlement-service|org.apache.ignite.internal.IgniteKernal%myBatisObjectGrid||INFO||###+-###|##+-##|##+-##||FreeList [name=myBatisObjectGrid, buckets=256, dataPages=1, reusePages=0]3. 参考文章[1] springMVC集成log4j2
2022年05月