当前位置: 首页 > news >正文

Seatunnel解决ftp读取json文件无法读取数组以及格式化之后的json无法解析的问题

问题原因

在JsonRead这个方法里面 在源码中使用的逻辑是读取一行 然后把这个json进行解析
但是这样存在一个问题 比如如果json的格式是这样的
{
name:“zhangsan”,
age:25
}
如果是这样的话 第一行读到的内容就是 {
显然 一个 { 并不是一个json 这样会导致解析json失败

问题解决的思路

我的方法是将整个文件中的内容全部解析
然后使用Seatunnel中自带的JackJson这个工具类进行解析
然后再获取到单个的Json对象 之后再解析成一个Json的字符串
因为解析过之后的Json字符串肯定不存在换行 所以这种换行的问题算是规避了
但是这样又引发了另一个问题就是 一下子加载全部的文件内容可能会导致内存飙升 而且解析json 构造对象这个过程也是比较耗费资源的
但是我目前没有想出来更好的方法
我目前的业务需求是 这种ftp的文件都是小文件 不存在特别大的json 所以我的这个方法是可以完成现在的需求的

修改代码的内容

要修改的代码的位置是
org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java

    @Overridepublic void readProcess(String path,String tableId,Collector<SeaTunnelRow> output,InputStream inputStream,Map<String, String> partitionsMap,String currentFileName)throws IOException {InputStream actualInputStream;switch (compressFormat) {case LZO:LzopCodec lzo = new LzopCodec();actualInputStream = lzo.createInputStream(inputStream);break;case NONE:actualInputStream = inputStream;break;default:log.warn("Json file does not support this compress type: {}",compressFormat.getCompressCodec());actualInputStream = inputStream;break;}try (BufferedReader reader =new BufferedReader(new InputStreamReader(actualInputStream, encoding))) {//TODO wxt 优先使用之前的方法try{reader.lines().forEach(line -> {try {SeaTunnelRow seaTunnelRow =deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8));if (isMergePartition) {int index = seaTunnelRowType.getTotalFields();for (String value : partitionsMap.values()) {seaTunnelRow.setField(index++, value);}}seaTunnelRow.setTableId(tableId);output.collect(seaTunnelRow);} catch (IOException e) {String errorMsg =String.format("Deserialize this jsonFile data [%s] failed, please check the origin data",line);throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,errorMsg,e);}});}catch (Exception e){//region 我修改的内容//首先读取全部的内容// 将 BufferedReader 内容读取到一个 StringStringWriter stringWriter = new StringWriter();String line;while ((line = reader.readLine()) != null) {stringWriter.write(line);}String jsonContent = stringWriter.toString();// 判断 JSON 类型并处理ObjectMapper objectMapper = new ObjectMapper();JsonNode jsonNode = objectMapper.readTree(jsonContent);if (jsonNode.isArray()) {// 遍历数组并转换为单行字符串for (JsonNode node : jsonNode) {String singleLineJson = objectMapper.writeValueAsString(node);// region 这一部分是我直接从上面复制下来的try {SeaTunnelRow seaTunnelRow =deserializationSchema.deserialize(singleLineJson.getBytes(StandardCharsets.UTF_8));if (isMergePartition) {int index = seaTunnelRowType.getTotalFields();for (String value : partitionsMap.values()) {seaTunnelRow.setField(index++, value);}}seaTunnelRow.setTableId(tableId);output.collect(seaTunnelRow);} catch (IOException e1) {String errorMsg =String.format("Deserialize this jsonFile data [%s] failed, please check the origin data",singleLineJson);throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,errorMsg,e);}// endregion}} else if (jsonNode.isObject()) {String singleLineJson = objectMapper.writeValueAsString(jsonNode);// region 这一部分是我直接从上面复制下来的try {SeaTunnelRow seaTunnelRow =deserializationSchema.deserialize(singleLineJson.getBytes(StandardCharsets.UTF_8));if (isMergePartition) {int index = seaTunnelRowType.getTotalFields();for (String value : partitionsMap.values()) {seaTunnelRow.setField(index++, value);}}seaTunnelRow.setTableId(tableId);output.collect(seaTunnelRow);} catch (IOException e1) {String errorMsg =String.format("Deserialize this jsonFile data [%s] failed, please check the origin data",singleLineJson);throw new FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,errorMsg,e);}// endregion}//endregion}}}

http://www.mrgr.cn/news/78980.html

相关文章:

  • 如何使用brew安装phpredis扩展?
  • 【C语言】递归的内存占用过程
  • vue获取yyyyMMddHHmmss格式的日期
  • 【洛谷】P5738 【深基7.例4】歌唱比赛(详细注解)
  • Go-MediatR:Go语言中的中介者模式
  • Oracle LinuxR7安装Oracle 12.2 RAC集群实施(DNS解析)
  • AllegroHand 四指灵巧手:机器人领域的创新力量
  • 十,[极客大挑战 2019]Secret File1
  • SciPy Optimize和 CVXPY对比
  • Selenium常见问题
  • 生态环境影像评价、遥感解译与GIS技术生态环境影像评价制作
  • k8s的数据库etcd报 etcdserver: mvcc: database space exceeded的处理办法
  • 三维地形图计算软件(四)-用PYQT5+vtk画任意多面示例
  • Android显示系统(02)- OpenGL ES - 概述
  • 活着就好20241204
  • 项目开发中Vue3和Vue2如何选择?
  • c语言基础之二维数组
  • OD B卷 - 实现 【BOSS的收入】
  • 吉林大学23级数据结构上机实验(第7周)
  • 使用 CFD 仿真进行阀门性能分析:第 II 部分
  • Java 基础面试题
  • Spring入园须知
  • 计算机毕业设计Python+卷积神经网络股票预测系统 股票推荐系统 股票可视化 股票数据分析 量化交易系统 股票爬虫 股票K线图 大数据毕业设计 AI
  • cf EC 172 C(0->-1 的转化+区间和使用前缀和表示,化简式子)+ D(二维的信息,先对一维排序,另一维看情况分析)
  • C语言:指针与数组
  • playwright 学习复仇记-2 Selector选择器定位元素