方式二 注解方式
- 对类添加@RabbitListener(queues = "${java.flink.queue}")注解
- 指定队列名称 可从配置文件中读取
- 对方法添加 @RabbitHandler 注解
三个参数
- Object message
任意类型的消息 # 解析mq消息 String messageString=JsonUtils.toJson(message); Message message1=JsonUtils.fromJsonObject(messageString,Message.class); String message2 = new String(message1.getBody(), "UTF-8");
- Message msg
手动确认 //如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉 final long deliveryTag = msg.getMessageProperties().getDeliveryTag(); //通知 MQ 消息已被成功消费,可以ACK了 channel.basicAck(deliveryTag, false);
- Channel channel
// 处理失败,重新压入MQ channel.basicRecover();
线程池
源码
https://gitee.com/pingfanrenbiji/resource/tree/master/flink/code/thread
spring线程相关注解
- @EnableAsync
使用多线程 - @Async
加在线程任务的方法上(需要异步执行的任务) 定义一个线程任务 通过spring提供的ThreadPoolTaskExecutor就可以使用线程池
重要参数
- corePoolSize
核心线程数 - maxPoolSize
最大线程数 - queueCapacity
队列容量 - keepAliveSeconds
活跃时间 - waitForTasksToCompleteOnShutdown
设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean - rejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy())
- setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
- CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
使用线程池中的线程去执行异步任务
分布式内存文件系统Alluxio
环境搭建
- 自定义dokcer网络
docker network create alluxio_nw
- 安装alluxio master
docker run -d --rm \ -p 19999:19999 \ -p 19998:19998 \ --net=alluxio_nw \ --name=alluxio-master \ -e ALLUXIO_JAVA_OPTS=" \ -Dalluxio.master.hostname=alluxio-master" \ alluxio/alluxio master
- 安装alluxio worker
docker run -d --rm \ -p 29999:29999 \ -p 30000:30000 \ --net=alluxio_nw \ --name=alluxio-worker \ --shm-size=3971.64MB \ -e ALLUXIO_JAVA_OPTS=" \ -Dalluxio.worker.memory.size=3971.64MB \ -Dalluxio.master.hostname=alluxio-master \ -Dalluxio.worker.hostname=alluxio-worker" \ alluxio/alluxio worker
域名转发配置
sudo vim /etc/hosts 127.0.0.1 alluxio-worker
上传alluxio文件
下载alluxio文件




