当前位置: 首页 > news >正文

Flink转换算子——flatMap/map/filter/keyby/reduce综合案例

需求: 对流数据中的单词进行统计,排除敏感词TMD【腾讯美团滴滴】
此处用到了一个windows版本的软件 netcat,具体用法,先解压,然后在路径中输入cmd,来到黑窗口。

 官网地址:netcat 1.11 for Win32/Win64
Netcat介绍及安装使用_netcat安装-CSDN博客

服务端的启动:

image.png


客户端就是双击 nc.exe 即可,里面无需写 nc 命令。

image.png

假如你想随时随地使用nc这个命令,需要配置环境变量。
代码演示:

package com.bigdata.day03;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.expressions.In;
import org.apache.flink.util.Collector;public class ZongHeDemo {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. socket-加载数据DataStream<String> dataStreamSource = env.socketTextStream("localhost", 8889);dataStreamSource.filter(new FilterFunction<String>() {@Overridepublic boolean filter(String line) throws Exception {return !line.contains("TMD");}}).flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String line, Collector<String> collector) throws Exception {String[] arr = line.split(" ");for (String word : arr) {collector.collect(word);}}}).map(new MapFunction<String, Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> map(String s) throws Exception {return Tuple2.of(s,1);}}).keyBy(v -> v.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {@Overridepublic Tuple2<String, Integer> reduce(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> t1) throws Exception {return Tuple2.of(tuple2.f0,tuple2.f1 + t1.f1);}}).print();//4. sink-数据输出//5. execute-执行env.execute();}
}
package com.bigdata.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;class JdbcSource extends RichSourceFunction<String> {Connection connection;PreparedStatement statement;@Overridepublic void open(Configuration parameters) throws Exception {//使用jdbc//Class.forName("com.jdbc.cj.mysql.Driver");connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/zuoye", "root", "123456");statement = connection.prepareStatement("select word from fuck_words");}@Overridepublic void close() throws Exception {statement.close();connection.close();}@Overridepublic void run(SourceContext<String> ctx) throws Exception {ResultSet resultSet = statement.executeQuery();while(resultSet.next()){String word = resultSet.getString("word");ctx.collect(word);}}@Overridepublic void cancel() {}
}public class _07综合案例 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载mysql数据库数据DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost", 8889);//3. transformation-数据处理转换DataStreamSource<String> jdbcSource = env.addSource(new JdbcSource());jdbcSource.print();ArrayList<String> words = new ArrayList<>();jdbcSource.map(new MapFunction<String, String>() {@Overridepublic String map(String value) throws Exception {words.add(value);return value;}}).print();// 此路不通,因为我们的代码不是顺序执行的,而且我们的算子还是并行运行的  words没有任何值,悬案!System.out.println(words);String[] arr = {"tmd","fuck"};// 此处的list 只能读取,不能修改和删除List<String> list = Arrays.asList(arr);dataStreamSource.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] arr = value.split("\\s+");for (String word : arr) {// 此处完全可以直接将 不要的单词过滤掉,也可以将来使用filter方法过滤out.collect(word);}}}).filter(new FilterFunction<String>() {@Overridepublic boolean filter(String value) throws Exception {return !list.contains(value);}}).map(new MapFunction<String, Tuple2<String,Integer>>() {@Overridepublic Tuple2<String, Integer> map(String value) throws Exception {return new Tuple2<>(value,1);}}).keyBy(new KeySelector<Tuple2<String, Integer>, String>() {@Overridepublic String getKey(Tuple2<String, Integer> value) throws Exception {return value.f0;}}).sum(1).print();//4. sink-数据输出//5. execute-执行env.execute();}
}


http://www.mrgr.cn/news/77923.html

相关文章:

  • android 自定义SwitchCompat,Radiobutton,SeekBar样式
  • 73.矩阵置零 python
  • NVIDIA发布GeForce RTX 50 系列,售价549美元起
  • 简单易用的PDF工具箱
  • SpringBoot项目实战(39)--Beetl网页HTML文件中静态图片及CSS、JS文件的引用和展示
  • 探索AGI:智能助手与自我赋能的新时代
  • git使用教程
  • Python学习------第十一天
  • 小白学多线程(持续更新中)
  • 数据结构 (5)栈
  • TCP socket api详解
  • Android 常用命令和工具解析之GPU相关
  • 数字信号处理(Digital Signal Procession)总结
  • 从搭建uni-app+vue3工程开始
  • Linux高阶——1117—TCP客户端服务端
  • HarmonyOS:使用ArkWeb构建页面
  • 工具学习_Docker
  • 用Tauri框架构建跨平台桌面应用:1、Tauri快速开始
  • 学习python的第十三天之函数——函数的返回值
  • 如何使用docker、docker挂载数据,以及让docker使用宿主机器的GPU环境 + docker重启小妙招
  • 华为云鸿蒙应用入门级开发者认证考试题库(理论题和实验题)
  • 论文阅读——Intrusion detection systems using longshort‑term memory (LSTM)
  • 阅读《先进引信技术的发展与展望》识别和控制部分_笔记
  • Glide源码学习
  • 【AI技术赋能有限元分析应用实践】将FEniCS 软件安装在Ubuntu22.04
  • 预训练模型与ChatGPT:自然语言处理的革新与前景