【flink番外篇】13、Broadcast State 模式示例-简单模式匹配(1)

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文详细的介绍了通过broadcast state的实现简单的模式匹配,其中需要用到KeyedBroadcastProcessFunction。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

一、示例:按照分组规则进行图形匹配-KeyedBroadcastProcessFunction

本示例是简单的应用broadcast state实现简单模式匹配,即实现:
1、按照相同颜色进行分组,在相同颜色组中按照规则进行匹配。
2、相同颜色的规则1:长方形后是三角形
3、相同颜色的规则2:正方形后是长方形

如匹配上述规则1或规则2则输出匹配成功。

1、maven依赖

<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<!-- <scope>provided</scope> -->
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-compress</artifactId>
			<version>1.24.0</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.2</version>
			<!-- <scope>provided</scope> -->
		</dependency>

	</dependencies>

2、实现

package org.tablesql.join;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * 
 * @LastEditors: alanchan
 * 
 * @Description: 按照相同颜色进行分组,在相同颜色组中按照规则进行匹配。相同颜色的规则1:长方形后是三角形;规则2:正方形后是长方形
 */
public class TestJoinDimKeyedBroadcastProcessFunctionDemo {

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Shape {
        private String name;
        private String desc;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Colour {
        private String name;
        private Long blue;
        private Long red;
        private Long green;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Item {
        private Shape shape;
        private Colour color;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Rule {
        private String name;
        private Shape first;
        private Shape second;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // item 实时流
        DataStream<Item> itemStream = env.socketTextStream("192.168.10.42", 9999)
                .map(o -> {
                    // 解析item流
                    // 数据结构:Item[shape(name,desc);color(name,blue,red,green)]
                    String[] lines = o.split(";");
                    String[] shapeString = lines[0].split(",");
                    String[] colorString = lines[1].split(",");
                    Shape shape = new Shape(shapeString[0],shapeString[1]);
                    Colour color = new Colour(colorString[0],Long.valueOf(colorString[1]),Long.valueOf(colorString[2]),Long.valueOf(colorString[3]));
                    return new Item(shape,color);
                });

        // rule 实时流
        DataStream<Rule> ruleStream = env.socketTextStream("192.168.10.42", 8888)
                .map(o -> {
                    // 解析rule流
                    // 数据结构:Rule[name;shape(name,desc);shape(name,desc)]
                    String[] lines = o.split(";");
                    String name = lines[0];
                    String[] firstShapeString = lines[1].split(",");
                    String[] secondShapeString = lines[2].split(",");
                    Shape firstShape = new Shape(firstShapeString[0],firstShapeString[1]);
                    Shape secondShape = new Shape(secondShapeString[0],secondShapeString[1]);
                    return new Rule(name,firstShape,secondShape);
                }).setParallelism(1);

        // 将图形使用颜色进行划分
        KeyedStream<Item, Colour> colorPartitionedStream = itemStream
                .keyBy(new KeySelector<Item, Colour>() {

                    @Override
                    public Colour getKey(Item value) throws Exception {
                        return value.getColor();// 实现分组
                    }
                });

        colorPartitionedStream.print("colorPartitionedStream:---->");

        // 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
        MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
                "RulesBroadcastState",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<Rule>() {
                }));

        // 将rule定义为广播流,广播规则并且创建 broadcast state
        BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

        // 连接,输出流,connect() 方法需要由非广播流来进行调用,BroadcastStream 作为参数传入。
        DataStream<String> output = colorPartitionedStream
                .connect(ruleBroadcastStream)
                .process(
                        // KeyedBroadcastProcessFunction 中的类型参数表示:
                        // 1. key stream 中的 key 类型
                        // 2. 非广播流中的元素类型
                        // 3. 广播流中的元素类型
                        // 4. 结果的类型,在这里是 string
                        new KeyedBroadcastProcessFunction<Colour, Item, Rule, String>() {
                            // 存储部分匹配的结果,即匹配了一个元素,正在等待第二个元素
                            // 用一个数组来存储,因为同时可能有很多第一个元素正在等待
                            private final MapStateDescriptor<String, List<Item>> itemMapStateDesc = new MapStateDescriptor<>(
                                    "items",
                                    BasicTypeInfo.STRING_TYPE_INFO,
                                    new ListTypeInfo<>(Item.class));

                            // 与之前的 ruleStateDescriptor 相同,用于存储规则名称与规则本身的 map 存储结构
                            private final MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
                                    "RulesBroadcastState",
                                    BasicTypeInfo.STRING_TYPE_INFO,
                                    TypeInformation.of(new TypeHint<Rule>() {
                                    }));

                            // 负责处理广播流的元素        
                            @Override
                            public void processBroadcastElement(Rule ruleValue,
                                    KeyedBroadcastProcessFunction<Colour, Item, Rule, String>.Context ctx,
                                    Collector<String> out) throws Exception {
                                // 得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
                                // 查询元素的时间戳:ctx.timestamp()
                                // 查询目前的Watermark:ctx.currentWatermark()
                                // 目前的处理时间(processing time):ctx.currentProcessingTime()
                                // 产生旁路输出:ctx.output(OutputTag<X> outputTag, X value)    

                                // 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同
                                ctx.getBroadcastState(ruleStateDescriptor).put(ruleValue.getName(), ruleValue);
                            }

                            // 负责处理另一个流的元素
                            @Override
                            public void processElement(Item itemValue,
                                    KeyedBroadcastProcessFunction<Colour, Item, Rule, String>.ReadOnlyContext ctx,
                                    Collector<String> out) throws Exception {

                                final MapState<String, List<Item>> itemMapState = getRuntimeContext().getMapState(itemMapStateDesc);
                                final Shape shape = itemValue.getShape();

                                System.out.println("shape:"+shape);

                                // 在 getBroadcastState() 方法中传入的 stateDescriptor 应该与调用 .broadcast(ruleStateDescriptor) 的参数相同
                                ReadOnlyBroadcastState<String, Rule> readOnlyBroadcastState = ctx.getBroadcastState(ruleStateDescriptor);
                                Iterable<Entry<String, Rule>> iterableRule = readOnlyBroadcastState.immutableEntries();

                                for (Entry<String, Rule> entry : iterableRule) {
                                    final String ruleName = entry.getKey();
                                    final Rule rule = entry.getValue();

                                    // 初始化
                                    List<Item> itemStoredList = itemMapState.get(ruleName);
                                    if (itemStoredList == null) {
                                        itemStoredList = new ArrayList<>();
                                    }

                                    // 比较 shape 
                                    if (shape.getName().equals(rule.second.getName()) && !itemStoredList.isEmpty()) {
                                        for (Item item : itemStoredList) {
                                            // 符合规则,收集匹配结果
                                            out.collect("匹配成功: " + item + " - " + itemValue);
                                        }
                                        itemStoredList.clear();
                                    }

                                    // 规则连续性设置
                                    if (shape.getName().equals(rule.first.getName())) {
                                        itemStoredList.add(itemValue);
                                    }

                                    // 
                                    if (itemStoredList.isEmpty()) {
                                        itemMapState.remove(ruleName);
                                    } else {
                                        itemMapState.put(ruleName, itemStoredList);
                                    }
                                }
                            }
                        });

        output.print("output:------->");


        env.execute();

    }

    
}

3、验证

在netcat中启动两个端口,分别是8888和9999,8888输入规则,9999输入item,然后关键控制台输出。

1)、规则输入

red;rectangle,is a rectangle;tripe,is a tripe
green;square,is a square;rectangle,is a rectangle

2)、item输入

# 匹配成功
rectangle,is a rectangle;red,100,100,100
tripe,is a tripe;red,100,100,100

# 匹配成功
square,is square;green,150,150,150
rectangle,is a rectangle;green,150,150,150

# 匹配不成功
tripe,is tripe;blue,200,200,200

# 匹配成功
rectangle,is a rectangle;blue,100,100,100
tripe,is a tripe;blue,100,100,100

# 匹配不成功
tripe,is a tripe;blue,100,100,100
rectangle,is a rectangle;blue,100,100,100

3)、控制台输出

colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle)
colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))shape:TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe)
output:------->:9> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=red, blue=100, red=100, green=100))
colorPartitionedStream:---->:9> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150))
output:------->:9> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=square, desc=is square), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=green, blue=150, red=150, green=150))
colorPartitionedStream:---->:3> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=200, red=200, green=200))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
output:------->:1> 匹配成功: TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100)) - TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=tripe, desc=is a tripe), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))
colorPartitionedStream:---->:1> TestJoinDimKeyedBroadcastProcessFunctionDemo.Item(shape=TestJoinDimKeyedBroadcastProcessFunctionDemo.Shape(name=rectangle, desc=is a rectangle), color=TestJoinDimKeyedBroadcastProcessFunctionDemo.Colour(name=blue, blue=100, red=100, green=100))

以上,本文详细的介绍了通过broadcast state的实现简单的模式匹配,其中需要用到KeyedBroadcastProcessFunction。