博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink广播的使用
阅读量:2055 次
发布时间:2019-04-28

本文共 2526 字,大约阅读时间需要 8 分钟。

官网参考地址:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/batch/index.html#broadcast-variables

广播的定义:

Broadcast variables allow you to make a data set available to all parallel instances of an operation, in addition to the regular(常规) input of the operation. This is useful for auxiliary(辅助的) data sets, or data-dependent parameterization. The data set will then be accessible at the operator as a Collection.

  • Broadcast: broadcast sets are registered by name via withBroadcastSet(DataSet, String)
     可以使用withBroadcastSet方法,通过名字注册一个广播
  • Access: accessible via getRuntimeContext().getBroadcastVariable(String) at the target operator.
  使用getRuntimeContext().getBroadcastVariable(String) 方法根据名字获取广播变量
直接上代码:

 

package com.daxinimport org.apache.flink.api.common.functions.RichMapFunctionimport org.apache.flink.api.scala._import org.apache.flink.api.scala.extensions._import org.apache.flink.configuration.Configurationimport scala.collection.JavaConverters._  //asScala需要使用隐式转换/**  * Created by Daxin on 2017/4/16.  */object Broadcast {  def main(args: Array[String]) {    val env = ExecutionEnvironment.getExecutionEnvironment    val toBroadcast = env.fromElements(1, 2, 3)    val data = env.fromElements("1", "2", "5")    /**      * 如下是RichMapFunction的注释:      * Rich variant of the MapFunction. As a RichFunction, it gives access to      * the RuntimeContext and provides setup and teardown methods:      * RichFunction.open(org.apache.flink.configuration.Configuration) and RichFunction.close().      * 
RichMapFunction是MapFunction的变体,RichFunction可以访问运行时上下文(RuntimeContext) * 并提供开启和关闭方法 *
*/ val result = data.map(new RichMapFunction[String, String]() { var broadcastSet: Traversable[Integer] = null override def open(config: Configuration): Unit = { // 3. Access the broadcasted DataSet as a Collection broadcastSet = getRuntimeContext().getBroadcastVariable[Integer]("broadcastSetName").asScala } def map(in: String): String = { //... if (broadcastSet.toList.contains(in.toInt)) in //随便简单返回字符串 else in + " " + broadcastSet.toList.size + " " + broadcastSet.toList.contains(in) + " " + broadcastSet.toList(0).getClass //随便简单返回 } }).withBroadcastSet(toBroadcast, "broadcastSetName") // 2. Broadcast the DataSet result.print() }}

由于广播的获取是根据名字的,所以需要注册和获取时候名字一致!关于广播完整实例可以参考:

注意:

   由于广播变量保存在集群的每一个节点的内存中,因此广播变量不应该太大,对于简单的变量像标量值,可以使该变量或者参数成为函数闭包的一部分,或者使用org.apache.flink.api.scala.DataSet#withParameters方法通过Config传递。

转载地址:http://yfjlf.baihongyu.com/

你可能感兴趣的文章
Java多线程知识点总结
查看>>
Java集合框架知识梳理
查看>>
java中IO流知识梳理
查看>>
word2010如何保持在公式后面键入空格后或添加文字不变小?
查看>>
笔试题(一)—— java基础
查看>>
笔试题(二)—— sql语句
查看>>
Redis学习笔记(二)— 在linux下搭建redis服务器
查看>>
Redis学习笔记(三)—— 使用redis客户端连接windows和linux下的redis并解决无法连接redis的问题
查看>>
Eclipse配置错误——An internal error occurred during: "Building workspace".GC overhead limit exceeded
查看>>
Intellij IDEA使用(一)—— 安装Intellij IDEA(ideaIU-2017.2.3)并完成Intellij IDEA的简单配置
查看>>
Intellij IDEA使用(二)—— 在Intellij IDEA中配置JDK(SDK)
查看>>
Intellij IDEA使用(三)——在Intellij IDEA中配置Tomcat服务器
查看>>
Intellij IDEA使用(四)—— 使用Intellij IDEA创建静态的web(HTML)项目
查看>>
Intellij IDEA使用(五)—— Intellij IDEA在使用中的一些其他常用功能或常用配置收集
查看>>
Intellij IDEA使用(六)—— 使用Intellij IDEA创建Java项目并配置jar包
查看>>
Eclipse配置错误 —— Syntax error, annotations are only available if source level is 1.5 or greater
查看>>
Eclipse使用(十)—— 使用Eclipse创建简单的Maven Java项目
查看>>
Eclipse使用(十一)—— 使用Eclipse创建简单的Maven JavaWeb项目
查看>>
Intellij IDEA使用(十三)—— 在Intellij IDEA中配置Maven
查看>>
面试题 —— 关于main方法的十个面试题
查看>>