
High-performance processing of a 10 GB file in Java

Background: read a 10 GB CSV file, group the trades by trade type, find the largest trade within each type, and finally report the overall largest trade.

The hard part: above all, how do you read the file fast?

Techniques involved: reading the file with MappedByteBuffer, asynchronous result collection, TreeSet ordering, thread-pool processing, and segmented split-and-sort.

Approach:

1. Read the file with MappedByteBuffer. The main difficulty is extracting the needed columns from the CSV and splitting it into rows, which is fiddly (rows are detected by the newline character, columns by commas). (Known gap: records that straddle a chunk boundary are not handled here; doing that correctly is tedious.)

2. Read the file in chunks across multiple threads. (Chunked reading is possible because a mapped file can be read starting from any byte offset.)

3. Collect the results of every chunk into one collection.

4. Group the records and, asynchronously, find the maximum within each group. (If a group holds many records and the comparison is one-directional, the group can itself be split into segments to find the maximum.)

5. Compare the maxima of all groups; the largest of them is the final answer.
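The boundary gap flagged in step 1 has a standard fix, sketched below with illustrative names (this is not part of the article's code): each worker advances its nominal chunk start to the first byte after the next newline, so every record is parsed by exactly one worker.

```java
import java.nio.charset.StandardCharsets;

// Sketch (assumed helper, not the author's code): align a worker's chunk
// start to the next record boundary so no line is split between two chunks.
public class ChunkAlign {
    // Returns the first index at or after nominalStart that begins a record,
    // i.e. the byte right after the previous '\n'.
    static int alignToNextLine(byte[] data, int nominalStart) {
        if (nominalStart == 0) {
            return 0;                         // the first chunk starts on a record
        }
        int i = nominalStart;
        while (i < data.length && data[i - 1] != '\n') {
            i++;                              // skip the tail of the straddling record
        }
        return i;
    }

    public static void main(String[] args) {
        byte[] csv = "a,1\nbb,22\nccc,333\n".getBytes(StandardCharsets.US_ASCII);
        // A naive split at byte 5 lands inside "bb,22"; aligning moves the
        // start to the beginning of the next full record ("ccc,333").
        System.out.println(alignToNextLine(csv, 5)); // → 10
    }
}
```

The record skipped by chunk N is picked up by chunk N-1, which symmetrically reads one record past its nominal end.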

With MappedByteBuffer you can first obtain the total length of the file content, split the file into as many chunks as the machine has processor cores, and then process each chunk single-threaded inside its own thread. This is the fastest arrangement, because it makes full use of the machine's processing capacity.

Do not use more threads than that: beyond the core count, extra threads only add context-switching overhead without improving throughput.
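The partitioning arithmetic described above can be sketched as follows (illustrative names, not the article's code): ceiling division guarantees the chunks cover every byte of the payload, and clamping keeps the final chunk inside the file.

```java
// Sketch of per-core file partitioning. `payload` stands for the file length
// minus the header; both helpers are assumed names for illustration.
public class Partition {
    // start offset of chunk i when `payload` bytes are split into `parts` chunks
    static long chunkStart(long payload, int parts, int i) {
        long per = (payload + parts - 1) / parts;   // ceil(payload / parts)
        return Math.min((long) i * per, payload);   // clamp the tail
    }

    // size of chunk i; the final chunk absorbs the remainder
    static long chunkSize(long payload, int parts, int i) {
        return chunkStart(payload, parts, i + 1) - chunkStart(payload, parts, i);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        long payload = 10L * 1024 * 1024 * 1024;    // pretend: 10 GB of records
        long covered = 0;
        for (int i = 0; i < cores; i++) {
            covered += chunkSize(payload, cores, i);
        }
        System.out.println(covered == payload);     // → true: chunks tile the payload exactly
    }
}
```

Plain integer division here would silently drop up to `parts - 1` trailing bytes, which is why the ceiling form matters.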

Core code:

public static void main(String[] args) throws Exception {
    long startTime = System.currentTimeMillis();
    if (args.length == 0) {
        path = "D:\\RaceFile\\0002.csv";
//        path = "D:\\RaceFile\\marketdata\\marketdata.csv";
    } else {
        path = args[0];
    }
    raf = new RandomAccessFile(new File(path), "r");
    FileChannel channel = raf.getChannel();
    // total length of the file content
    long fileSize = channel.size();
    // map the first bytes to locate the header line, so it can be skipped
    MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, 3000);
    int headIndex = readHead(buffer, fileSize);
    // length of each partition (cast to double so the division really rounds up)
    long preSize = (long) Math.ceil((double) (fileSize - headIndex) / threadNum);
    ArrayList<FutureTask<HashMap<String, Trade>>> threadList = new ArrayList<>(threadNum);
    // read each partition asynchronously; chunk index * chunk length gives each start offset
    for (int i = 0; i < threadNum; i++) {
        int finalI = i;
        // clamp the last chunk so the mapping never runs past the end of the file
        long chunkSize = Math.min(preSize, fileSize - preSize * finalI);
        Callable<HashMap<String, Trade>> callable1 = () -> read(preSize * finalI, chunkSize);
        FutureTask<HashMap<String, Trade>> futureTask1 = new FutureTask<>(callable1);
        threadList.add(futureTask1);
        new Thread(futureTask1).start();
    }
    // collect each chunk's result into a single map
    List<HashMap<String, Trade>> trade2ListPre = new ArrayList<>(threadNum);
    for (int i = 0; i < threadList.size(); i++) {
        trade2ListPre.add(threadList.get(i).get());
    }
    HashMap<String, Trade> allData = new HashMap<>();
    trade2ListPre.forEach(allData::putAll);
    long end = System.currentTimeMillis();
//    System.out.println(end - startTime);
    // walk all records and group them by instrument
    HashMap<String, TreeSet<Trade>> hashMap = new HashMap<>();
    for (Map.Entry<String, Trade> entry : allData.entrySet()) {
        String key = entry.getKey().substring(0, entry.getKey().indexOf(MARK));
        if (hashMap.containsKey(key) && hashMap.get(key) != null) {
            hashMap.get(key).add(entry.getValue());
        } else {
            TreeSet<Trade> tradeTreeSet = new TreeSet<>();
            tradeTreeSet.add(entry.getValue());
            hashMap.put(key, tradeTreeSet);
        }
    }
    // second round of async work: scan each group for its maximum
    hashMap.keySet().forEach(m -> threadPool2.execute(() -> {
        TreeSet<Trade> treeSet = hashMap.get(m);
        Trade maxTrade = treeSet.first();
        Trade preTrade = treeSet.first();
        for (Trade curTrade : treeSet) {
            int volume = curTrade.volume - preTrade.volume;
            preTrade = curTrade;
            if (volume > maxTrade.volume) {
                maxTrade = new Trade(curTrade.time, curTrade.updateTime, curTrade.updateMillisec,
                        curTrade.volume, curTrade.exchangeID, curTrade.instrumentID);
                maxTrade.volume = volume;
            }
        }
        // result is shared by all pool threads, so guard the update
        synchronized (Solution1.class) {
            if (result.volume < maxTrade.volume) {
                result = maxTrade;
            }
        }
    }));
    // shut the pool down and wait; termination means processing is complete
    threadPool2.shutdown();
    threadPool2.awaitTermination(1, TimeUnit.HOURS);
    end = System.currentTimeMillis();
//    System.out.println(end - startTime);
    // print the final result
    System.out.println(result);
}

Full code:

package test3;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;

public class Solution1 {

    static int threadNum = Runtime.getRuntime().availableProcessors();
    static RandomAccessFile raf;
    static String path;
    static Trade result = new Trade();
    static ThreadPoolExecutor threadPool2 =
            new ThreadPoolExecutor(threadNum, threadNum, 10, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>());

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        if (args.length == 0) {
            path = "D:\\RaceFile\\0002.csv";
//            path = "D:\\RaceFile\\marketdata\\marketdata.csv";
        } else {
            path = args[0];
        }
        raf = new RandomAccessFile(new File(path), "r");
        FileChannel channel = raf.getChannel();
        // total length of the file content
        long fileSize = channel.size();
        // map the first bytes to locate the header line, so it can be skipped
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, 3000);
        int headIndex = readHead(buffer, fileSize);
        // length of each partition (cast to double so the division really rounds up)
        long preSize = (long) Math.ceil((double) (fileSize - headIndex) / threadNum);
        ArrayList<FutureTask<HashMap<String, Trade>>> threadList = new ArrayList<>(threadNum);
        // read each partition asynchronously; chunk index * chunk length gives each start offset
        for (int i = 0; i < threadNum; i++) {
            int finalI = i;
            // clamp the last chunk so the mapping never runs past the end of the file
            long chunkSize = Math.min(preSize, fileSize - preSize * finalI);
            Callable<HashMap<String, Trade>> callable1 = () -> read(preSize * finalI, chunkSize);
            FutureTask<HashMap<String, Trade>> futureTask1 = new FutureTask<>(callable1);
            threadList.add(futureTask1);
            new Thread(futureTask1).start();
        }
        // collect each chunk's result into a single map
        List<HashMap<String, Trade>> trade2ListPre = new ArrayList<>(threadNum);
        for (int i = 0; i < threadList.size(); i++) {
            trade2ListPre.add(threadList.get(i).get());
        }
        HashMap<String, Trade> allData = new HashMap<>();
        trade2ListPre.forEach(allData::putAll);
        long end = System.currentTimeMillis();
//        System.out.println(end - startTime);
        // walk all records and group them by instrument
        HashMap<String, TreeSet<Trade>> hashMap = new HashMap<>();
        for (Map.Entry<String, Trade> entry : allData.entrySet()) {
            String key = entry.getKey().substring(0, entry.getKey().indexOf(MARK));
            if (hashMap.containsKey(key) && hashMap.get(key) != null) {
                hashMap.get(key).add(entry.getValue());
            } else {
                TreeSet<Trade> tradeTreeSet = new TreeSet<>();
                tradeTreeSet.add(entry.getValue());
                hashMap.put(key, tradeTreeSet);
            }
        }
        // second round of async work: scan each group for its maximum
        hashMap.keySet().forEach(m -> threadPool2.execute(() -> {
            TreeSet<Trade> treeSet = hashMap.get(m);
            Trade maxTrade = treeSet.first();
            Trade preTrade = treeSet.first();
            for (Trade curTrade : treeSet) {
                int volume = curTrade.volume - preTrade.volume;
                preTrade = curTrade;
                if (volume > maxTrade.volume) {
                    maxTrade = new Trade(curTrade.time, curTrade.updateTime, curTrade.updateMillisec,
                            curTrade.volume, curTrade.exchangeID, curTrade.instrumentID);
                    maxTrade.volume = volume;
                }
            }
            // result is shared by all pool threads, so guard the update
            synchronized (Solution1.class) {
                if (result.volume < maxTrade.volume) {
                    result = maxTrade;
                }
            }
        }));
        // shut the pool down and wait; termination means processing is complete
        threadPool2.shutdown();
        threadPool2.awaitTermination(1, TimeUnit.HOURS);
        end = System.currentTimeMillis();
//        System.out.println(end - startTime);
        // print the final result
        System.out.println(result);
    }

    /*
     * MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, start, end - 3);
     * StandardCharsets.UTF_8.decode(buffer).toString()
     */
    public static HashMap<String, Trade> read(long position, long size) throws IOException {
        FileChannel channel = raf.getChannel();
        MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, position, size);
        return readLine(buffer, position, 0, size);
    }

    public static HashMap<String, Trade> readLine(MappedByteBuffer buffer, long position, int start, long end) {
        HashMap<String, Trade> instruTimeGroupMap = new HashMap<>();
        StringBuilder lineBuilder = new StringBuilder();
        int countComma = 0;
        List<String> dataItem = new ArrayList<>();
        dataItem.add(String.valueOf(position));
        for (int i = start + 1; i < end - 1; i++) {
            byte b = buffer.get();
            if (b == '\n') {
                try {
                    // a complete record is the chunk position plus the 5 extracted columns
                    if (dataItem.size() == 6) {
                        handleData(dataItem, instruTimeGroupMap, i);
                    }
                } catch (Exception e) {
//                    e.printStackTrace();
                } finally {
                    countComma = 0;
                    dataItem.clear();
                    dataItem.add(String.valueOf(position));
                }
            }
            // keep only the columns that are needed
            if (b == ',') {
                countComma++;
                if (countComma == 11 || countComma == 20 || countComma == 21 || countComma == 22 || countComma == 45) {
                    dataItem.add(lineBuilder.toString());
                }
                lineBuilder.setLength(0);
            } else {
                lineBuilder.append((char) b);
            }
        }
        return instruTimeGroupMap;
    }

    static final String MARK = "_$_";

    public static void handleData(List<String> columns, HashMap<String, Trade> instruTimeGroupMap, int offset) {
        // original column indexes: 0 19 20 21 22 23 24 25 45
        String minute = columns.get(2).substring(0, 5);
        String key = columns.get(4) + MARK + minute;
        Trade trade = new Trade(columns.get(2), columns.get(3), Integer.parseInt(columns.get(1)),
                columns.get(5), columns.get(4), Long.parseLong(columns.get(0)), offset);
        // for the same instrument + minute, the later put wins, keeping the largest position
        instruTimeGroupMap.put(key, trade);
    }

    public static int readHead(MappedByteBuffer buffer, long size) throws IOException {
        for (int i = 0; i < size; i++) {
            byte b = buffer.get(i);
            if (b == '\n') {
                return i;
            }
        }
        return 0;
    }
}

class Trade implements Comparable<Trade> {
    long position;
    int offset;
    String time;
    String updateTime;
    String updateMillisec;
    int volume;
    String exchangeID;
    String instrumentID;

    public Trade() {
    }

    public Trade(String updateTime, String updateMillisec, int volume, String exchangeID, String instrumentID) {
        this.time = updateTime.substring(0, 5);
        this.updateTime = updateTime;
        this.updateMillisec = updateMillisec;
        this.volume = volume;
        this.exchangeID = exchangeID;
        this.instrumentID = instrumentID;
    }

    public Trade(String updateTime, String updateMillisec, int volume, String exchangeID, String instrumentID,
                 long position, int offset) {
        this(updateTime, updateMillisec, volume, exchangeID, instrumentID);
        this.position = position;
        this.offset = offset;
    }

    public Trade(String time, String updateTime, String updateMillisec, int volume, String exchangeID,
                 String instrumentID) {
        this.time = time;
        this.updateTime = updateTime;
        this.updateMillisec = updateMillisec;
        this.volume = volume;
        this.exchangeID = exchangeID;
        this.instrumentID = instrumentID;
    }

    @Override
    public int compareTo(Trade o) {
        // deliberately never returns 0, so a TreeSet keeps trades that compare equal
        if (this.position - o.position > 0) {
            return 1;
        }
        if (this.position - o.position == 0 && this.offset > o.offset) {
            return 1;
        }
        return -1;
    }

    @Override
    public String toString() {
        return this.time + "," + this.exchangeID + "," + this.volume + "," + this.instrumentID;
    }
}

class BadData implements Comparable<BadData> {
    long position;
    int size;

    @Override
    public int compareTo(BadData o) {
        return this.position > o.position ? 1 : -1;
    }
}
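One subtlety in Trade.compareTo above: it never returns 0, which lets a TreeSet retain trades that would otherwise be treated as duplicates, at the cost of breaking the Comparable contract (contains/remove become unreliable). A minimal illustration with a stand-in comparator:

```java
import java.util.TreeSet;

// Demonstrates the ordering trick used by Trade.compareTo: a comparator that
// never returns 0 makes a TreeSet insert "equal" elements instead of
// replacing them. The class and method names here are illustrative only.
public class NeverEqualDemo {
    // returns the set size after inserting the same value twice
    static int sizeAfterDuplicateInsert(int value) {
        // ties compare as 1 (greater), so a duplicate goes into the right subtree
        TreeSet<int[]> set = new TreeSet<>((a, b) -> a[0] >= b[0] ? 1 : -1);
        set.add(new int[]{value});
        set.add(new int[]{value});   // same key, but still inserted
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterDuplicateInsert(5)); // → 2
    }
}
```

A comparator obeying the contract would return 0 here and the second add would be a no-op, collapsing both trades into one.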

