• 关于

    rabbitmq http api

    的搜索结果

回答

有个ack的###### 引用来自“hanzhankang”的答案 有个ack的 ack和noack是consume的确认,就是消费者取了队列中的信息后,确认后,消息才从队列中删除 我问的是publish的确认,发布端的确认 ###### RabbitMQ在发布端支持transaction和confirm两种模式来确认是否发布成功,具体见https://www.rabbitmq.com/confirms.html 但是,成功是指队列收到消息(或队列已持久化消息),而并不是consumer已经读取了消息。 如果需要consumer确认读取到消息,不妨看看rabbitmq tutorials的第六个例子——RPC ###### 事务确认是一种方式,但是还有一种方式,是Publist confirm 我知道在C的API里,知道调用amqp_confirm_select可以打开确认,但是没有找到怎样接收的方法 ###### 可能需要自己处理basic.ack消息###### 引用来自“hncscwc”的答案 可能需要自己处理basic.ack消息 就是不知道调用哪个API接口来处理,看JAVA的API,有ConfirmListen监听回调函数,但是C接口中没看到 ######参阅  http://my.oschina.net/moooofly/blog/142095?p=1#rpl_279025130 中的评论
kun坤 2020-06-06 23:44:14 0 浏览量 回答数 0

回答

形如上面 , 就是  basic authentication 你看看你的账号 有没有权限 访问默认 vhost? ######谢谢大神。找到解决的方法了 原来rabbitMQ 带着http的api的 直接请求就好了 谢谢
kun坤 2020-05-29 11:11:34 0 浏览量 回答数 0

问题

安装openstack,rabbitmq报错,求帮助?报错

# packstack --allinone Welcome to the Packstack setup utility The installation log file is available at: /var/tmp/packs...
爱吃鱼的程序员 2020-06-12 10:45:54 0 浏览量 回答数 1

Quick BI 数据可视化分析平台

2020年入选全球Gartner ABI魔力象限,为中国首个且唯一入选BI产品

回答

webservice  传json###### 是不是关于前后端分离的思考什么什么的... 业务没有那么大的情况下 真心不需要这样... ######不是为了项目而使用的,只是想讨论这种架构的可行性###### 1. REST API 2. Messaging 方式,Redis,RabbitMQ 的消息都可以 ######mc redis 或者RPC 性能会比webservice和http高之类高~ Java实现RPC也比较成熟######谢谢,我去看一下RPC######多语言多系统之间的集成会用到WebService: RPC(Remote Procedure Call Protocol):远程过程调用协议 SOAP(Simple Object Access Protocol):简单对象访问协议 REST(Representational State Transfer):表述性状态传递 Gearman(Client-Job-Worker):Gearman的Client和Worker可以使用不同的语言,Client可以调用Worker的服务. 都不简单,所以还是浏览器JS,服务器PHP来得直接. ######…######应该用rpc会效率高些!######前台用mvvm框架,后台resetful规范,实现前后台分离
爱吃鱼的程序员 2020-05-29 20:43:23 0 浏览量 回答数 0

问题

Python的事件框架?

我正在构建一个可与Web客户端(Django)和远程API(可能是独立守护程序)一起使用的系统。我看到将它们的工作与JavaScript中的某些事件框架进行协调比较容易。不幸的是&#x...
祖安文状元 2020-02-22 15:53:02 0 浏览量 回答数 1

回答

RocketMQ 支持定时消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s, 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类推 如何配置 1、可以直接在服务器端的broker.conf中进行配置, 在服务器端(rocketmq-broker端)的属性配置文件中加入以下行: messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 当然这种方式不够灵活,不推荐 2、第二种方式就是在程序中进行指定,这个会在代码中展示,上述的时间配置述了各级别与延时时间的对应映射关系, 这个配置项配置了从1级开始,各级延时的时间,可以修改这个指定级别的延时时间; 时间单位支持:s、m、h、d,分别表示秒、分、时、天; 默认值就是上面声明的,可手工调整; 默认值已够用,不建议修改这个值。 了解了这些基本的概念后,下面通过一段简单的程序演示一下效果,相对于rabbitmq的延迟消息的使用,rocketmq的延迟消息使用起来简单了很多, 3、我们使用一个controller模拟浏览器调用接口发送一个延迟的消息,这里为了演示方便发送消息的操作直接放在了controller里面了,实际开发中不要这样做, @RestController @RequestMapping("/api/order") public class OrderController { //http://localhost:8088/api/v1/order?msg=hello&tag=testtag @Autowired private MsgProducer msgProducer; @Autowired private PayProducer payProducer; /** * @param msg 支付信息 * @param tag 消息二级分类 * @return */ @GetMapping("/order") public Object order(String msg, String tag) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException{ Message message = new Message("testTopic",tag, msg.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult result = msgProducer.getProducer().send(message); System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus()); return JsonData.buildSuccess(); } //http://localhost:8082/api/order/delay?text=hello order /** * 发送延迟消息 * @param text * @return */ @GetMapping("/delay") public Object sendDelayMsg(String text) throws MQClientException, RemotingException, InterruptedException{ Message message = new Message(JmsConfig.TOPIC, "delay_order",("this is a delay message:" + text).getBytes()); //"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" message.setDelayTimeLevel(3); payProducer.getProducer().send(message, new SendCallback() { //消息发送成功回调 @Override public void onSuccess(SendResult sendResult) { System.out.printf("发送结果=%s, msg=%s ", sendResult.getSendStatus(), sendResult.toString()); } //消息异常回调 @Override public void onException(Throwable e) { e.printStackTrace(); //补偿机制,根据业务情况进行使用,看是否进行重试 } }); return "send ok"; } } 2、payProducer类, @Component public class PayProducer { private String producerGroup = "pay_producer_group"; private DefaultMQProducer producer; public PayProducer(){ producer = new DefaultMQProducer(producerGroup); //生产者投递消息重试次数 producer.setRetryTimesWhenSendFailed(3); //指定NameServer地址,多个地址以 ; 隔开 producer.setNamesrvAddr(JmsConfig.NAME_SERVER); start(); } public DefaultMQProducer getProducer(){ return this.producer; } /** * 对象在使用之前必须要调用一次,只能初始化一次 */ public void start(){ try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 一般在应用上下文,使用上下文监听器,进行关闭 */ public void shutdown(){ this.producer.shutdown(); } } 对了,我的rocketmq得服务器地址放在配置类里面了,如下, public class JmsConfig { public static final String NAME_SERVER = "192.168.111.132:9876"; public static final String TOPIC = "DELAY_TOPIC"; } 基本上就可以了,然后我们启动一下程序,浏览器调用,然后看一下后台打印的日志, http://localhost:8082/api/order/delay?text=hello delayorder
1748847708358317 2019-12-02 03:11:13 0 浏览量 回答数 0

回答

原因 1:性能 Go语言的9大优势和3大缺点 Go 极其地快。其性能与 Java 或 C++相似。在我们的使用中,Go 一般比 Python 要快 30 倍。以下是 Go 与 Java 之间的基准比较: Go语言的9大优势和3大缺点 Go语言的9大优势和3大缺点 Go语言的9大优势和3大缺点 Go语言的9大优势和3大缺点 原因 2:语言性能很重要 对很多应用来说,编程语言只是简单充当了其与数据集之间的胶水。语言本身的性能常常无关轻重。 但是 Stream 是一个 API 提供商,服务于世界 500 强以及超过 2 亿的终端用户。数年来我们已经优化了 Cassandra、PostgreSQL、Redis 等等,然而最终抵达了所使用语言的极限。 Python 非常棒,但是其在序列化/去序列化、排序和聚合中表现欠佳。我们经常会遇到这样的问题:Cassandra 用时 1ms 检索了数据,Python 却需要 10ms 将其转化成对象。 原因 3:开发者效率&不要过于创新 看一下绝佳的入门教程《开始学习 Go 语言》(http://howistart.org/posts/go/1/)中的一小段代码: package main type openWeatherMap struct{}func (w openWeatherMap) temperature(city string) (float64, error) { resp, err := http.Get("http://api.openweathermap.org/data/2.5/weather?APPID=YOUR_API_KEY&q=" + city) if err != nil { return 0, err } defer resp.Body.Close() var d struct { Main struct { Kelvin float64 json:"temp" } json:"main" } if err := json.NewDecoder(resp.Body).Decode(&d); err != nil { return 0, err } log.Printf("openWeatherMap: %s: %.2f", city, d.Main.Kelvin) return d.Main.Kelvin, nil} 如果你是一个新手,看到这段代码你并不会感到吃惊。它展示了多种赋值、数据结构、指针、格式化以及内置的 HTTP 库。 当我第一次编程时,我很喜欢使用 Python 的高阶功能。Python 允许你创造性地使用正在编写的代码,比如,你可以: 在代码初始化时,使用 MetaClasses 自行注册类别 置换真假 添加函数到内置函数列表中 通过奇妙的方法重载运算符 毋庸置疑这些代码很有趣,但也使得在读取其他人的工作时,代码变得难以理解。 Go 强迫你坚持打牢基础,这也就为读取任意代码带来了便利,并能很快搞明白当下发生的事情。 注意:当然如何容易还是要取决于你的使用案例。如果你要创建一个基本的 CRUD API,我还是建议你使用 Django + DRF,或者 Rails。 原因 4:并发性&通道 Go 作为一门语言致力于使事情简单化。它并未引入很多新概念,而是聚焦于打造一门简单的语言,它使用起来异常快速并且简单。其唯一的创新之处是 goroutines 和通道。Goroutines 是 Go 面向线程的轻量级方法,而通道是 goroutines 之间通信的优先方式。 创建 Goroutines 的成本很低,只需几千个字节的额外内存,正由于此,才使得同时运行数百个甚至数千个 goroutines 成为可能。你可以借助通道实现 goroutines 之间的通信。Go 运行时间可以表示所有的复杂性。Goroutines 以及基于通道的并发性方法使其非常容易使用所有可用的 CPU 内核,并处理并发的 IO——所有不带有复杂的开发。相较于 Python/Java,在一个 goroutine 上运行一个函数需要最小的样板代码。你只需使用关键词「go」添加函数调用: package main import ( "fmt" "time")func say(s string) { for i := 0; i < 5; i++ { time.Sleep(100 * time.Millisecond) fmt.Println(s) }}func main() { go say("world") say("hello")} Go 的并发性方法非常容易上手,相较于 Node 也很有趣;在 Node 中,开发者必须密切关注异步代码的处理。 并发性的另一个优质特性是竞赛检测器,这使其很容易弄清楚异步代码中是否存在竞态条件。下面是一些上手 Go 和通道的很好的资源: https://gobyexample.com/channels https://tour.golang.org/concurrency/2 http://guzalexander.com/2013/12/06/golang-channels-tutorial.html https://www.golang-book.com/books/intro/10 https://www.goinggo.net/2014/02/the-nature-of-channels-in-go.html 原因 5:快速的编译时间 当前我们使用 Go 编写的最大微服务的编译时间只需 6 秒。相较于 Java 和 C++呆滞的编译速度,Go 的快速编译时间是一个主要的效率优势。我热爱击剑,但是当我依然记得代码应该做什么之时,事情已经完成就更好了。 Go语言的9大优势和3大缺点 Go 之前的代码编译 原因 6:打造团队的能力 首先,最明显的一点是:Go 的开发者远没有 C++和 Java 等旧语言多。据知,有 38% 的开发者了解 Java,19.3% 的开发者了解 C++,只有 4.6% 的开发者知道 Go。GitHub 数据表明了相似的趋势:相较于 Erlang、Scala 和 Elixir,Go 更为流行,但是相较于 Java 和 C++ 就不是了。 幸运的是 Go 非常简单,且易于学习。它只提供了基本功能而没有多余。Go 引入的新概念是「defer」声明,以及内置的带有 goroutines 和通道的并发性管理。正是由于 Go 的简单性,任何的 Python、Elixir、C++、Scala 或者 Java 开发者皆可在一月内组建成一个高效的 Go 团队。 原因 7:强大的生态系统 对我们这么大小的团队(大约 20 人)而言,生态系统很重要。如果你需要重做每块功能,那就无法为客户创造收益了。Go 有着强大的工具支持,面向 Redis、RabbitMQ、PostgreSQL、Template parsing、Task scheduling、Expression parsing 和 RocksDB 的稳定的库。 Go 的生态系统相比于 Rust、Elixir 这样的语言有很大的优势。当然,它又略逊于 Java、Python 或 Node 这样的语言,但它很稳定,而且你会发现在很多基础需求上,已经有高质量的文件包可用了。 原因 8:GOFMT,强制代码格式 Gofmt 是一种强大的命令行功能,内建在 Go 的编译器中来规定代码的格式。从功能上看,它类似于 Python 的 autopep8。格式一致很重要,但实际的格式标准并不总是非常重要。Gofmt 用一种官方的形式规格代码,避免了不必要的讨论。 原因 9:gRPC 和 Protocol Buffers Go 语言对 protocol buffers 和 gRPC 有一流的支持。这两个工具能一起友好地工作以构建需要通过 RPC 进行通信的微服务器(microservices)。我们只需要写一个清单(manifest)就能定义 RPC 调用发生的情况和参数,然后从该清单将自动生成服务器和客户端代码。这样产生代码不仅快速,同时网络占用也非常少。
游客2q7uranxketok 2021-02-07 20:07:03 0 浏览量 回答数 0

回答

Checkpoint介绍 checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法。 每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。 CheckpointCoordinator周期性的向该流应用的所有source算子发送barrier。 2.当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状 态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告 自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理 3.下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身 快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理。 每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。 当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败 如果一个算子有两个输入源,则暂时阻塞先收到barrier的输入源,等到第二个输入源相 同编号的barrier到来时,再制作自身快照并向下游广播该barrier。具体如下图所示 两个输入源 checkpoint 过程 假设算子C有A和B两个输入源 在第i个快照周期中,由于某些原因(如处理时延、网络时延等)输入源A发出的 barrier先到来,这时算子C暂时将输入源A的输入通道阻塞,仅收输入源B的数据。 当输入源B发出的barrier到来时,算子C制作自身快照并向CheckpointCoordinator报 告自身的快照制作情况,然后将两个barrier合并为一个,向下游所有的算子广播。 当由于某些原因出现故障时,CheckpointCoordinator通知流图上所有算子统一恢复到某 个周期的checkpoint状态,然后恢复数据流处理。分布式checkpoint机制保证了数据仅被 处理一次(Exactly Once)。 持久化存储 目前,Checkpoint持久化存储可以使用如下三种: MemStateBackend 该持久化存储主要将快照数据保存到JobManager的内存中,仅适合作为测试以及 快照的数据量非常小时使用,并不推荐用作大规模商业部署。 FsStateBackend 该持久化存储主要将快照数据保存到文件系统中,目前支持的文件系统主要是 HDFS和本地文件。如果使用HDFS,则初始化FsStateBackend时,需要传入以 “hdfs://”开头的路径(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,则需要传入以“file://”开头的路径(即:new FsStateBackend("file:///Data"))。在分布式情况下,不推荐使用本地文件。如果某 个算子在节点A上失败,在节点B上恢复,使用本地文件时,在B上无法读取节点 A上的数据,导致状态恢复失败。 RocksDBStateBackend RocksDBStatBackend介于本地文件和HDFS之间,平时使用RocksDB的功能,将数 据持久化到本地文件中,当制作快照时,将本地数据制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用户特别指明,只需在初始化时传入HDFS 或本地路径即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。 如果用户使用自定义窗口(window),不推荐用户使用RocksDBStateBackend。在自 定义窗口中,状态以ListState的形式保存在StatBackend中,如果一个key值中有多 个value值,则RocksDB读取该种ListState非常缓慢,影响性能。用户可以根据应用 的具体情况选择FsStateBackend+HDFS或RocksStateBackend+HDFS。 语法 ​ val env = StreamExecutionEnvironment.getExecutionEnvironment() // start a checkpoint every 1000 ms env.enableCheckpointing(1000) // advanced options: // 设置checkpoint的执行模式,最多执行一次或者至少执行一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 设置checkpoint的超时时间 env.getCheckpointConfig.setCheckpointTimeout(60000) // 如果在只做快照过程中出现错误,是否让整体任务失败:true是 false不是 env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false) //设置同一时间有多少 个checkpoint可以同时执行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) ​ 例子 需求 假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,然后对统计的结果值进行checkpoint处理 数据规划 使用自定义算子每秒钟产生大约10000条数据。 
 产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count)。 
 数据经统计后,统计结果打印到终端输出。 
 打印输出的结果为Long类型的数据。 
 开发思路 
 source算子每隔1秒钟发送10000条数据,并注入到Window算子中。 window算子每隔1秒钟统计一次最近4秒钟内数据数量。 每隔1秒钟将统计结果打印到终端 每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。 //发送数据形式 case class SEvent(id: Long, name: String, info: String, count: Int) class SEventSourceWithChk extends RichSourceFunction[SEvent]{ private var count = 0L private var isRunning = true private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" // 任务取消时调用 override def cancel(): Unit = { isRunning = false } //// source算子的逻辑,即:每秒钟向流图中注入10000个元组 override def run(sourceContext: SourceContext[SEvent]): Unit = { while(isRunning) { for (i <- 0 until 10000) { sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) count += 1L } Thread.sleep(1000) } } } /** 该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。 */ object FlinkEventTimeAPIChkMain { def main(args: Array[String]): Unit ={ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/")) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointInterval(6000) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 应用逻辑 val source: DataStream[SEvent] = env.addSource(new SEventSourceWithChk) source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] { // 设置watermark override def getCurrentWatermark: Watermark = { new Watermark(System.currentTimeMillis()) } // 给每个元组打上时间戳 override def extractTimestamp(t: SEvent, l: Long): Long = { System.currentTimeMillis() } }) .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk) .print() env.execute() } } //该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。 // 用户自定义状态 class UDFState extends Serializable{ private var count = 0L // 设置用户自定义状态 def setState(s: Long) = count = s // 获取用户自定状态 def getState = count } //该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。 class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{ private var total = 0L // window算子的实现逻辑,即:统计window中元组的数量 override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = { var count = 0L for (event <- input) { count += 1L } total += count out.collect(count) } // 从自定义快照中恢复状态 override def restoreState(state: util.List[UDFState]): Unit = { val udfState = state.get(0) total = udfState.getState } // 制作自定义状态快照 override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(total) udfList.add(udfState) udfList } } flink-SQL Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL: org.apache.flink flink-table_2.11 1.5.0 另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加: org.apache.flink flink-scala_2.11 1.5.0 Table API和SQL程序的结构 Flink的批处理和流处理的Table API和SQL程序遵循相同的模式; 所以我们只需要使用一种来演示即可 要想执行flink的SQL语句,首先需要获取SQL的执行环境: 两种方式(batch和streaming): // *************** // STREAMING QUERY // *************** val sEnv = StreamExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for streaming queries val sTableEnv = TableEnvironment.getTableEnvironment(sEnv) // *********** // BATCH QUERY // *********** val bEnv = ExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for batch queries val bTableEnv = TableEnvironment.getTableEnvironment(bEnv) 通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责: 在内部目录中注册一个表 注册外部目录 执行SQL查询 注册用户定义的(标量,表格或聚合)函数 转换DataStream或DataSet成Table 持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment 在内部目录中注册一个表 TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格和输出表格。 输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统 输入表可以从各种来源注册: 现有Table对象,通常是表API或SQL查询的结果。 TableSource,它访问外部数据,例如文件,数据库或消息传递系统。 DataStream或DataSet来自DataStream或DataSet程序。 输出表可以使用注册TableSink。 注册一个表 // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // register the Table projTable as table "projectedX" tableEnv.registerTable("projectedTable", projTable) // Table is the result of a simple projection query val projTable: Table = tableEnv.scan("projectedTable ").select(...) 注册一个tableSource TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],...)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,...) // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSource val csvSource: TableSource = new CsvTableSource("/path/to/file", ...) // register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource) 注册一个tableSink 注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],...) // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSink val csvSink: TableSink = new CsvTableSink("/path/to/file", ...) // define the field names and types val fieldNames: Array[String] = Array("a", "b", "c") val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) // register the TableSink as table "CsvSinkTable" tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink) 例子 //创建batch执行环境 val env = ExecutionEnvironment.getExecutionEnvironment //创建table环境用于batch查询 val tableEnvironment = TableEnvironment.getTableEnvironment(env) //加载外部数据 val csvTableSource = CsvTableSource.builder() .path("data1.csv")//文件路径 .field("id" , Types.INT)//第一列数据 .field("name" , Types.STRING)//第二列数据 .field("age" , Types.INT)//第三列数据 .fieldDelimiter(",")//列分隔符,默认是"," .lineDelimiter("\n")//换行符 .ignoreFirstLine()//忽略第一行 .ignoreParseErrors()//忽略解析错误 .build() //将外部数据构建成表 tableEnvironment.registerTableSource("tableA" , csvTableSource) //TODO 1:使用table方式查询数据 val table = tableEnvironment.scan("tableA").select("id , name , age").filter("name == 'lisi'") //将数据写出去 table.writeToSink(new CsvTableSink("bbb" , "," , 1 , FileSystem.WriteMode.OVERWRITE)) //TODO 2:使用sql方式 // val sqlResult = tableEnvironment.sqlQuery("select id,name,age from tableA where id > 0 order by id limit 2") //// //将数据写出去 // sqlResult.writeToSink(new CsvTableSink("aaaaaa.csv", ",", 1, FileSystem.WriteMode.OVERWRITE)) able和DataStream和DataSet的集成 1:将DataStream或DataSet转换为表格 在上面的例子讲解中,直接使用的是:registerTableSource注册表 对于flink来说,还有更灵活的方式:比如直接注册DataStream或者DataSet转换为一张表。 然后DataStream或者DataSet就相当于表,这样可以继续使用SQL来操作流或者批次的数据 语法: // get TableEnvironment // registration of a DataSet is equivalent Env:DataStream val tableEnv = TableEnvironment.getTableEnvironment(env) val stream: DataStream[(Long, String)] = ... // register the DataStream as Table "myTable" with fields "f0", "f1" tableEnv.registerDataStream("myTable", stream) 例子 object SQLToDataSetAndStreamSet { def main(args: Array[String]): Unit = { // set up execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) //构造数据 val orderA: DataStream[Order] = env.fromCollection(Seq( Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))) val orderB: DataStream[Order] = env.fromCollection(Seq( Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1))) // 根据数据注册表 tEnv.registerDataStream("OrderA", orderA) tEnv.registerDataStream("OrderB", orderB) // union the two tables val result = tEnv.sqlQuery( "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " + "SELECT * FROM OrderB WHERE amount < 2") result.writeToSink(new CsvTableSink("ccc" , "," , 1 , FileSystem.WriteMode.OVERWRITE)) env.execute() } } case class Order(user: Long, product: String, amount: Int) 将表转换为DataStream或DataSet A Table可以转换成a DataStream或DataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义的DataStream或DataSet程序 1:将表转换为DataStream 有两种模式可以将 Table转换为DataStream: 1:Append Mode 将一个表附加到流上 2:Retract Mode 将表转换为流 语法格式: // get TableEnvironment. // registration of a DataSet is equivalent // ge val tableEnv = TableEnvironment.getTableEnvironment(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStreamRow // convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream(String, Int) // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStreamRow 例子: object TableTODataSet_DataStream { def main(args: Array[String]): Unit = { //构造数据,转换为table val data = List( Peoject(1L, 1, "Hello"), Peoject(2L, 2, "Hello"), Peoject(3L, 3, "Hello"), Peoject(4L, 4, "Hello"), Peoject(5L, 5, "Hello"), Peoject(6L, 6, "Hello"), Peoject(7L, 7, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(20L, 20, "Hello World")) val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tEnv = TableEnvironment.getTableEnvironment(env) val stream = env.fromCollection(data) val table: Table = tEnv.fromDataStream(stream) //TODO 将table转换为DataStream----[数控等离子切割机](http://www.158cnc.com)[http://www.158cnc.com](http://www.158cnc.com)将一个表附加到流上Append Mode val appendStream: DataStream[Peoject] = tEnv.toAppendStream[Peoject](table) //TODO 将表转换为流Retract Mode true代表添加消息,false代表撤销消息 val retractStream: DataStream[(Boolean, Peoject)] = tEnv.toRetractStream[Peoject](table) retractStream.print() env.execute() } } case class Peoject(user: Long, index: Int, content: String) 将表转换为DataSet 语法格式 // get TableEnvironment // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into a DataSet of Row val dsRow: DataSet[Row] = tableEnv.toDataSetRow // convert the Table into a DataSet of Tuple2[String, Int] val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet(String, Int) 例子: case class Peoject(user: Long, index: Int, content: String) object TableTODataSet{ def main(args: Array[String]): Unit = { //构造数据,转换为table val data = List( Peoject(1L, 1, "Hello"), Peoject(2L, 2, "Hello"), Peoject(3L, 3, "Hello"), Peoject(4L, 4, "Hello"), Peoject(5L, 5, "Hello"), Peoject(6L, 6, "Hello"), Peoject(7L, 7, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(20L, 20, "Hello World")) //初始化环境,加载table数据 val env = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnvironment = TableEnvironment.getTableEnvironment(env) val collection: DataSet[Peoject] = env.fromCollection(data) val table: Table = tableEnvironment.fromDataSet(collection) //TODO 将table转换为dataSet val toDataSet: DataSet[Peoject] = tableEnvironment.toDataSet[Peoject](table) toDataSet.print() // env.execute() } }
凹凹凸曼 2020-06-16 19:23:12 0 浏览量 回答数 0

回答

Java Java核心技术·卷 I(原书第10版)| Core Java Volume 讲的很全面,书中的代码示例都很好,很适合Java入门。 但是作者不太厚道的是把现在没人用的GUI编程放在了第一卷,基本上10~13章是可以不用读的。 Java性能权威指南|Java Performance: The Definitive Guide 市面上介绍Java的书有很多,但专注于Java性能的并不多,能游刃有余地展示Java性能优化难点的更是凤毛麟角,本书即是其中之一。 通过使用JVM和Java平台,以及Java语言和应用程序接口,本书详尽讲解了Java性能调优的相关知识,帮助读者深入理解Java平台性能的各个方面,最终使程序如虎添翼。 实战Java高并发程序设计|葛一鸣 由部分段落的行文来看,搬了官方文档。 也有一些第一人称的叙述和思考,也能看出作者也是花了一点心思的。胜在比较基础,涉及到的知识点也还很全面(讲到了流水线计算和并发模型这些边边角角的),但是由于是编著,全书整体上不够统一和深入,适合作为学习高并发的第一本工具书。 Java 8实战 对Java8的新特性讲解的十分到位,尤其是lamdba表达式和流的操作。 再者对于Java8并发处理很有独到见解。对于并行数据处理和组合式异步编程还需要更深的思考才能更加掌握。 推荐给再用java8但没有去真正了解的人看,有很多你不知道的细节、原理和类库设计者的用心良苦在里面、内容没有很难,抽出几个小时就能看完,花费的时间和收获相比,性价比很高。 Java并发编程实战 先不谈本书的内容如何,光书名就足够吸引不少目光。“并发”这个词在Java世界里往往和“高级、核心”等字眼相联系起来,就冲着这两个字,都将勾起软件工程师们埋藏在心底那种对技术的探索欲和对高级API的驾驭感。 程序员嘛,多少都有点职业病。其实Java对“并发”优化从未停止过,从5.0到7.0,几乎每个版本的新特性里,都会针对前一版本在“并发”上有所改进。这种改进包括提供更丰富的API接口、JVM底层性能优化等诸多方面。 Thinking in Java 很美味的一本书,不仅有icecreamm,sundae,sandwich,还有burrito!真是越看越饿啊~ Effective Java中文版(第3版)|Effective Java Third Edition Java 高阶书籍,小白劝退。介绍了关于Java 编程的90个经验技巧。 作者功力非常强悍,导致这本书有时知识面迁移很广。总之,非常适合有一定Java开发经验的人阅读提升。 深入理解Java虚拟机(第3版)| 周志明 浅显易懂。最重要的是开启一扇理解虚拟机的大门。 内存管理机制与Java内存模型、高效并发这三章是特别实用的。 Java虚拟机规范(Java SE 8版)|爱飞翔、周志明 整本书就觉得第二章的方法字节码执行流程,第四章的前8节和第五章能看懂一些。其他的过于细致和琐碎了。 把Java字节码讲的很清楚了,本质上Java虚拟机就是通过字节码来构建的一套体系罢了。所以字节码说的非常细致深入。 数据&大数据 数据结构与算法分析|Data Structures and Algorithm Analysis in Java 数据结构是计算机的核心,这部书以java语言为基础,详细的介绍了基本数据结构、图、以及相关的排序、最短路径、最小生成树等问题。 但是有一些高级的数据结构并没有介绍,可以通过《数据结构与算法分析——C语言描述》来增加对这方面的了解。 MySQL必知必会 《MySQL必知必会》MySQL是世界上最受欢迎的数据库管理系统之一。 书中从介绍简单的数据检索开始,逐步深入一些复杂的内容,包括联结的使用、子查询、正则表达式和基于全文本的搜索、存储过程、游标、触发器、表约束,等等。通过重点突出的章节,条理清晰、系统而扼要地讲述了读者应该掌握的知识,使他们不经意间立刻功力大增。 数据库系统概念|Datebase System Concepts(Fifth Edition) 从大学读到现在,每次拿起都有新的收获。而且这本书还是对各个数据相关领域的概览,不仅仅是数据库本身。 高性能MySQL 对于想要了解MySQL性能提升的人来说,这是一本不可多得的书。 书中没有各种提升性能的秘籍,而是深入问题的核心,详细的解释了每种提升性能的原理,从而可以使你四两拨千斤。授之于鱼不如授之于渔,这本书做到了。 高可用MySQL 很实用的书籍,只可惜公司现有的业务和数据量还没有达到需要实践书中知识的地步。 利用Python进行数据分析|唐学韬 内容还是跟不上库的发展速度,建议结合里面讲的库的文档来看。 内容安排上我觉得还不错,作者是pandas的作者,所以对pandas的讲解和设计思路都讲得很清楚。除此以外,作者也是干过金融数据分析的,所以后面专门讲了时间序列和金融数据的分析。 HBase 看完影印版第一遍,开始以为会是大量讲API,实际上除了没有将HBase源代码,该讲的都讲了,CH8,9章留到最后看的,确实有点顿悟的感觉,接下来需要系统的看一遍Client API,然后深入代码,Come ON! Programming Hive Hive工具书,Hive高级特性。 Hadoop in Practice| Alex Holmes 感觉比action那本要强 像是cookbook类型的 整个过完以后hadoop生态圈的各种都接触到了 这本书适合当参考手册用。 Hadoop技术内幕|董西成 其实国人能写这样的书,感觉还是不错的,不过感觉很多东西不太深入,感觉在深入之前,和先有整体,带着整体做深入会更好一点, jobclient,jobtracer,tasktracer之间的关系最好能系统化 Learning Spark 很不错,core的原理部分和api用途解释得很清楚,以前看文档和代码理解不了的地方豁然开朗。 不足的地方是后几章比较弱,mllib方面没有深入讲实现原理。graphx也没有涉及 ODPS权威指南 基本上还算一本不错的入门,虽然细节方面谈的不多,底层也不够深入,但毕竟是少有的ODPS书籍,且覆盖面很全,例子也还行。 数据之巅|徐子沛 从一个新的视角(数据)切入,写美国历史,统计学的发展贯穿其中,草蛇灰线,伏脉千里,读起来波澜壮阔。 消息队列&Redis RabbitMQ实战 很多年前的书了,书中的例子现在已经不适用了,推荐官方教程。 一些基础还是适用,网上也没有太多讲rab的书籍,将就看下也行,我没用过所以…. Apache Kafka源码剖析|徐郡明 虽然还没看,但知道应该不差。我是看了作者的mybatis源码分析,再来看这本的,相信作者。 作者怎么有这么多时间,把框架研究的这么透彻,佩服,佩服。 深入理解Kafka:核心设计与实践原理|朱忠华 通俗易懂,图文并茂,用了很多图和示例讲解kafka的架构,从宏观入手,再讲到细节,比较好,值得推荐。 深入理解Kafka是市面上讲解Kafka核心原理最透彻的,全书都是挑了kafka最核心的细节在讲比如分区副本选举、分区从分配、kafka数据存储结构、时间轮、我认为是目前kafka相关书籍里最好的一本。 Kafka 认真刷了 kafka internal 那章,看了个talk,算是入了个门。 系统设计真是门艺术。 RocketMQ实战与原理解析|杨开元 对RocketMQ的脉络做了一个大概的说明吧,深入细节的东西还是需要自己看代码 Redis设计与实现|黄健宏 部分内容写得比较啰嗦,当然往好了说是对新手友好,不厌其烦地分析细节,但也让整本书变厚了,个人以为精炼语言可以减少20%的内容。 对于有心一窥redis实现原理的读者来说,本书展露了足够丰富的内容和细节,却不至于让冗长的实现代码吓跑读者——伪代码的意义在此。下一步是真正读源码了。 Redis 深度历险:核心原理与应用实践|钱文品 真心不错,数据结构原理+实际应用+单线程模型+集群(sentinel, codis, redis cluster), 分布式锁等等讲的都十分透彻。 一本书的作用不就是系统性梳理,为读者打开一扇窗,读者想了解更多,可以自己通过这扇窗去Google。这本书的一个瑕疵是最后一章吧,写的仓促了。不过瑕不掩瑜。 技术综合 TCP/IP详解 卷1:协议 读专业性书籍是一件很枯燥的事,我的建议就是把它作为一本手册,先浏览一遍,遇到问题再去详细查,高效。 Netty in Action 涉及到很多专业名词新概念看英文原版顺畅得多,第十五章 Choosing the right thread model 真是写得太好了。另外结合Ron Hitchens 写的《JAVA NIO》一起看对理解JAVA NIO和Netty还是很有帮助的 ZooKeeper 值得使用zookeeper的人员阅读, 对于zookeeper的内部机制及api进行了很详细的讲解, 后半部分深入地讲解了zookeeper中ensemble互相协作的流程, 及group等高级配置, 对zookeeper的高级应用及其它类似系统的设计都很有借鉴意义. 从Paxos到Zookeeper|倪超 分布式入门鼻祖,开始部分深入阐述cap和base理论,所有的分布式框架都是围绕这个理论的做平衡和取舍,中间 zk的原理、特性、实战也讲的非常清晰,同时讲cap理论在zk中是如何体现,更加深你对cap的理解. 深入理解Nginx(第2版)|陶辉 云里雾里的快速读了一遍,主要是读不懂,读完后的感受是设计的真好。 原本是抱着了解原理进而优化性能的想法来读的,却发现书中的内容都是讲源码,作者对源码的注释超级详细,非常适合开发者,但不适合使用者,给个五星好评是因为不想因为我这种菜鸡而埋没了高质量内容。 另外别人的代码写的真好看,即便是过程式语言程序也吊打我写的面向对象语言程序。 作者是zookeeper的活跃贡献者,而且是很资深的研究员,内容比较严谨而且较好的把握住了zk的精髓。书很薄,但是没有废话,选题是经过深思熟虑的。 深入剖析Tomcat 本书深入剖析Tomcat 4和Tomcat 5中的每个组件,并揭示其内部工作原理。通过学习本书,你将可以自行开发Tomcat组件,或者扩展已有的组件。 Tomcat是目前比较流行的Web服务器之一。作为一个开源和小型的轻量级应用服务器,Tomcat 易于使用,便于部署,但Tomcat本身是一个非常复杂的系统,包含了很多功能模块。这些功能模块构成了Tomcat的核心结构。本书从最基本的HTTP请求开始,直至使用JMX技术管理Tomcat中的应用程序,逐一剖析Tomcat的基本功能模块,并配以示例代码,使读者可以逐步实现自己的Web服务器。 深入理解计算机系统 | 布莱恩特 无论是内容还是纸张印刷,都是满分。计算机学科的集大成之作。引导你如何练内功的,算是高配版本的计算机导论,目的是釜底抽薪引出来操作系统、组成原理这些专业核心的课程。帮助我们按图索骥,点亮一个一个技能树。 架构探险分布式服务框架 | 李业兵 刚看前几章的时候,心里满脑子想得都是这特么贴一整页pom文件代码上来干鸡毛,又是骗稿费的,买亏了买亏了,后来到序列化那章开始,诶?还有那么点意思啊。 到服务注册中心和服务通讯,60块钱的书钱已经赚回来了。 知识是无价的,如果能花几十块钱帮你扫了几个盲区,那就是赚了。 深入分析JavaWeb技术内幕 | 许令波 与这本书相识大概是四年前是在老家的北方图书城里,当时看到目录的感觉是真的惊艳,对当时刚入行的自己来说,这简直就是为我量身定做的扫盲科普集啊。 但是可惜的是,这本书在后来却一直没机会读上。然后经过四年的打怪升级之后,这次的阅读体验依旧很好。 其中,java编译原理、 Servlet工作原理、 Tomcat、spring和iBatis这几章的收获很大。 前端 jQuery 技术内幕| 高云 非常棒的一本书,大大降低了阅读jquery源码的难度(虽然还是非常难)。 Head First HTML与CSS(第2版) 翻了非常久的时间 断断续续 其实从头翻到尾 才发现一点都不难。 可我被自己的懒惰和畏难情绪给拖累了 简单说 我成了自己往前探索的负担。网页基础的语法基本都涵盖了 限于文本形态 知识点都没法像做题一样被反复地运用和复习到。通俗易懂 这不知算是多高的评价? 作为入门真心算不错了 如果更有耐心 在翻完 HTML 后 对 CSS 部分最好是可以迅速过一遍 找案例练习估计更好 纸上得来终觉浅 总是这样。 JavaScript高级程序设计(第3版) JavaScript最基础的书籍,要看认真,慢慢地看,累计接近1000小时吧。而且对象与继承,性能优化,HTML5 api由于没有实践或缺乏代码阅读量导致看的很糊涂,不过以后可以遇到时再翻翻,或者看更专业的书。 深入理解ES6 Zakas的又一部杰作,他的作品最优秀的地方在于只是阐述,很少评价,这在帮助我们夯实基础时十分有意义,我也喜欢这种风格。 我是中英文参照阅读的,译本后半部分有一些文字上的纰漏,但是总体来说忠实原文,水平还是相当不错,希望再版时可以修复这些文字问题。 高性能JavaScript 还是挺不错的。尤其是对初学者。总结了好多程序方面的好习惯。 不过对于老手来说,这些常识已经深入骨髓了。 深入浅出Node.js|朴灵 本书是我看到现在对Node.JS技术原理和应用实践阐述的最深入,也最全面的一本书。鉴于作者也是淘宝的一位工程师,在技术总是国外好的大环境下,没有理由不给本书五颗星。 作者秉着授人于鱼不如授人于渔的精神,细致入微的从V8虚拟机,内存管理,字符串与Buffer的应用,异步编程的思路和原理这些基础的角度来解释Node.JS是如何工作的,比起市面上众多教你如何安装node,用几个包编写一些示例来比,本书绝对让人受益匪浅。 认真看完本书,几乎可以让你从一个Node的外行进阶到专家的水平。赞! 总结 其实我觉得在我们现在这个浮躁的社会,大家闲暇时间都是刷抖音,逛淘宝,微博……他们都在一点点吞噬你的碎片时间,如果你尝试着去用碎片的时间看看书,我想时间久了你自然能体会这样的好处。 美团技术团队甚至会奖励读完一些书本的人,很多公司都有自己的小图书馆,我觉得挺好的。 文章来自:敖丙
剑曼红尘 2020-03-20 14:52:22 0 浏览量 回答数 0

问题

service内rpc调用其他service报错?报错

有一个提供数据服务的Aservice连接的A库A表;A服务启动正常。B服务引用了Aservice的api;B服务连接另外的数据库B。在启动B服务时就会报B库中找不到A表的错误(B库中确实无A表&#x...
爱吃鱼的程序员 2020-06-06 16:16:53 0 浏览量 回答数 1

问题

程序员报错QA大分享(1)

程序员报错QA征集第一弹来了哦~包含QA分享一期征集的部分内容,链接附带解决方案,可收藏哦~ npm install安装依赖一直报错?报错https://developer.aliyun.com/ask/301...
问问小秘 2020-06-18 15:46:14 1684 浏览量 回答数 2

云产品推荐

上海奇点人才服务相关的云产品 小程序定制 上海微企信息技术相关的云产品 国内短信套餐包 ECS云服务器安全配置相关的云产品 开发者问答 阿里云建站 自然场景识别相关的云产品 万网 小程序开发制作 视频内容分析 视频集锦 代理记账服务 阿里云AIoT