java多线程编程核心技术

简介: 一,共享资源 使用sleep()观察数据紊乱注意:以下几份代码其中生产者(Producer.java),消费者(Consumer.java),和测试类(TestDemo.

一,共享资源 使用sleep()观察数据紊乱

注意:以下几份代码其中生产者(Producer.java),消费者(Consumer.java),和测试类(TestDemo.java)都完全一样主要对共享资源文件(Resource.java)操作

img_a2368a7dbc2bb19a18ea1ca4a6fd23bd.png

Resource.java共享资源

//共享资源对象
public class Resource {
private String name;
private String gender;

// 让生产者调用设置共享资源的成员变量以供消费者的打印操作
public void push(String name, String gender) {
    this.name = name;
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    this.gender = gender;
}

// 供消费者从共享资源取出数据
public void pop() {
    try {

        Thread.sleep(100);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(this.name + "-" + this.gender);
}

Producer.java生产者

public class Producer implements Runnable {
public Resource resource = null;

public Producer(Resource resource) {
    this.resource = resource;
}
@Override
public void run() {
    for (int i = 0; i < 100; i++) {
        if (i % 2 == 0) {
            resource.push("凤姐", "女");
        } else {
            resource.push("春哥", "男");
        }
    }
}

Consumer.java消费者

 public class Consumer implements Runnable {
// 消费者拥有共享资源对象以便实现调用方法执行打印操作
public Resource resource = null;

// Creatr Constructor
public Consumer(Resource resource) {
    this.resource = resource;
}
// 重写run()方法 执行pop()方法打印结果
@Override
public void run() {
    for (int i = 0; i < 50; i++) {

        resource.pop();
    }
}

TestDemo.java测试代码

   public class TestDemo {
   public static void main(String[] args) {
    // 创建共享资源对象 开启线程
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();
    new Thread(new Consumer(resource)).start();
}

分析结果:凤姐-男 凤姐-女 凤姐-男 发现性别乱序了
刚开始打印 凤姐-男 生产者先生产出春哥哥-男,此时消费者没有消费,生产者继续生产出姓名为凤姐,此时消费者开始消费了.

二,使用同步锁 避免数据紊乱

Resource.java共享资源

//共享资源对象
public class Resource{
private String name;
private String gender;
//生产者向共享资源存储数据
synchronized public void push(String name, String gender)  {
    this.name = name;
    try{
    Thread.sleep(100);
    }catch(InterruptedException e){
        e.printStackTrace();
    }
    this.gender = gender;
}
//  消费者从共享资源对象取数据
synchronized public void pop(){
    try{
        Thread.sleep(100);
    }catch(InterruptedException e){
        e.printStackTrace();
    }
    System.out.println(this.name + "-" +this.gender);
}

出现性别紊乱的情况.

  • 解决方案:只要保证在生产姓名和性别的过程保持同步,中间不能被消费者线程进来取走数据.
  • 可以使用同步代码块/同步方法/Lock机制来保持同步性.
三,怎么实现出现生产一个数据,消费一个数据.
  • 应该交替出现: 春哥哥-男-->凤姐-女-->春哥哥-男-->凤姐-女.....

  • 解决方案: 使用 等待和唤醒机制.

  • wait():执行该方法的线程对象释放同步锁,JVM把该线程存放到等待池中,等待其他的线程唤醒该线程.
    notify:执行该方法的线程唤醒在等待池中等待的任意一个线程,把线程转到锁池中等待.
    notifyAll():执行该方法的线程唤醒在等待池中等待的所有的线程,把线程转到锁池中等待.
    注意:上述方法只能被同步监听锁对象来调用,否则报错IllegalMonitorStateException..

Resource.java共享资源

//共享资源对象
public class Resource {
private String name;
private String gender;
private boolean isEmpty = true;// 表示共享资源对象是否为空的状态 第一次为空要设置默认值为true

// 生产者向共享资源对象中存储数据
synchronized public void push(String name, String gender) {

    try {
        while (!isEmpty) { // 当共享资源对象有值时 ,不空等着消费者来获取值 使用同步锁对象来调用
            // 表示当前线程释放同步锁进入等待池只能被其他线程唤醒
            this.wait();
        }

        this.name = name;
        Thread.sleep(100);
        this.gender = gender;
        // 生成结束
        isEmpty = false;// 设置共享资源对象为空
        this.notify();// 唤醒一个消费者
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

// 消费者从共享资源对象中取数据
synchronized public void pop() {
    try {
        while (isEmpty) {// 当前共享资源为空 等待生产者来生产
            // 使用同步锁对象来调用此方法 表示当前线程释放同步锁进入等待池只能被其他线程唤醒
            this.wait();
        }
        // 消费开始
        Thread.sleep(100);
        System.out.println(this.name + "-" + this.gender);
        // 消费结束
        isEmpty = true;
        // 唤醒其他线程
        this.notify();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
四, 线程通信-使用Lock和Condition接口

wait和notify方法,只能被同步监听锁对象来调用,否则报错IllegalMonitorStateException.
那么现在问题来了,Lock机制根本就没有同步锁了,也就没有自动获取锁和自动释放锁的概念.
因为没有同步锁,所以Lock机制不能调用wait和notify方法.
解决方案:Java5中提供了Lock机制的同时提供了处理Lock机制的通信控制的Condition接口.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//共享资源对象
public class Resource {
private String name;
private String gender;
private boolean isEmpty = true;
private final Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();

// 生产者向共享资源存储数据
public void push(String name, String gender) {
    lock.lock();
    try {
        while (!isEmpty) {
            condition.await();
        }
        // 开始生成
        this.name = name;
        Thread.sleep(100);
        this.gender = gender;
        // 生成结束
        isEmpty = false;
        condition.signalAll();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();// 释放锁
    }
}

// 消费者向共享资源获取数据
public void pop() {
    lock.lock();
    try {
        while (isEmpty) {
            condition.await();
        }
        Thread.sleep(100);
        System.out.println(this.name + "-" + this.gender);
        // 消费结束
        isEmpty = true;
        condition.signalAll();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}
五,线程的生命周期
  • 线程状态


    img_dfdb0760a717e66ef00ba16fd585ab78.png
    线程状态
  • 说法 一


    img_abb67e86d590fdfe0c7f83d14b04b724.png
  • 说法 二


    img_977ecd0b1f6f5d2aa9a942cd6645a826.png

    有人又把阻塞状态,等待状态,计时等待状态合称为阻塞状态.


线程对象的状态存放在Thread类的内部类(State)中:

注意:Thread.State类其实是一个枚举类.
因为线程对象的状态是固定的,只有6种,此时使用枚举来表示是最恰当的.

  • 1: 新建状态(new):使用new创建一个线程对象,仅仅在堆中分配内存空间,在调用start方法之前.
    新建状态下,线程压根就没有启动,仅仅只是存在一个线程对象而已.
    Thread t = new Thread();//此时t就属于新建状态

    当新建状态下的线程对象调用了start方法,此时从新建状态进入可运行状态.
    线程对象的start方法只能调用一次,否则报错:IllegalThreadStateException.

  • 2: 可运行状态(runnable):分成两种状态,ready和running。分别表示就绪状态和运行状态。
    就绪状态:线程对象调用start方法之后,等待JVM的调度(此时该线程并没有运行).
    运行状态:线程对象获得JVM调度,如果存在多个CPU,那么允许多个线程并行运行.

  • 3: 阻塞状态(blocked):正在运行的线程因为某些原因放弃CPU,暂时停止运行,就会进入阻塞状态.
    此时JVM不会给线程分配CPU,直到线程重新进入就绪状态,才有机会转到运行状态.
    阻塞状态只能先进入就绪状态,不能直接进入运行状态.
    阻塞状态的两种情况:

  • 1): 当A线程处于运行过程时,试图获取同步锁时,却被B线程获取.此时JVM把当前A线程存到对象的锁池中,A线程进入阻塞状态.

  • 2):当线程处于运行过程时,发出了IO请求时,此时进入阻塞状态.

  • 4: 等待状态(waiting)(等待状态只能被其他线程唤醒):此时使用的无参数的wait方法,

    • 1):当线程处于运行过程时,调用了wait()方法,此时JVM把当前线程存在对象等待池中.
  • 5: 计时等待状态(timed waiting)(使用了带参数的wait方法或者sleep方

  • 6: 终止状态(terminated):通常称为死亡状态,表示线程终止.

  • 1): 正常执行完run方法而退出(正常死亡).

  • 2): 遇到异常而退出(出现异常之后,程序就会中断)(意外死亡).


线程一旦终止,就不能再重启启动,否则报错(IllegalThreadStateException).

在Thread类中过时的方法(因为存在线程安全问题,所以弃用了):
void suspend() :暂停当前线程
void resume() :恢复当前线程
void stop() :结束当前线程

六, 联合线程:

线程的join方法表示一个线程等待另一个线程完成后才执行。join方法被调用之后,线程对象处于阻塞状态。
有人也把这种方式称为联合线程,就是说把当前线程和当前线程所在的线程联合成一个线程。

class Join extends Thread{
public void run(){
    for(int i=0;i<50;i++){
        System.out.println("join:"+i);
    }
}
}
//联合线程
public class UniteThread {

public static void main(String[] args) throws Exception {
    System.out.println("begin.....");
    Join joinThread = new Join();
    for(int i=0;i<50;i++){
        System.out.println("main:"+i);
        if(i==10){
            //启动join线程
            joinThread.start();
        }
        if(i==20){
            //强制执行该线程,执行结束再执行其他线程
             joinThread.join();
        }   
    }
    System.out.println("end");
  }
}
七, 后台线程

后台线程:在后台运行的线程,其目的是为其他线程提供服务,也称为“守护线程"。JVM的垃圾回收线程就是典型的后台线程。
特点:若所有的前台线程都死亡,后台线程自动死亡,前台线程没有结束,后台线程是不会结束的。
测试线程对象是否为后台线程:使用thread.isDaemon()。
前台线程创建的线程默认是前台线程,可以通过setDaenon(true)方法设置为后台线程,并且当且仅当后台线程创建的新线程时,新线程是后台线程。
设置后台线程:thread.setDaemon(true),该方法必须在start方法调用前,否则出现IllegalThreadStateException异常。

public class DaemonThread extends Thread {
public void run() {
    for (int i = 0; i < 100; i++) {
        System.out.println(super.getName() + "-" + i);
    }
}
public static void main(String[] args) {
    System.out.println(Thread.currentThread().isDaemon());
    for (int i = 0; i < 50; i++) {
        System.out.println("main:" + i);
        if (i == 10) {
            DaemonThread t = new DaemonThread();
            t.setDaemon(true);
            t.start();
        }
    }
  }
}
八,线程池的用法
// Executors.newCachedThreadPool();    
//创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
// Executors.newSingleThreadExecutor();   
//创建容量为1的缓冲池
// Executors.newFixedThreadPool(int);    
//创建固定容量大小的缓冲池
class MyTask implements Runnable {
public MyTask() {
}
@Override
public void run() {
 //do something
}
}

ExecutorService executor =  Executors.newFixedThreadPool(5)
MyTask myTask = new MyTask();
executor.execute(myTask);

对于单次提交数据的数量,当然单次数量越少越快,但是次数会变多,总体时间会变长,单次提交过多,执行会非常慢,以至于可能会失败,经过多次测试数据量在几千到一万时是比较能够接受的。
选择那种线程池呢,是固定大小的,还是无限增长的。当线程数量超过限制时会如何呢?这几种线程池都会抛出异常。
有一定经验的同志会不屑的说阻塞的线程池,基本就比较靠谱,例如加上等待队列,等待队列用一个阻塞的队列。小的缺点是一直创建线程,感觉也不是非常的合理。

  • 带队列的线程池

    ThreadPoolExecutor  executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
               new ArrayBlockingQueue(5));
    

使用生产者与消费者对程序进行改进

img_dea8571800f33ae0116fccdaa0a6488f.jpe

Producer.java 生产者

import java.util.concurrent.ArrayBlockingQueue;
public class Producerlocal implements Runnable {
ArrayBlockingQueue<String> queue;
public Producerlocal(ArrayBlockingQueue<String> queue) {
    this.queue = queue;
}

@Override
public void run() {

    try {
        for (int i = 0; i < 1000; i++) {
            queue.put("s" + i);
        }

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
}

Consumer.java消费者

import java.util.concurrent.ArrayBlockingQueue;
public class Consumerlocal implements Runnable {

ArrayBlockingQueue<String> queue;

public Consumerlocal(ArrayBlockingQueue<String> queue) {
    this.queue = queue;
}

@Override
public void run() {
    while (true) {
        try {
            final String take = queue.take();

            if ("poisonpill".equals(take)) {
                return;
            }
            //do something
            System.out.println(take);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

main主程序

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Main {

public static void main(String[] args) throws InterruptedException {
    int threadNum = Runtime.getRuntime().availableProcessors() * 2;
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
    ExecutorService executor = Executors.newFixedThreadPool(5);
    for (int i = 0; i < threadNum; i++) {
        executor.execute(new Consumerlocal(queue));
    }
    Thread pt = new Thread(new Producerlocal(queue));
    pt.start();
    pt.join();
    for (int i = 0; i < threadNum; i++) {
        queue.put("poisonpill");
    }

    executor.shutdown();
    executor.awaitTermination(10L, TimeUnit.DAYS);
}
}

程序使用了阻塞队列,队列设置一定的大小,加入队列超过数量会阻塞,队列空了取值也会阻塞,感兴趣的同学可以查看jdk源码。消费者线程数是CPU的两倍,对于这些类的使用需要查看手册和写测试代码。对于何时结束线程也有一定的小技巧,加入足够数量的毒丸。

对于代码使用了新的模式,程序明显加快了,到这里生产者消费者模式基本就结束了。如果你下次想起你的程序也需要多线程,正好适合这种模式,那么套用进来就是很好的选择。当然你现在能做的就是撸起袖子,写一些测试代码,找到这种模式的感觉。

因为程序的大多数时间还是在http请求上,程序的运行时间仍然不能够接受。于是想到了利用异步io加快速度,而不用阻塞的http。但是问题是这次的http客户端为了安全验证进行了修改,有加密验证和单点登录,新的客户端能适配起来有一定难度估计需要一定的时间,还是怕搞不定。异步的非阻塞io,对于前面数据结果选择的经验,非阻塞不一定就是好!其实是没太看懂怎么在多线程中使用,而对于所得到的效果就不得而知了。

maven依赖
   <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpasyncclient</artifactId>
        <version>4.1.3</version>
    </dependency>
异步http
/*
 *    ===============================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at

 *   http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * =============================================== *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package com.github.yfor.bigdata.tdg;

import org.apache.http.HttpHost;
 import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.nio.client.CloseableHttpPipeliningClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.client.methods.AsyncCharConsumer;
 import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.protocol.HttpContext;

import java.io.IOException;
import java.nio.CharBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

/**
  * This example demonstrates a pipelinfed execution of multiple HTTP request / response exchanges
 * with a full content streaming.
 */
public class MainPhttpasyncclient {

public static void main(final String[] args) throws Exception {
    CloseableHttpPipeliningClient httpclient = HttpAsyncClients.createPipelining();
    try {
        httpclient.start();

        HttpHost targetHost = new HttpHost("httpbin.org", 80);
        HttpGet[] resquests = {
                new HttpGet("/"),
                new HttpGet("/ip"),
                new HttpGet("/headers"),
                new HttpGet("/get")
        };

        List<MyRequestProducer> requestProducers = new ArrayList<MyRequestProducer>();
        List<MyResponseConsumer> responseConsumers = new ArrayList<MyResponseConsumer>();
        for (HttpGet request : resquests) {
            requestProducers.add(new MyRequestProducer(targetHost, request));
            responseConsumers.add(new MyResponseConsumer(request));
        }

        Future<List<Boolean>> future = httpclient.execute(
                targetHost, requestProducers, responseConsumers, null);
        future.get();
        System.out.println("Shutting down");
    } finally {
        httpclient.close();
    }
    System.out.println("Done");
}

static class MyRequestProducer extends BasicAsyncRequestProducer {

    private final HttpRequest request;

    MyRequestProducer(final HttpHost target, final HttpRequest request) {
        super(target, request);
        this.request = request;
    }

    @Override
    public void requestCompleted(final HttpContext context) {
        super.requestCompleted(context);
        System.out.println();
        System.out.println("Request sent: " + this.request.getRequestLine());
        System.out.println("=================================================");
    }
}

static class MyResponseConsumer extends AsyncCharConsumer<Boolean> {

    private final HttpRequest request;

    MyResponseConsumer(final HttpRequest request) {
        this.request = request;
    }

    @Override
    protected void onResponseReceived(final HttpResponse response) {
        System.out.println();
        System.out.println("Response received: " + response.getStatusLine() + " -> " + this.request.getRequestLine());
        System.out.println("=================================================");
    }

    @Override
    protected void onCharReceived(final CharBuffer buf, final IOControl ioctrl) throws IOException {
        while (buf.hasRemaining()) {
            buf.get();
        }
    }

    @Override
    protected void releaseResources() {
    }

    @Override
    protected Boolean buildResult(final HttpContext context) {
        System.out.println();
        System.out.println("=================================");
        System.out.println();
        return Boolean.TRUE;
    }
}
}
配置
package com.github.yfor.bigdata.tdg;

public interface KafkaProperties {
final static String zkConnect = "localhost:2181";
final static String groupId = "group21";
final static String topic = "topic4";
final static String kafkaServerURL = "localhost";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 20000;
final static int reconnectInterval = 10000;

final static String clientId = "SimpleConsumerDemoClient";
}

kafka的配置需要一定的时间,可以阅读官方文档进行安装并运行。

生产者线程
package com.github.yfor.bigdata.tdg;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
private final int size;

public Producer(String topic) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093");
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, "DemoProducer");
    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<Integer, String>(props);
    this.topic = topic;
    this.isAsync = true;
    this.size = producer.partitionsFor(topic).size();

}

@Override
public void run() {
    int messageNo = 1;
    while (messageNo < 100) {
        try {
            sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        String messageStr = "Message_" + messageNo;
        long startTime = System.currentTimeMillis();
        if (isAsync) { // Send asynchronously 异步
            producer.send(new ProducerRecord<>(topic, messageNo % size, messageNo, messageStr),
                    new DemoCallBack(startTime, messageNo, messageStr));
        } else { // Send synchronously 同步
            try {
                producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get();
                System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
        ++messageNo;
    }

}

}

class DemoCallBack implements Callback {

private final long startTime;
private final int key;
private final String message;

public DemoCallBack(long startTime, int key, String message) {
    this.startTime = startTime;
    this.key = key;
    this.message = message;
}

@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
    long elapsedTime = System.currentTimeMillis() - startTime;
    if (metadata != null) {
        System.out.println(
                "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
                        "), " +
                        "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
    } else {
        exception.printStackTrace();
    }
}
}
消费者线程
package com.github.yfor.bigdata.tdg;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class KafkaConsumer extends Thread {
private final ConsumerConnector consumer;
private final String topic;
private final int size;

public KafkaConsumer(String topic) {
    consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig());
    this.topic = topic;
    this.size = 5;
}

private static ConsumerConfig createConsumerConfig() {
    Properties props = new Properties();
    props.put("zookeeper.connect", KafkaProperties.zkConnect);
    props.put("group.id", KafkaProperties.groupId);
    props.put("zookeeper.session.timeout.ms", "40000");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "1000");
    return new ConsumerConfig(props);
}

@Override
public void run() {
    try {
        sleep(2000);
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(size));

    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
    ExecutorService executor = Executors.newFixedThreadPool(size);
    for (final KafkaStream stream : streams) {
        executor.submit(new KafkaConsumerThread(stream));
    }
}
}

class KafkaConsumerThread implements Runnable {

private KafkaStream<byte[], byte[]> stream;

public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
    this.stream = stream;
}

public void run() {
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> mam = it.next();
        System.out.println(Thread.currentThread().getName() + ": partition[" + mam.partition() + "],"
                + "offset[" + mam.offset() + "], " + new String(mam.message()));

    }
}
}
目录
相关文章
|
8天前
|
JSON Java Apache
非常实用的Http应用框架,杜绝Java Http 接口对接繁琐编程
UniHttp 是一个声明式的 HTTP 接口对接框架,帮助开发者快速对接第三方 HTTP 接口。通过 @HttpApi 注解定义接口,使用 @GetHttpInterface 和 @PostHttpInterface 等注解配置请求方法和参数。支持自定义代理逻辑、全局请求参数、错误处理和连接池配置,提高代码的内聚性和可读性。
|
6天前
|
安全 Java 开发者
深入解读JAVA多线程:wait()、notify()、notifyAll()的奥秘
在Java多线程编程中,`wait()`、`notify()`和`notifyAll()`方法是实现线程间通信和同步的关键机制。这些方法定义在`java.lang.Object`类中,每个Java对象都可以作为线程间通信的媒介。本文将详细解析这三个方法的使用方法和最佳实践,帮助开发者更高效地进行多线程编程。 示例代码展示了如何在同步方法中使用这些方法,确保线程安全和高效的通信。
25 9
|
6天前
|
监控 安全 Java
Java中的多线程编程:从入门到实践####
本文将深入浅出地探讨Java多线程编程的核心概念、应用场景及实践技巧。不同于传统的摘要形式,本文将以一个简短的代码示例作为开篇,直接展示多线程的魅力,随后再详细解析其背后的原理与实现方式,旨在帮助读者快速理解并掌握Java多线程编程的基本技能。 ```java // 简单的多线程示例:创建两个线程,分别打印不同的消息 public class SimpleMultithreading { public static void main(String[] args) { Thread thread1 = new Thread(() -> System.out.prin
|
8天前
|
存储 缓存 安全
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见
在 Java 编程中,创建临时文件用于存储临时数据或进行临时操作非常常见。本文介绍了使用 `File.createTempFile` 方法和自定义创建临时文件的两种方式,详细探讨了它们的使用场景和注意事项,包括数据缓存、文件上传下载和日志记录等。强调了清理临时文件、确保文件名唯一性和合理设置文件权限的重要性。
21 2
|
8天前
|
安全 Java
Java多线程集合类
本文介绍了Java中线程安全的问题及解决方案。通过示例代码展示了使用`CopyOnWriteArrayList`、`CopyOnWriteArraySet`和`ConcurrentHashMap`来解决多线程环境下集合操作的线程安全问题。这些类通过不同的机制确保了线程安全,提高了并发性能。
|
9天前
|
Java
java小知识—进程和线程
进程 进程是程序的一次执行过程,是系统运行的基本单位,因此进程是动态的。系统运行一个程序即是一个进程从创建,运行到消亡的过程。简单来说,一个进程就是一个执行中的程序,它在计算机中一个指令接着一个指令地执行着,同时,每个进程还占有某些系统资源如CPU时间,内存空间,文件,文件,输入输出设备的使用权等等。换句话说,当程序在执行时,将会被操作系统载入内存中。 线程 线程,与进程相似,但线程是一个比进程更小的执行单位。一个进程在其执行的过程中产生多个线程。与进程不同的是同类的多个线程共享同一块内存空间和一组系统资源,所以系统在产生一个线程,或是在各个线程之间做切换工作时,负担要比
20 1
|
Java
Java多线程编程核心技术(三)多线程通信(下篇)
线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体。线程间的通信就是成为整体的必用方案之一,可以说,使线程间进行通信后,系统之间的交互性会更强大,在大大提高CPU利用率的同时还会使程序员对各线程任务在处理的过程中进行有效的把控与监督。
685 0
|
Java
Java多线程编程核心技术(三)多线程通信(上篇)
线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体。线程间的通信就是成为整体的必用方案之一,可以说,使线程间进行通信后,系统之间的交互性会更强大,在大大提高CPU利用率的同时还会使程序员对各线程任务在处理的过程中进行有效的把控与监督。
2560 0
|
Java 安全
Java多线程编程核心技术(二)volatile关键字
关键字volatile的主要作用是使变量在多个线程间可见。
885 0
|
Java
Java多线程编程核心技术(一)Java多线程技能
本文为《Java并发编程系列》第一章,主要介绍并发基础概念与API
2442 0